diff options
author | Felix Lange <fjl@twurst.com> | 2015-08-07 06:10:26 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-08-11 17:42:17 +0800 |
commit | 01ed3fa1a9414328eb6c4fc839e1b2044a786a7a (patch) | |
tree | f081f6c63b7e5ca04c4976d85654797a4c7f2a0b /p2p/discover | |
parent | 698e98d9814605bfea98ba3ad2fe7fda073cb2b1 (diff) | |
download | go-tangerine-01ed3fa1a9414328eb6c4fc839e1b2044a786a7a.tar.gz go-tangerine-01ed3fa1a9414328eb6c4fc839e1b2044a786a7a.tar.zst go-tangerine-01ed3fa1a9414328eb6c4fc839e1b2044a786a7a.zip |
p2p/discover: unlock the table during ping replacement
Table.mutex was being held while waiting for a reply packet, which
effectively made many parts of the whole stack block on that packet,
including the net_peerCount RPC call.
Diffstat (limited to 'p2p/discover')
-rw-r--r-- | p2p/discover/table.go | 121 | ||||
-rw-r--r-- | p2p/discover/table_test.go | 6 | ||||
-rw-r--r-- | p2p/discover/udp_test.go | 2 |
3 files changed, 77 insertions, 52 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 67f7ec46f..b077f010c 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -78,9 +78,8 @@ type transport interface { close() } -// bucket contains nodes, ordered by their last activity. -// the entry that was most recently active is the last element -// in entries. +// bucket contains nodes, ordered by their last activity. the entry +// that was most recently active is the first element in entries. type bucket struct { lastLookup time.Time entries []*Node @@ -235,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { if fails >= maxFindnodeFailures { glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails) - tab.del(n) + tab.delete(n) } } reply <- tab.bondall(r) @@ -401,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 node = w.n } } - // Even if bonding temporarily failed, give the node a chance if node != nil { - tab.mutex.Lock() - defer tab.mutex.Unlock() - - b := tab.buckets[logdist(tab.self.sha, node.sha)] - if !b.bump(node) { - tab.pingreplace(node, b) - } + // Add the node to the table even if the bonding ping/pong + // fails. It will be relaced quickly if it continues to be + // unresponsive. + tab.add(node) tab.db.updateFindFails(id, 0) } return node, result @@ -420,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd <-tab.bondslots defer func() { tab.bondslots <- struct{}{} }() - // Ping the remote side and wait for a pong + // Ping the remote side and wait for a pong. if w.err = tab.ping(id, addr); w.err != nil { close(w.done) return @@ -431,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // waitping will simply time out. tab.net.waitping(id) } - // Bonding succeeded, update the node database + // Bonding succeeded, update the node database. w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort) tab.db.updateNode(w.n) close(w.done) } -func (tab *Table) pingreplace(new *Node, b *bucket) { - if len(b.entries) == bucketSize { - oldest := b.entries[bucketSize-1] - if err := tab.ping(oldest.ID, oldest.addr()); err == nil { - // The node responded, we don't need to replace it. - return - } - } else { - // Add a slot at the end so the last entry doesn't - // fall off when adding the new node. - b.entries = append(b.entries, nil) - } - copy(b.entries[1:], b.entries) - b.entries[0] = new - if tab.nodeAddedHook != nil { - tab.nodeAddedHook(new) - } -} - -// ping a remote endpoint and wait for a reply, also updating the node database -// accordingly. +// ping a remote endpoint and wait for a reply, also updating the node +// database accordingly. func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { // Update the last ping and send the message tab.db.updateLastPing(id, time.Now()) @@ -467,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) tab.db.ensureExpirer() - return nil } -// add puts the entries into the table if their corresponding -// bucket is not full. The caller must hold tab.mutex. -func (tab *Table) add(entries []*Node) { +// add attempts to add the given node its corresponding bucket. If the +// bucket has space available, adding the node succeeds immediately. +// Otherwise, the node is added if the least recently active node in +// the bucket does not respond to a ping packet. +// +// The caller must not hold tab.mutex. +func (tab *Table) add(new *Node) { + b := tab.buckets[logdist(tab.self.sha, new.sha)] + tab.mutex.Lock() + if b.bump(new) { + tab.mutex.Unlock() + return + } + var oldest *Node + if len(b.entries) == bucketSize { + oldest = b.entries[bucketSize-1] + // Let go of the mutex so other goroutines can access + // the table while we ping the least recently active node. + tab.mutex.Unlock() + if err := tab.ping(oldest.ID, oldest.addr()); err == nil { + // The node responded, don't replace it. + return + } + tab.mutex.Lock() + } + added := b.replace(new, oldest) + tab.mutex.Unlock() + if added && tab.nodeAddedHook != nil { + tab.nodeAddedHook(new) + } +} + +// stuff adds nodes the table to the end of their corresponding bucket +// if the bucket is not full. The caller must hold tab.mutex. +func (tab *Table) stuff(nodes []*Node) { outer: - for _, n := range entries { + for _, n := range nodes { if n.ID == tab.self.ID { - // don't add self. - continue + continue // don't add self } bucket := tab.buckets[logdist(tab.self.sha, n.sha)] for i := range bucket.entries { if bucket.entries[i].ID == n.ID { - // already in bucket - continue outer + continue outer // already in bucket } } if len(bucket.entries) < bucketSize { @@ -496,12 +501,11 @@ outer: } } -// del removes an entry from the node table (used to evacuate failed/non-bonded -// discovery peers). -func (tab *Table) del(node *Node) { +// delete removes an entry from the node table (used to evacuate +// failed/non-bonded discovery peers). +func (tab *Table) delete(node *Node) { tab.mutex.Lock() defer tab.mutex.Unlock() - bucket := tab.buckets[logdist(tab.self.sha, node.sha)] for i := range bucket.entries { if bucket.entries[i].ID == node.ID { @@ -511,6 +515,27 @@ func (tab *Table) del(node *Node) { } } +func (b *bucket) replace(n *Node, last *Node) bool { + // Don't add if b already contains n. + for i := range b.entries { + if b.entries[i].ID == n.ID { + return false + } + } + // Replace last if it is still the last entry or just add n if b + // isn't full. If is no longer the last entry, it has either been + // replaced with someone else or became active. + if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) { + return false + } + if len(b.entries) < bucketSize { + b.entries = append(b.entries, nil) + } + copy(b.entries[1:], b.entries) + b.entries[0] = n + return true +} + func (b *bucket) bump(n *Node) bool { for i := range b.entries { if b.entries[i].ID == n.ID { diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index d259177bf..426f4e9cc 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -178,8 +178,8 @@ func TestTable_closest(t *testing.T) { test := func(test *closeTest) bool { // for any node table, Target and N tab := newTable(nil, test.Self, &net.UDPAddr{}, "") - tab.add(test.All) defer tab.Close() + tab.stuff(test.All) // check that doClosest(Target, N) returns nodes result := tab.closest(test.Target, test.N).entries @@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) { defer tab.Close() for i := 0; i < len(buf); i++ { ld := cfg.Rand.Intn(len(tab.buckets)) - tab.add([]*Node{nodeAtDistance(tab.self.sha, ld)}) + tab.stuff([]*Node{nodeAtDistance(tab.self.sha, ld)}) } gotN := tab.ReadRandomNodes(buf) if gotN != tab.len() { @@ -288,7 +288,7 @@ func TestTable_Lookup(t *testing.T) { } // seed table with initial node (otherwise lookup will terminate immediately) seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0) - tab.add([]*Node{seed}) + tab.stuff([]*Node{seed}) results := tab.Lookup(lookupTestnet.target) t.Logf("results:") diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index 8d6d3e855..b913424dd 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -167,7 +167,7 @@ func TestUDP_findnode(t *testing.T) { for i := 0; i < bucketSize; i++ { nodes.push(nodeAtDistance(test.table.self.sha, i+2), bucketSize) } - test.table.add(nodes.entries) + test.table.stuff(nodes.entries) // ensure there's a bond with the test node, // findnode won't be accepted otherwise. |