1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
|
package lookup
import (
"context"
"sync/atomic"
"time"
)
// stepFunc is the signature of a single step of the LongEarth lookup. A step is
// invoked with the context that governs it, a time t and an epoch to start from,
// and returns the value it found, or nil if it found none.
type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{}
// Head starts granted to the current read (R) by LongEarthAlgorithm before it
// speculatively launches its parallel paths. Both are package variables so that
// callers can tune them.
var (
	// LongEarthLookaheadDelay is how long a step waits for R before it launches
	// the lookahead path anyway.
	LongEarthLookaheadDelay = 250 * time.Millisecond

	// LongEarthLookbackDelay is how long a step waits for R before it launches
	// the lookback path anyway.
	LongEarthLookbackDelay = 250 * time.Millisecond
)
// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon
// as a more promising lookup path is found. As a result, this lookup algorithm is an order
// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads.
//
// This algorithm works as follows. On each step, the next epoch is immediately looked up (R)
// and given a head start, while two parallel "steps" are launched a short time after:
// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas
// look back (B) is the path the algorithm would take if the R lookup failed.
// As soon as R is actually finished, the A or B paths are pruned depending on the value of R.
// If A returns earlier than R, then R and B read operations can be safely canceled, saving time.
//
// The maximum number of active read operations is calculated as 2^(timeout/headstart).
// If headstart is infinite, this algorithm behaves as FluzCapacitor.
// timeout is the maximum execution time of the passed `read` function.
// The two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay.
//
// Parameters:
//   - ctx bounds the whole lookup; if it is done when the search ends, ctx.Err() is returned.
//   - now is the time passed to every read and the time the root step starts from.
//   - hint is the epoch to start from; NoClue is replaced by worstHint.
//   - read performs the actual lookup of one epoch.
//
// It returns the value found (nil if there is none) or the first error reported by
// read that is not context.Canceled. If the search finds nothing and a hint was given,
// the hint itself is read; if that read yields no value either, the algorithm is
// re-run without a hint, starting just before the hint.
func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) {
	if hint == NoClue {
		hint = worstHint
	}

	var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance

	errc := make(chan struct{}) // errc will help as an error shortcut signal
	var gerr error              // in case of error, this variable will be set
	var errReported int32       // becomes 1 once a read has published gerr and closed errc

	var step stepFunc // For efficiency, the algorithm step is defined as a closure
	step = func(ctxS context.Context, t uint64, last Epoch) interface{} {
		stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance
		trace(stepID, "init: t=%d, last=%s", t, last.String())
		var valueA, valueB, valueR interface{}

		// initialize the three read contexts
		ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation
		ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path
		ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path

		epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance

		// define the lookAhead function, which will follow the path as if R was successful
		lookAhead := func() {
			valueA = step(ctxA, t, epoch) // launch the next step, recursively.
			if valueA != nil {            // if this path is successful, we don't need R or B.
				cancelB()
				cancelR()
			}
		}

		// define the lookBack function, which will follow the path as if R was unsuccessful
		lookBack := func() {
			if epoch.Base() == last.Base() {
				return
			}
			base := epoch.Base()
			if base == 0 {
				return
			}
			valueB = step(ctxB, base-1, last)
		}

		go func() { //goroutine to read the current epoch (R)
			defer cancelR()
			var err error
			valueR, err = read(ctxR, epoch, now) // read this epoch
			if valueR == nil {                   // if unsuccessful, cancel lookahead, otherwise cancel lookback.
				cancelA()
			} else {
				cancelB()
			}
			if err != nil && err != context.Canceled {
				// Many reads run concurrently and more than one of them may fail.
				// Only the first failure is allowed to publish gerr and close errc:
				// closing an already closed channel panics, and unguarded writes
				// to gerr would race with each other.
				if atomic.CompareAndSwapInt32(&errReported, 0, 1) {
					gerr = err
					close(errc)
				}
			}
		}()

		go func() { // goroutine to give a headstart to R and then launch lookahead.
			defer cancelA()

			// if we are at the lowest level or the epoch to look up equals the last one,
			// then we cannot lookahead (can't go lower or repeat the same lookup, this would
			// cause an infinite loop)
			if epoch.Level == LowestLevel || epoch.Equals(last) {
				return
			}

			// give a head start to R, or launch immediately if R finishes early enough
			select {
			case <-TimeAfter(LongEarthLookaheadDelay):
				lookAhead()
			case <-ctxR.Done():
				if valueR != nil {
					lookAhead() // only look ahead if R was successful
				}
			case <-ctxA.Done():
			}
		}()

		go func() { // goroutine to give a headstart to R and then launch lookback.
			defer cancelB()

			// give a head start to R, or launch immediately if R finishes early enough
			select {
			case <-TimeAfter(LongEarthLookbackDelay):
				lookBack()
			case <-ctxR.Done():
				if valueR == nil {
					lookBack() // only look back in case R failed
				}
			case <-ctxB.Done():
			}
		}()

		// Results are preferred in the order A, R, B: wait for each path in turn
		// and return the first one that produced a value.
		<-ctxA.Done()
		if valueA != nil {
			trace(stepID, "Returning valueA=%v", valueA)
			return valueA
		}

		<-ctxR.Done()
		if valueR != nil {
			trace(stepID, "Returning valueR=%v", valueR)
			return valueR
		}

		<-ctxB.Done()
		trace(stepID, "Returning valueB=%v", valueB)
		return valueB
	}

	var value interface{}
	stepCtx, cancel := context.WithCancel(ctx)

	go func() { // launch the root step in its own goroutine to allow cancellation
		defer cancel()
		value = step(stepCtx, now, hint)
	}()

	// wait for the algorithm to finish, but shortcut in case
	// of errors
	select {
	case <-stepCtx.Done():
	case <-errc:
		cancel()
		return nil, gerr
	}

	if ctx.Err() != nil {
		return nil, ctx.Err()
	}

	if value != nil || hint == worstHint {
		return value, nil
	}

	// at this point the algorithm did not return a value,
	// so we challenge the hint given.
	value, err := read(ctx, hint, now)
	if err != nil {
		return nil, err
	}
	if value != nil {
		return value, nil // hint is valid, return it.
	}

	// hint is invalid. Invoke the algorithm
	// without hint.
	now = hint.Base()
	if hint.Level == HighestLevel {
		now--
	}

	return LongEarthAlgorithm(ctx, now, NoClue, read)
}
|