diff options
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r-- | p2p/discover/table.go | 185 |
1 files changed, 134 insertions, 51 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 33b705a12..dbf86c084 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -14,9 +14,10 @@ import ( ) const ( - alpha = 3 // Kademlia concurrency factor - bucketSize = 16 // Kademlia bucket size - nBuckets = nodeIDBits + 1 // Number of buckets + alpha = 3 // Kademlia concurrency factor + bucketSize = 16 // Kademlia bucket size + nBuckets = nodeIDBits + 1 // Number of buckets + maxBondingPingPongs = 10 ) type Table struct { @@ -24,27 +25,50 @@ type Table struct { buckets [nBuckets]*bucket // index of known nodes by distance nursery []*Node // bootstrap nodes + bondmu sync.Mutex + bonding map[NodeID]*bondproc + bondslots chan struct{} // limits total number of active bonding processes + net transport self *Node // metadata of the local node + db *nodeDB +} + +type bondproc struct { + err error + n *Node + done chan struct{} } // transport is implemented by the UDP transport. // it is an interface so we can test without opening lots of UDP // sockets and without generating a private key. type transport interface { - ping(*Node) error - findnode(e *Node, target NodeID) ([]*Node, error) + ping(NodeID, *net.UDPAddr) error + waitping(NodeID) error + findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error) close() } // bucket contains nodes, ordered by their last activity. +// the entry that was most recently active is the last element +// in entries. type bucket struct { lastLookup time.Time entries []*Node } func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr) *Table { - tab := &Table{net: t, self: newNode(ourID, ourAddr)} + tab := &Table{ + net: t, + db: new(nodeDB), + self: newNode(ourID, ourAddr), + bonding: make(map[NodeID]*bondproc), + bondslots: make(chan struct{}, maxBondingPingPongs), + } + for i := 0; i < cap(tab.bondslots); i++ { + tab.bondslots <- struct{}{} + } for i := range tab.buckets { tab.buckets[i] = new(bucket) } @@ -107,8 +131,8 @@ func (tab *Table) Lookup(target NodeID) []*Node { asked[n.ID] = true pendingQueries++ go func() { - result, _ := tab.net.findnode(n, target) - reply <- result + r, _ := tab.net.findnode(n.ID, n.addr(), target) + reply <- tab.bondall(r) }() } } @@ -116,13 +140,11 @@ func (tab *Table) Lookup(target NodeID) []*Node { // we have asked all closest nodes, stop the search break } - // wait for the next reply for _, n := range <-reply { - cn := n - if !seen[n.ID] { + if n != nil && !seen[n.ID] { seen[n.ID] = true - result.push(cn, bucketSize) + result.push(n, bucketSize) } } pendingQueries-- @@ -145,8 +167,9 @@ func (tab *Table) refresh() { result := tab.Lookup(randomID(tab.self.ID, ld)) if len(result) == 0 { // bootstrap the table with a self lookup + all := tab.bondall(tab.nursery) tab.mutex.Lock() - tab.add(tab.nursery) + tab.add(all) tab.mutex.Unlock() tab.Lookup(tab.self.ID) // TODO: the Kademlia paper says that we're supposed to perform @@ -176,45 +199,105 @@ func (tab *Table) len() (n int) { return n } -// bumpOrAdd updates the activity timestamp for the given node and -// attempts to insert the node into a bucket. The returned Node might -// not be part of the table. The caller must hold tab.mutex. -func (tab *Table) bumpOrAdd(node NodeID, from *net.UDPAddr) (n *Node) { - b := tab.buckets[logdist(tab.self.ID, node)] - if n = b.bump(node); n == nil { - n = newNode(node, from) - if len(b.entries) == bucketSize { - tab.pingReplace(n, b) - } else { - b.entries = append(b.entries, n) +// bondall bonds with all given nodes concurrently and returns +// those nodes for which bonding has probably succeeded. +func (tab *Table) bondall(nodes []*Node) (result []*Node) { + rc := make(chan *Node, len(nodes)) + for i := range nodes { + go func(n *Node) { + nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCPPort)) + rc <- nn + }(nodes[i]) + } + for _ = range nodes { + if n := <-rc; n != nil { + result = append(result, n) } } - return n + return result } -func (tab *Table) pingReplace(n *Node, b *bucket) { - old := b.entries[bucketSize-1] - go func() { - if err := tab.net.ping(old); err == nil { - // it responded, we don't need to replace it. - return +// bond ensures the local node has a bond with the given remote node. +// It also attempts to insert the node into the table if bonding succeeds. +// The caller must not hold tab.mutex. +// +// A bond is must be established before sending findnode requests. +// Both sides must have completed a ping/pong exchange for a bond to +// exist. The total number of active bonding processes is limited in +// order to restrain network use. +// +// bond is meant to operate idempotently in that bonding with a remote +// node which still remembers a previously established bond will work. +// The remote node will simply not send a ping back, causing waitping +// to time out. +// +// If pinged is true, the remote node has just pinged us and one half +// of the process can be skipped. +func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { + var n *Node + if n = tab.db.get(id); n == nil { + tab.bondmu.Lock() + w := tab.bonding[id] + if w != nil { + // Wait for an existing bonding process to complete. + tab.bondmu.Unlock() + <-w.done + } else { + // Register a new bonding process. + w = &bondproc{done: make(chan struct{})} + tab.bonding[id] = w + tab.bondmu.Unlock() + // Do the ping/pong. The result goes into w. + tab.pingpong(w, pinged, id, addr, tcpPort) + // Unregister the process after it's done. + tab.bondmu.Lock() + delete(tab.bonding, id) + tab.bondmu.Unlock() } - // it didn't respond, replace the node if it is still the oldest node. - tab.mutex.Lock() - if len(b.entries) > 0 && b.entries[len(b.entries)-1] == old { - // slide down other entries and put the new one in front. - // TODO: insert in correct position to keep the order - copy(b.entries[1:], b.entries) - b.entries[0] = n + n = w.n + if w.err != nil { + return nil, w.err } - tab.mutex.Unlock() - }() + } + tab.mutex.Lock() + defer tab.mutex.Unlock() + if b := tab.buckets[logdist(tab.self.ID, n.ID)]; !b.bump(n) { + tab.pingreplace(n, b) + } + return n, nil +} + +func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) { + <-tab.bondslots + defer func() { tab.bondslots <- struct{}{} }() + if w.err = tab.net.ping(id, addr); w.err != nil { + close(w.done) + return + } + if !pinged { + // Give the remote node a chance to ping us before we start + // sending findnode requests. If they still remember us, + // waitping will simply time out. + tab.net.waitping(id) + } + w.n = tab.db.add(id, addr, tcpPort) + close(w.done) } -// bump updates the activity timestamp for the given node. -// The caller must hold tab.mutex. -func (tab *Table) bump(node NodeID) { - tab.buckets[logdist(tab.self.ID, node)].bump(node) +func (tab *Table) pingreplace(new *Node, b *bucket) { + if len(b.entries) == bucketSize { + oldest := b.entries[bucketSize-1] + if err := tab.net.ping(oldest.ID, oldest.addr()); err == nil { + // The node responded, we don't need to replace it. + return + } + } else { + // Add a slot at the end so the last entry doesn't + // fall off when adding the new node. + b.entries = append(b.entries, nil) + } + copy(b.entries[1:], b.entries) + b.entries[0] = new } // add puts the entries into the table if their corresponding @@ -240,17 +323,17 @@ outer: } } -func (b *bucket) bump(id NodeID) *Node { - for i, n := range b.entries { - if n.ID == id { - n.active = time.Now() +func (b *bucket) bump(n *Node) bool { + for i := range b.entries { + if b.entries[i].ID == n.ID { + n.bumpActive() // move it to the front - copy(b.entries[1:], b.entries[:i+1]) + copy(b.entries[1:], b.entries[:i]) b.entries[0] = n - return n + return true } } - return nil + return false } // nodesByDistance is a list of nodes, ordered by |