diff options
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r-- | p2p/discover/table.go | 105 |
1 files changed, 68 insertions, 37 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index efa6e8eea..abb7980f8 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -25,6 +25,7 @@ package discover import ( "crypto/rand" "encoding/binary" + "fmt" "net" "sort" "sync" @@ -56,7 +57,7 @@ type Table struct { nursery []*Node // bootstrap nodes db *nodeDB // database of known nodes - refreshReq chan struct{} + refreshReq chan chan struct{} closeReq chan struct{} closed chan struct{} @@ -102,7 +103,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string self: NewNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), bonding: make(map[NodeID]*bondproc), bondslots: make(chan struct{}, maxBondingPingPongs), - refreshReq: make(chan struct{}), + refreshReq: make(chan chan struct{}), closeReq: make(chan struct{}), closed: make(chan struct{}), } @@ -179,21 +180,27 @@ func (tab *Table) Close() { } } -// Bootstrap sets the bootstrap nodes. These nodes are used to connect -// to the network if the table is empty. Bootstrap will also attempt to -// fill the table by performing random lookup operations on the -// network. -func (tab *Table) Bootstrap(nodes []*Node) { +// SetFallbackNodes sets the initial points of contact. These nodes +// are used to connect to the network if the table is empty and there +// are no known nodes in the database. +func (tab *Table) SetFallbackNodes(nodes []*Node) error { + for _, n := range nodes { + if err := n.validateComplete(); err != nil { + return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err) + } + } tab.mutex.Lock() - // TODO: maybe filter nodes with bad fields (nil, etc.) to avoid strange crashes tab.nursery = make([]*Node, 0, len(nodes)) for _, n := range nodes { cpy := *n + // Recompute cpy.sha because the node might not have been + // created by NewNode or ParseNode. cpy.sha = crypto.Sha3Hash(n.ID[:]) tab.nursery = append(tab.nursery, &cpy) } tab.mutex.Unlock() - tab.requestRefresh() + tab.refresh() + return nil } // Resolve searches for a specific node with the given ID. @@ -224,26 +231,36 @@ func (tab *Table) Resolve(targetID NodeID) *Node { // The given target does not need to be an actual node // identifier. func (tab *Table) Lookup(targetID NodeID) []*Node { + return tab.lookup(targetID, true) +} + +func (tab *Table) lookup(targetID NodeID, refreshIfEmpty bool) []*Node { var ( target = crypto.Sha3Hash(targetID[:]) asked = make(map[NodeID]bool) seen = make(map[NodeID]bool) reply = make(chan []*Node, alpha) pendingQueries = 0 + result *nodesByDistance ) // don't query further if we hit ourself. // unlikely to happen often in practice. asked[tab.self.ID] = true - tab.mutex.Lock() - // generate initial result set - result := tab.closest(target, bucketSize) - tab.mutex.Unlock() - - // If the result set is empty, all nodes were dropped, refresh. - if len(result.entries) == 0 { - tab.requestRefresh() - return nil + for { + tab.mutex.Lock() + // generate initial result set + result = tab.closest(target, bucketSize) + tab.mutex.Unlock() + if len(result.entries) > 0 || !refreshIfEmpty { + break + } + // The result set is empty, all nodes were dropped, refresh. + // We actually wait for the refresh to complete here. The very + // first query will hit this case and run the bootstrapping + // logic. + <-tab.refresh() + refreshIfEmpty = false } for { @@ -287,24 +304,24 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { return result.entries } -func (tab *Table) requestRefresh() { +func (tab *Table) refresh() <-chan struct{} { + done := make(chan struct{}) select { - case tab.refreshReq <- struct{}{}: + case tab.refreshReq <- done: case <-tab.closed: + close(done) } + return done } +// refreshLoop schedules doRefresh runs and coordinates shutdown. func (tab *Table) refreshLoop() { - defer func() { - tab.db.close() - if tab.net != nil { - tab.net.close() - } - close(tab.closed) - }() - - timer := time.NewTicker(autoRefreshInterval) - var done chan struct{} + var ( + timer = time.NewTicker(autoRefreshInterval) + waiting []chan struct{} // accumulates waiting callers while doRefresh runs + done chan struct{} // where doRefresh reports completion + ) +loop: for { select { case <-timer.C: @@ -312,20 +329,34 @@ func (tab *Table) refreshLoop() { done = make(chan struct{}) go tab.doRefresh(done) } - case <-tab.refreshReq: + case req := <-tab.refreshReq: + waiting = append(waiting, req) if done == nil { done = make(chan struct{}) go tab.doRefresh(done) } case <-done: + for _, ch := range waiting { + close(ch) + } + waiting = nil done = nil case <-tab.closeReq: - if done != nil { - <-done - } - return + break loop } } + + if tab.net != nil { + tab.net.close() + } + if done != nil { + <-done + } + for _, ch := range waiting { + close(ch) + } + tab.db.close() + close(tab.closed) } // doRefresh performs a lookup for a random target to keep buckets @@ -342,7 +373,7 @@ func (tab *Table) doRefresh(done chan struct{}) { // We perform a lookup with a random target instead. var target NodeID rand.Read(target[:]) - result := tab.Lookup(target) + result := tab.lookup(target, false) if len(result) > 0 { return } @@ -366,7 +397,7 @@ func (tab *Table) doRefresh(done chan struct{}) { tab.mutex.Unlock() // Finally, do a self lookup to fill up the buckets. - tab.Lookup(tab.self.ID) + tab.lookup(tab.self.ID, false) } // closest returns the n nodes in the table that are closest to the |