aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover/table.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/discover/table.go')
-rw-r--r--p2p/discover/table.go185
1 files changed, 134 insertions, 51 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 33b705a12..dbf86c084 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -14,9 +14,10 @@ import (
)
const (
- alpha = 3 // Kademlia concurrency factor
- bucketSize = 16 // Kademlia bucket size
- nBuckets = nodeIDBits + 1 // Number of buckets
+ alpha = 3 // Kademlia concurrency factor
+ bucketSize = 16 // Kademlia bucket size
+ nBuckets = nodeIDBits + 1 // Number of buckets
+ maxBondingPingPongs = 10
)
type Table struct {
@@ -24,27 +25,50 @@ type Table struct {
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
+ bondmu sync.Mutex
+ bonding map[NodeID]*bondproc
+ bondslots chan struct{} // limits total number of active bonding processes
+
net transport
self *Node // metadata of the local node
+ db *nodeDB
+}
+
+type bondproc struct {
+ err error
+ n *Node
+ done chan struct{}
}
// transport is implemented by the UDP transport.
// it is an interface so we can test without opening lots of UDP
// sockets and without generating a private key.
type transport interface {
- ping(*Node) error
- findnode(e *Node, target NodeID) ([]*Node, error)
+ ping(NodeID, *net.UDPAddr) error
+ waitping(NodeID) error
+ findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error)
close()
}
// bucket contains nodes, ordered by their last activity.
+// the entry that was most recently active is the last element
+// in entries.
type bucket struct {
lastLookup time.Time
entries []*Node
}
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr) *Table {
- tab := &Table{net: t, self: newNode(ourID, ourAddr)}
+ tab := &Table{
+ net: t,
+ db: new(nodeDB),
+ self: newNode(ourID, ourAddr),
+ bonding: make(map[NodeID]*bondproc),
+ bondslots: make(chan struct{}, maxBondingPingPongs),
+ }
+ for i := 0; i < cap(tab.bondslots); i++ {
+ tab.bondslots <- struct{}{}
+ }
for i := range tab.buckets {
tab.buckets[i] = new(bucket)
}
@@ -107,8 +131,8 @@ func (tab *Table) Lookup(target NodeID) []*Node {
asked[n.ID] = true
pendingQueries++
go func() {
- result, _ := tab.net.findnode(n, target)
- reply <- result
+ r, _ := tab.net.findnode(n.ID, n.addr(), target)
+ reply <- tab.bondall(r)
}()
}
}
@@ -116,13 +140,11 @@ func (tab *Table) Lookup(target NodeID) []*Node {
// we have asked all closest nodes, stop the search
break
}
-
// wait for the next reply
for _, n := range <-reply {
- cn := n
- if !seen[n.ID] {
+ if n != nil && !seen[n.ID] {
seen[n.ID] = true
- result.push(cn, bucketSize)
+ result.push(n, bucketSize)
}
}
pendingQueries--
@@ -145,8 +167,9 @@ func (tab *Table) refresh() {
result := tab.Lookup(randomID(tab.self.ID, ld))
if len(result) == 0 {
// bootstrap the table with a self lookup
+ all := tab.bondall(tab.nursery)
tab.mutex.Lock()
- tab.add(tab.nursery)
+ tab.add(all)
tab.mutex.Unlock()
tab.Lookup(tab.self.ID)
// TODO: the Kademlia paper says that we're supposed to perform
@@ -176,45 +199,105 @@ func (tab *Table) len() (n int) {
return n
}
-// bumpOrAdd updates the activity timestamp for the given node and
-// attempts to insert the node into a bucket. The returned Node might
-// not be part of the table. The caller must hold tab.mutex.
-func (tab *Table) bumpOrAdd(node NodeID, from *net.UDPAddr) (n *Node) {
- b := tab.buckets[logdist(tab.self.ID, node)]
- if n = b.bump(node); n == nil {
- n = newNode(node, from)
- if len(b.entries) == bucketSize {
- tab.pingReplace(n, b)
- } else {
- b.entries = append(b.entries, n)
+// bondall bonds with all given nodes concurrently and returns
+// those nodes for which bonding has probably succeeded.
+func (tab *Table) bondall(nodes []*Node) (result []*Node) {
+ rc := make(chan *Node, len(nodes))
+ for i := range nodes {
+ go func(n *Node) {
+ nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCPPort))
+ rc <- nn
+ }(nodes[i])
+ }
+ for _ = range nodes {
+ if n := <-rc; n != nil {
+ result = append(result, n)
}
}
- return n
+ return result
}
-func (tab *Table) pingReplace(n *Node, b *bucket) {
- old := b.entries[bucketSize-1]
- go func() {
- if err := tab.net.ping(old); err == nil {
- // it responded, we don't need to replace it.
- return
+// bond ensures the local node has a bond with the given remote node.
+// It also attempts to insert the node into the table if bonding succeeds.
+// The caller must not hold tab.mutex.
+//
+// A bond is must be established before sending findnode requests.
+// Both sides must have completed a ping/pong exchange for a bond to
+// exist. The total number of active bonding processes is limited in
+// order to restrain network use.
+//
+// bond is meant to operate idempotently in that bonding with a remote
+// node which still remembers a previously established bond will work.
+// The remote node will simply not send a ping back, causing waitping
+// to time out.
+//
+// If pinged is true, the remote node has just pinged us and one half
+// of the process can be skipped.
+func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
+ var n *Node
+ if n = tab.db.get(id); n == nil {
+ tab.bondmu.Lock()
+ w := tab.bonding[id]
+ if w != nil {
+ // Wait for an existing bonding process to complete.
+ tab.bondmu.Unlock()
+ <-w.done
+ } else {
+ // Register a new bonding process.
+ w = &bondproc{done: make(chan struct{})}
+ tab.bonding[id] = w
+ tab.bondmu.Unlock()
+ // Do the ping/pong. The result goes into w.
+ tab.pingpong(w, pinged, id, addr, tcpPort)
+ // Unregister the process after it's done.
+ tab.bondmu.Lock()
+ delete(tab.bonding, id)
+ tab.bondmu.Unlock()
}
- // it didn't respond, replace the node if it is still the oldest node.
- tab.mutex.Lock()
- if len(b.entries) > 0 && b.entries[len(b.entries)-1] == old {
- // slide down other entries and put the new one in front.
- // TODO: insert in correct position to keep the order
- copy(b.entries[1:], b.entries)
- b.entries[0] = n
+ n = w.n
+ if w.err != nil {
+ return nil, w.err
}
- tab.mutex.Unlock()
- }()
+ }
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+ if b := tab.buckets[logdist(tab.self.ID, n.ID)]; !b.bump(n) {
+ tab.pingreplace(n, b)
+ }
+ return n, nil
+}
+
+func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
+ <-tab.bondslots
+ defer func() { tab.bondslots <- struct{}{} }()
+ if w.err = tab.net.ping(id, addr); w.err != nil {
+ close(w.done)
+ return
+ }
+ if !pinged {
+ // Give the remote node a chance to ping us before we start
+ // sending findnode requests. If they still remember us,
+ // waitping will simply time out.
+ tab.net.waitping(id)
+ }
+ w.n = tab.db.add(id, addr, tcpPort)
+ close(w.done)
}
-// bump updates the activity timestamp for the given node.
-// The caller must hold tab.mutex.
-func (tab *Table) bump(node NodeID) {
- tab.buckets[logdist(tab.self.ID, node)].bump(node)
+func (tab *Table) pingreplace(new *Node, b *bucket) {
+ if len(b.entries) == bucketSize {
+ oldest := b.entries[bucketSize-1]
+ if err := tab.net.ping(oldest.ID, oldest.addr()); err == nil {
+ // The node responded, we don't need to replace it.
+ return
+ }
+ } else {
+ // Add a slot at the end so the last entry doesn't
+ // fall off when adding the new node.
+ b.entries = append(b.entries, nil)
+ }
+ copy(b.entries[1:], b.entries)
+ b.entries[0] = new
}
// add puts the entries into the table if their corresponding
@@ -240,17 +323,17 @@ outer:
}
}
-func (b *bucket) bump(id NodeID) *Node {
- for i, n := range b.entries {
- if n.ID == id {
- n.active = time.Now()
+func (b *bucket) bump(n *Node) bool {
+ for i := range b.entries {
+ if b.entries[i].ID == n.ID {
+ n.bumpActive()
// move it to the front
- copy(b.entries[1:], b.entries[:i+1])
+ copy(b.entries[1:], b.entries[:i])
b.entries[0] = n
- return n
+ return true
}
}
- return nil
+ return false
}
// nodesByDistance is a list of nodes, ordered by