aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/kademlia/kaddb.go
blob: cb0869467e808ffd0b4c89bc6ca058ae8b58f677 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package kademlia

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "os"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/log"
)

type NodeData interface {
    json.Marshaler
    json.Unmarshaler
}

// allow inactive peers under
type NodeRecord struct {
    Addr  Address          // address of node
    Url   string           // Url, used to connect to node
    After time.Time        // next call after time
    Seen  time.Time        // last connected at time
    Meta  *json.RawMessage // arbitrary metadata saved for a peer

    node Node
}

func (self *NodeRecord) setSeen() {
    t := time.Now()
    self.Seen = t
    self.After = t
}

func (self *NodeRecord) String() string {
    return fmt.Sprintf("<%v>", self.Addr)
}

// persisted node record database ()
type KadDb struct {
    Address              Address
    Nodes                [][]*NodeRecord
    index                map[Address]*NodeRecord
    cursors              []int
    lock                 sync.RWMutex
    purgeInterval        time.Duration
    initialRetryInterval time.Duration
    connRetryExp         int
}

func newKadDb(addr Address, params *KadParams) *KadDb {
    return &KadDb{
        Address:              addr,
        Nodes:                make([][]*NodeRecord, params.MaxProx+1), // overwritten by load
        cursors:              make([]int, params.MaxProx+1),
        index:                make(map[Address]*NodeRecord),
        purgeInterval:        params.PurgeInterval,
        initialRetryInterval: params.InitialRetryInterval,
        connRetryExp:         params.ConnRetryExp,
    }
}

func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
    defer self.lock.Unlock()
    self.lock.Lock()

    record, found := self.index[a]
    if !found {
        record = &NodeRecord{
            Addr: a,
            Url:  url,
        }
        log.Info(fmt.Sprintf("add new record %v to kaddb", record))
        // insert in kaddb
        self.index[a] = record
        self.Nodes[index] = append(self.Nodes[index], record)
    } else {
        log.Info(fmt.Sprintf("found record %v in kaddb", record))
    }
    // update last seen time
    record.setSeen()
    // update with url in case IP/port changes
    record.Url = url
    return record
}

// add adds node records to kaddb (persisted node record db)
func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
    defer self.lock.Unlock()
    self.lock.Lock()
    var n int
    var nodes []*NodeRecord
    for _, node := range nrs {
        _, found := self.index[node.Addr]
        if !found && node.Addr != self.Address {
            node.setSeen()
            self.index[node.Addr] = node
            index := proximityBin(node.Addr)
            dbcursor := self.cursors[index]
            nodes = self.Nodes[index]
            // this is inefficient for allocation, need to just append then shift
            newnodes := make([]*NodeRecord, len(nodes)+1)
            copy(newnodes[:], nodes[:dbcursor])
            newnodes[dbcursor] = node
            copy(newnodes[dbcursor+1:], nodes[dbcursor:])
            log.Trace(fmt.Sprintf("new nodes: %v, nodes: %v", newnodes, nodes))
            self.Nodes[index] = newnodes
            n++
        }
    }
    if n > 0 {
        log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs)))
    }
}

/*
next return one node record with the highest priority for desired
connection.
This is used to pick candidates for live nodes that are most wanted for
a higly connected low centrality network structure for Swarm which best suits
for a Kademlia-style routing.

* Starting as naive node with empty db, this implements Kademlia bootstrapping
* As a mature node, it fills short lines. All on demand.

The candidate is chosen using the following strategy:
We check for missing online nodes in the buckets for 1 upto Max BucketSize rounds.
On each round we proceed from the low to high proximity order buckets.
If the number of active nodes (=connected peers) is < rounds, then start looking
for a known candidate. To determine if there is a candidate to recommend the
kaddb node record database row corresponding to the bucket is checked.

If the row cursor is on position i, the ith element in the row is chosen.
If the record is scheduled not to be retried before NOW, the next element is taken.
If the record is scheduled to be retried, it is set as checked, scheduled for
checking and is returned. The time of the next check is in X (duration) such that
X = ConnRetryExp * delta where delta is the time past since the last check and
ConnRetryExp is constant obsoletion factor. (Note that when node records are added
from peer messages, they are marked as checked and placed at the cursor, ie.
given priority over older entries). Entries which were checked more than
purgeInterval ago are deleted from the kaddb row. If no candidate is found after
a full round of checking the next bucket up is considered. If no candidate is
found when we reach the maximum-proximity bucket, the next round starts.

node record a is more favoured to b a > b iff a is a passive node (record of
offline past peer)
|proxBin(a)| < |proxBin(b)|
|| (proxBin(a) < proxBin(b) && |proxBin(a)| == |proxBin(b)|)
|| (proxBin(a) == proxBin(b) && lastChecked(a) < lastChecked(b))


The second argument returned names the first missing slot found
*/
func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRecord, need bool, proxLimit int) {
    // return nil, proxLimit indicates that all buckets are filled
    defer self.lock.Unlock()
    self.lock.Lock()

    var interval time.Duration
    var found bool
    var purge []bool
    var delta time.Duration
    var cursor int
    var count int
    var after time.Time

    // iterate over columns maximum bucketsize times
    for rounds := 1; rounds <= maxBinSize; rounds++ {
    ROUND:
        // iterate over rows from PO 0 upto MaxProx
        for po, dbrow := range self.Nodes {
            // if row has rounds connected peers, then take the next
            if binSize(po) >= rounds {
                continue ROUND
            }
            if !need {
                // set proxlimit to the PO where the first missing slot is found
                proxLimit = po
                need = true
            }
            purge = make([]bool, len(dbrow))

            // there is a missing slot - finding a node to connect to
            // select a node record from the relavant kaddb row (of identical prox order)
        ROW:
            for cursor = self.cursors[po]; !found && count < len(dbrow); cursor = (cursor + 1) % len(dbrow) {
                count++
                node = dbrow[cursor]

                // skip already connected nodes
                if node.node != nil {
                    log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)))
                    continue ROW
                }

                // if node is scheduled to connect
                if node.After.After(time.Now()) {
                    log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
                    continue ROW
                }

                delta = time.Since(node.Seen)
                if delta < self.initialRetryInterval {
                    delta = self.initialRetryInterval
                }
                if delta > self.purgeInterval {
                    // remove node
                    purge[cursor] = true
                    log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen))
                    continue ROW
                }

                log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))

                // scheduling next check
                interval = delta * time.Duration(self.connRetryExp)
                after = time.Now().Add(interval)

                log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval))
                node.After = after
                found = true
            } // ROW
            self.cursors[po] = cursor
            self.delete(po, purge)
            if found {
                return node, need, proxLimit
            }
        } // ROUND
    } // ROUNDS

    return nil, need, proxLimit
}

// deletes the noderecords of a kaddb row corresponding to the indexes
// caller must hold the dblock
// the call is unsafe, no index checks
func (self *KadDb) delete(row int, purge []bool) {
    var nodes []*NodeRecord
    dbrow := self.Nodes[row]
    for i, del := range purge {
        if i == self.cursors[row] {
            //reset cursor
            self.cursors[row] = len(nodes)
        }
        // delete the entry to be purged
        if del {
            delete(self.index, dbrow[i].Addr)
            continue
        }
        // otherwise append to new list
        nodes = append(nodes, dbrow[i])
    }
    self.Nodes[row] = nodes
}

// save persists kaddb on disk (written to file on path in json format.
func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
    defer self.lock.Unlock()
    self.lock.Lock()

    var n int

    for _, b := range self.Nodes {
        for _, node := range b {
            n++
            node.After = time.Now()
            node.Seen = time.Now()
            if cb != nil {
                cb(node, node.node)
            }
        }
    }

    data, err := json.MarshalIndent(self, "", " ")
    if err != nil {
        return err
    }
    err = ioutil.WriteFile(path, data, os.ModePerm)
    if err != nil {
        log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: %v", n, path, err))
    } else {
        log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path))
    }
    return err
}

// Load(path) loads the node record database (kaddb) from file on path.
func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err error) {
    defer self.lock.Unlock()
    self.lock.Lock()

    var data []byte
    data, err = ioutil.ReadFile(path)
    if err != nil {
        return
    }

    err = json.Unmarshal(data, self)
    if err != nil {
        return
    }
    var n int
    var purge []bool
    for po, b := range self.Nodes {
        purge = make([]bool, len(b))
    ROW:
        for i, node := range b {
            if cb != nil {
                err = cb(node, node.node)
                if err != nil {
                    purge[i] = true
                    continue ROW
                }
            }
            n++
            if (node.After == time.Time{}) {
                node.After = time.Now()
            }
            self.index[node.Addr] = node
        }
        self.delete(po, purge)
    }
    log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path))

    return
}

// accessor for KAD offline db count
func (self *KadDb) count() int {
    defer self.lock.Unlock()
    self.lock.Lock()
    return len(self.index)
}