aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/storage/feed/lookup/algorithm_longearth.go
blob: d0342f67cb20db37b96f90b4d8b68e44693c19e6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package lookup

import (
    "context"
    "sync/atomic"
    "time"
)

type stepFunc func(ctx context.Context, t uint64, hint Epoch) interface{}

// LongEarthLookaheadDelay is the headstart the lookahead gives R before it launches
var LongEarthLookaheadDelay = 250 * time.Millisecond

// LongEarthLookbackDelay is the headstart the lookback gives R before it launches
var LongEarthLookbackDelay = 250 * time.Millisecond

// LongEarthAlgorithm explores possible lookup paths in parallel, pruning paths as soon
// as a more promising lookup path is found. As a result, this lookup algorithm is an order
// of magnitude faster than the FluzCapacitor algorithm, but at the expense of more exploratory reads.
// This algorithm works as follows. On each step, the next epoch is immediately looked up (R)
// and given a head start, while two parallel "steps" are launched a short time after:
// look ahead (A) is the path the algorithm would take if the R lookup returns a value, whereas
// look back (B) is the path the algorithm would take if the R lookup failed.
// as soon as R is actually finished, the A or B paths are pruned depending on the value of R.
// if A returns earlier than R, then R and B read operations can be safely canceled, saving time.
// The maximum number of active read operations is calculated as 2^(timeout/headstart).
// If headstart is infinite, this algorithm behaves as FluzCapacitor.
// timeout is the maximum execution time of the passed `read` function.
// the two head starts can be configured by changing LongEarthLookaheadDelay or LongEarthLookbackDelay
func LongEarthAlgorithm(ctx context.Context, now uint64, hint Epoch, read ReadFunc) (interface{}, error) {
    if hint == NoClue {
        hint = worstHint
    }

    var stepCounter int32 // for debugging, stepCounter allows to give an ID to each step instance

    errc := make(chan struct{}) // errc will help as an error shortcut signal
    var gerr error              // in case of error, this variable will be set

    var step stepFunc // For efficiency, the algorithm step is defined as a closure
    step = func(ctxS context.Context, t uint64, last Epoch) interface{} {
        stepID := atomic.AddInt32(&stepCounter, 1) // give an ID to this call instance
        trace(stepID, "init: t=%d, last=%s", t, last.String())
        var valueA, valueB, valueR interface{}

        // initialize the three read contexts
        ctxR, cancelR := context.WithCancel(ctxS) // will handle the current read operation
        ctxA, cancelA := context.WithCancel(ctxS) // will handle the lookahead path
        ctxB, cancelB := context.WithCancel(ctxS) // will handle the lookback path

        epoch := GetNextEpoch(last, t) // calculate the epoch to look up in this step instance

        // define the lookAhead function, which will follow the path as if R was successful
        lookAhead := func() {
            valueA = step(ctxA, t, epoch) // launch the next step, recursively.
            if valueA != nil {            // if this path is successful, we don't need R or B.
                cancelB()
                cancelR()
            }
        }

        // define the lookBack function, which will follow the path as if R was unsuccessful
        lookBack := func() {
            if epoch.Base() == last.Base() {
                return
            }
            base := epoch.Base()
            if base == 0 {
                return
            }
            valueB = step(ctxB, base-1, last)
        }

        go func() { //goroutine to read the current epoch (R)
            defer cancelR()
            var err error
            valueR, err = read(ctxR, epoch, now) // read this epoch
            if valueR == nil {                   // if unsuccessful, cancel lookahead, otherwise cancel lookback.
                cancelA()
            } else {
                cancelB()
            }
            if err != nil && err != context.Canceled {
                gerr = err
                close(errc)
            }
        }()

        go func() { // goroutine to give a headstart to R and then launch lookahead.
            defer cancelA()

            // if we are at the lowest level or the epoch to look up equals the last one,
            // then we cannot lookahead (can't go lower or repeat the same lookup, this would
            // cause an infinite loop)
            if epoch.Level == LowestLevel || epoch.Equals(last) {
                return
            }

            // give a head start to R, or launch immediately if R finishes early enough
            select {
            case <-TimeAfter(LongEarthLookaheadDelay):
                lookAhead()
            case <-ctxR.Done():
                if valueR != nil {
                    lookAhead() // only look ahead if R was successful
                }
            case <-ctxA.Done():
            }
        }()

        go func() { // goroutine to give a headstart to R and then launch lookback.
            defer cancelB()

            // give a head start to R, or launch immediately if R finishes early enough
            select {
            case <-TimeAfter(LongEarthLookbackDelay):
                lookBack()
            case <-ctxR.Done():
                if valueR == nil {
                    lookBack() // only look back in case R failed
                }
            case <-ctxB.Done():
            }
        }()

        <-ctxA.Done()
        if valueA != nil {
            trace(stepID, "Returning valueA=%v", valueA)
            return valueA
        }

        <-ctxR.Done()
        if valueR != nil {
            trace(stepID, "Returning valueR=%v", valueR)
            return valueR
        }
        <-ctxB.Done()
        trace(stepID, "Returning valueB=%v", valueB)
        return valueB
    }

    var value interface{}
    stepCtx, cancel := context.WithCancel(ctx)

    go func() { // launch the root step in its own goroutine to allow cancellation
        defer cancel()
        value = step(stepCtx, now, hint)
    }()

    // wait for the algorithm to finish, but shortcut in case
    // of errors
    select {
    case <-stepCtx.Done():
    case <-errc:
        cancel()
        return nil, gerr
    }

    if ctx.Err() != nil {
        return nil, ctx.Err()
    }

    if value != nil || hint == worstHint {
        return value, nil
    }

    // at this point the algorithm did not return a value,
    // so we challenge the hint given.
    value, err := read(ctx, hint, now)
    if err != nil {
        return nil, err
    }
    if value != nil {
        return value, nil // hint is valid, return it.
    }

    // hint is invalid. Invoke the algorithm
    // without hint.
    now = hint.Base()
    if hint.Level == HighestLevel {
        now--
    }

    return LongEarthAlgorithm(ctx, now, NoClue, read)
}