diff options
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r-- | p2p/discover/table.go | 161 |
1 files changed, 105 insertions, 56 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 972bc1077..66afa52ea 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -44,6 +44,10 @@ const ( maxBondingPingPongs = 16 maxFindnodeFailures = 5 + + autoRefreshInterval = 1 * time.Hour + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) type Table struct { @@ -52,6 +56,10 @@ type Table struct { nursery []*Node // bootstrap nodes db *nodeDB // database of known nodes + refreshReq chan struct{} + closeReq chan struct{} + closed chan struct{} + bondmu sync.Mutex bonding map[NodeID]*bondproc bondslots chan struct{} // limits total number of active bonding processes @@ -93,11 +101,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string db, _ = newNodeDB("", Version, ourID) } tab := &Table{ - net: t, - db: db, - self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), - bonding: make(map[NodeID]*bondproc), - bondslots: make(chan struct{}, maxBondingPingPongs), + net: t, + db: db, + self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), + bonding: make(map[NodeID]*bondproc), + bondslots: make(chan struct{}, maxBondingPingPongs), + refreshReq: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), } for i := 0; i < cap(tab.bondslots); i++ { tab.bondslots <- struct{}{} @@ -105,6 +116,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string for i := range tab.buckets { tab.buckets[i] = new(bucket) } + go tab.refreshLoop() return tab } @@ -163,10 +175,12 @@ func randUint(max uint32) uint32 { // Close terminates the network listener and flushes the node database. func (tab *Table) Close() { - if tab.net != nil { - tab.net.close() + select { + case <-tab.closed: + // already closed. + case tab.closeReq <- struct{}{}: + <-tab.closed // wait for refreshLoop to end. } - tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -183,7 +197,7 @@ func (tab *Table) Bootstrap(nodes []*Node) { tab.nursery = append(tab.nursery, &cpy) } tab.mutex.Unlock() - tab.refresh() + tab.requestRefresh() } // Lookup performs a network search for nodes close @@ -210,9 +224,9 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { result := tab.closest(target, bucketSize) tab.mutex.Unlock() - // If the result set is empty, all nodes were dropped, refresh + // If the result set is empty, all nodes were dropped, refresh. if len(result.entries) == 0 { - tab.refresh() + tab.requestRefresh() return nil } @@ -257,56 +271,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { return result.entries } -// refresh performs a lookup for a random target to keep buckets full, or seeds -// the table if it is empty (initial bootstrap or discarded faulty peers). -func (tab *Table) refresh() { - seed := true +func (tab *Table) requestRefresh() { + select { + case tab.refreshReq <- struct{}{}: + case <-tab.closed: + } +} - // If the discovery table is empty, seed with previously known nodes - tab.mutex.Lock() - for _, bucket := range tab.buckets { - if len(bucket.entries) > 0 { - seed = false - break +func (tab *Table) refreshLoop() { + defer func() { + tab.db.close() + if tab.net != nil { + tab.net.close() } - } - tab.mutex.Unlock() + close(tab.closed) + }() - // If the table is not empty, try to refresh using the live entries - if !seed { - // The Kademlia paper specifies that the bucket refresh should - // perform a refresh in the least recently used bucket. We cannot - // adhere to this because the findnode target is a 512bit value - // (not hash-sized) and it is not easily possible to generate a - // sha3 preimage that falls into a chosen bucket. - // - // We perform a lookup with a random target instead. - var target NodeID - rand.Read(target[:]) - - result := tab.Lookup(target) - if len(result) == 0 { - // Lookup failed, seed after all - seed = true + timer := time.NewTicker(autoRefreshInterval) + var done chan struct{} + for { + select { + case <-timer.C: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-tab.refreshReq: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-done: + done = nil + case <-tab.closeReq: + if done != nil { + <-done + } + return } } +} - if seed { - // Pick a batch of previously know seeds to lookup with - seeds := tab.db.querySeeds(10) - for _, seed := range seeds { - glog.V(logger.Debug).Infoln("Seeding network with", seed) - } - nodes := append(tab.nursery, seeds...) +// doRefresh performs a lookup for a random target to keep buckets +// full. seed nodes are inserted if the table is empty (initial +// bootstrap or discarded faulty peers). +func (tab *Table) doRefresh(done chan struct{}) { + defer close(done) + + // The Kademlia paper specifies that the bucket refresh should + // perform a lookup in the least recently used bucket. We cannot + // adhere to this because the findnode target is a 512bit value + // (not hash-sized) and it is not easily possible to generate a + // sha3 preimage that falls into a chosen bucket. + // We perform a lookup with a random target instead. + var target NodeID + rand.Read(target[:]) + result := tab.Lookup(target) + if len(result) > 0 { + return + } - // Bond with all the seed nodes (will pingpong only if failed recently) - bonded := tab.bondall(nodes) - if len(bonded) > 0 { - tab.Lookup(tab.self.ID) + // The table is empty. Load nodes from the database and insert + // them. This should yield a few previously seen nodes that are + // (hopefully) still alive. + seeds := tab.db.querySeeds(seedCount, seedMaxAge) + seeds = tab.bondall(append(seeds, tab.nursery...)) + if glog.V(logger.Debug) { + if len(seeds) == 0 { + glog.Infof("no seed nodes found") + } + for _, n := range seeds { + age := time.Since(tab.db.lastPong(n.ID)) + glog.Infof("seed node (age %v): %v", age, n) } - // TODO: the Kademlia paper says that we're supposed to perform - // random lookups in all buckets further away than our closest neighbor. } + tab.mutex.Lock() + tab.stuff(seeds) + tab.mutex.Unlock() + + // Finally, do a self lookup to fill up the buckets. + tab.Lookup(tab.self.ID) } // closest returns the n nodes in the table that are closest to the @@ -373,8 +417,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 } // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch var result error - if node == nil || fails > 0 { - glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails) + age := time.Since(tab.db.lastPong(id)) + if node == nil || fails > 0 || age > nodeDBNodeExpiration { + glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age) tab.bondmu.Lock() w := tab.bonding[id] @@ -435,13 +480,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // ping a remote endpoint and wait for a reply, also updating the node // database accordingly. func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { - // Update the last ping and send the message tab.db.updateLastPing(id, time.Now()) if err := tab.net.ping(id, addr); err != nil { return err } - // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) + + // Start the background expiration goroutine after the first + // successful communication. Subsequent calls have no effect if it + // is already running. We do this here instead of somewhere else + // so that the search for seed nodes also considers older nodes + // that would otherwise be removed by the expiration. tab.db.ensureExpirer() return nil } |