aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/discover/table.go121
-rw-r--r--p2p/discover/table_test.go6
-rw-r--r--p2p/discover/udp.go78
-rw-r--r--p2p/discover/udp_test.go75
4 files changed, 197 insertions, 83 deletions
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 67f7ec46f..b077f010c 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -78,9 +78,8 @@ type transport interface {
close()
}
-// bucket contains nodes, ordered by their last activity.
-// the entry that was most recently active is the last element
-// in entries.
+// bucket contains nodes, ordered by their last activity. the entry
+// that was most recently active is the first element in entries.
type bucket struct {
lastLookup time.Time
entries []*Node
@@ -235,7 +234,7 @@ func (tab *Table) Lookup(targetID NodeID) []*Node {
if fails >= maxFindnodeFailures {
glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails)
- tab.del(n)
+ tab.delete(n)
}
}
reply <- tab.bondall(r)
@@ -401,15 +400,11 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
node = w.n
}
}
- // Even if bonding temporarily failed, give the node a chance
if node != nil {
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
-
- b := tab.buckets[logdist(tab.self.sha, node.sha)]
- if !b.bump(node) {
- tab.pingreplace(node, b)
- }
+ // Add the node to the table even if the bonding ping/pong
+ // fails. It will be relaced quickly if it continues to be
+ // unresponsive.
+ tab.add(node)
tab.db.updateFindFails(id, 0)
}
return node, result
@@ -420,7 +415,7 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
<-tab.bondslots
defer func() { tab.bondslots <- struct{}{} }()
- // Ping the remote side and wait for a pong
+ // Ping the remote side and wait for a pong.
if w.err = tab.ping(id, addr); w.err != nil {
close(w.done)
return
@@ -431,33 +426,14 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// waitping will simply time out.
tab.net.waitping(id)
}
- // Bonding succeeded, update the node database
+ // Bonding succeeded, update the node database.
w.n = newNode(id, addr.IP, uint16(addr.Port), tcpPort)
tab.db.updateNode(w.n)
close(w.done)
}
-func (tab *Table) pingreplace(new *Node, b *bucket) {
- if len(b.entries) == bucketSize {
- oldest := b.entries[bucketSize-1]
- if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
- // The node responded, we don't need to replace it.
- return
- }
- } else {
- // Add a slot at the end so the last entry doesn't
- // fall off when adding the new node.
- b.entries = append(b.entries, nil)
- }
- copy(b.entries[1:], b.entries)
- b.entries[0] = new
- if tab.nodeAddedHook != nil {
- tab.nodeAddedHook(new)
- }
-}
-
-// ping a remote endpoint and wait for a reply, also updating the node database
-// accordingly.
+// ping a remote endpoint and wait for a reply, also updating the node
+// database accordingly.
func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
// Update the last ping and send the message
tab.db.updateLastPing(id, time.Now())
@@ -467,24 +443,53 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error {
// Pong received, update the database and return
tab.db.updateLastPong(id, time.Now())
tab.db.ensureExpirer()
-
return nil
}
-// add puts the entries into the table if their corresponding
-// bucket is not full. The caller must hold tab.mutex.
-func (tab *Table) add(entries []*Node) {
+// add attempts to add the given node its corresponding bucket. If the
+// bucket has space available, adding the node succeeds immediately.
+// Otherwise, the node is added if the least recently active node in
+// the bucket does not respond to a ping packet.
+//
+// The caller must not hold tab.mutex.
+func (tab *Table) add(new *Node) {
+ b := tab.buckets[logdist(tab.self.sha, new.sha)]
+ tab.mutex.Lock()
+ if b.bump(new) {
+ tab.mutex.Unlock()
+ return
+ }
+ var oldest *Node
+ if len(b.entries) == bucketSize {
+ oldest = b.entries[bucketSize-1]
+ // Let go of the mutex so other goroutines can access
+ // the table while we ping the least recently active node.
+ tab.mutex.Unlock()
+ if err := tab.ping(oldest.ID, oldest.addr()); err == nil {
+ // The node responded, don't replace it.
+ return
+ }
+ tab.mutex.Lock()
+ }
+ added := b.replace(new, oldest)
+ tab.mutex.Unlock()
+ if added && tab.nodeAddedHook != nil {
+ tab.nodeAddedHook(new)
+ }
+}
+
+// stuff adds nodes the table to the end of their corresponding bucket
+// if the bucket is not full. The caller must hold tab.mutex.
+func (tab *Table) stuff(nodes []*Node) {
outer:
- for _, n := range entries {
+ for _, n := range nodes {
if n.ID == tab.self.ID {
- // don't add self.
- continue
+ continue // don't add self
}
bucket := tab.buckets[logdist(tab.self.sha, n.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == n.ID {
- // already in bucket
- continue outer
+ continue outer // already in bucket
}
}
if len(bucket.entries) < bucketSize {
@@ -496,12 +501,11 @@ outer:
}
}
-// del removes an entry from the node table (used to evacuate failed/non-bonded
-// discovery peers).
-func (tab *Table) del(node *Node) {
+// delete removes an entry from the node table (used to evacuate
+// failed/non-bonded discovery peers).
+func (tab *Table) delete(node *Node) {
tab.mutex.Lock()
defer tab.mutex.Unlock()
-
bucket := tab.buckets[logdist(tab.self.sha, node.sha)]
for i := range bucket.entries {
if bucket.entries[i].ID == node.ID {
@@ -511,6 +515,27 @@ func (tab *Table) del(node *Node) {
}
}
+func (b *bucket) replace(n *Node, last *Node) bool {
+ // Don't add if b already contains n.
+ for i := range b.entries {
+ if b.entries[i].ID == n.ID {
+ return false
+ }
+ }
+ // Replace last if it is still the last entry or just add n if b
+ // isn't full. If is no longer the last entry, it has either been
+ // replaced with someone else or became active.
+ if len(b.entries) == bucketSize && (last == nil || b.entries[bucketSize-1].ID != last.ID) {
+ return false
+ }
+ if len(b.entries) < bucketSize {
+ b.entries = append(b.entries, nil)
+ }
+ copy(b.entries[1:], b.entries)
+ b.entries[0] = n
+ return true
+}
+
func (b *bucket) bump(n *Node) bool {
for i := range b.entries {
if b.entries[i].ID == n.ID {
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index d259177bf..426f4e9cc 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -178,8 +178,8 @@ func TestTable_closest(t *testing.T) {
test := func(test *closeTest) bool {
// for any node table, Target and N
tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
- tab.add(test.All)
defer tab.Close()
+ tab.stuff(test.All)
// check that doClosest(Target, N) returns nodes
result := tab.closest(test.Target, test.N).entries
@@ -240,7 +240,7 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
defer tab.Close()
for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
- tab.add([]*Node{nodeAtDistance(tab.self.sha, ld)})
+ tab.stuff([]*Node{nodeAtDistance(tab.self.sha, ld)})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
@@ -288,7 +288,7 @@ func TestTable_Lookup(t *testing.T) {
}
// seed table with initial node (otherwise lookup will terminate immediately)
seed := newNode(lookupTestnet.dists[256][0], net.IP{}, 256, 0)
- tab.add([]*Node{seed})
+ tab.stuff([]*Node{seed})
results := tab.Lookup(lookupTestnet.target)
t.Logf("results:")
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index d7ca9000d..008e63937 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -18,6 +18,7 @@ package discover
import (
"bytes"
+ "container/list"
"crypto/ecdsa"
"errors"
"fmt"
@@ -43,6 +44,7 @@ var (
errUnsolicitedReply = errors.New("unsolicited reply")
errUnknownNode = errors.New("unknown node")
errTimeout = errors.New("RPC timeout")
+ errClockWarp = errors.New("reply deadline too far in the future")
errClosed = errors.New("socket closed")
)
@@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
}
func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
- matched := make(chan bool)
+ matched := make(chan bool, 1)
select {
case t.gotreply <- reply{from, ptype, req, matched}:
// loop will handle it
@@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
var (
- pending []*pending
- nextDeadline time.Time
- timeout = time.NewTimer(0)
- refresh = time.NewTicker(refreshInterval)
+ plist = list.New()
+ timeout = time.NewTimer(0)
+ nextTimeout *pending // head of plist when timeout was last reset
+ refresh = time.NewTicker(refreshInterval)
)
<-timeout.C // ignore first timeout
defer refresh.Stop()
defer timeout.Stop()
- rearmTimeout := func() {
- now := time.Now()
- if len(pending) == 0 || now.Before(nextDeadline) {
+ resetTimeout := func() {
+ if plist.Front() == nil || nextTimeout == plist.Front().Value {
return
}
- nextDeadline = pending[0].deadline
- timeout.Reset(nextDeadline.Sub(now))
+ // Start the timer so it fires when the next pending reply has expired.
+ now := time.Now()
+ for el := plist.Front(); el != nil; el = el.Next() {
+ nextTimeout = el.Value.(*pending)
+ if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
+ timeout.Reset(dist)
+ return
+ }
+ // Remove pending replies whose deadline is too far in the
+ // future. These can occur if the system clock jumped
+ // backwards after the deadline was assigned.
+ nextTimeout.errc <- errClockWarp
+ plist.Remove(el)
+ }
+ nextTimeout = nil
+ timeout.Stop()
}
for {
+ resetTimeout()
+
select {
case <-refresh.C:
go t.refresh()
case <-t.closing:
- for _, p := range pending {
- p.errc <- errClosed
+ for el := plist.Front(); el != nil; el = el.Next() {
+ el.Value.(*pending).errc <- errClosed
}
- pending = nil
return
case p := <-t.addpending:
p.deadline = time.Now().Add(respTimeout)
- pending = append(pending, p)
- rearmTimeout()
+ plist.PushBack(p)
case r := <-t.gotreply:
var matched bool
- for i := 0; i < len(pending); i++ {
- if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
+ for el := plist.Front(); el != nil; el = el.Next() {
+ p := el.Value.(*pending)
+ if p.from == r.from && p.ptype == r.ptype {
matched = true
+ // Remove the matcher if its callback indicates
+ // that all replies have been received. This is
+ // required for packet types that expect multiple
+ // reply packets.
if p.callback(r.data) {
- // callback indicates the request is done, remove it.
p.errc <- nil
- copy(pending[i:], pending[i+1:])
- pending = pending[:len(pending)-1]
- i--
+ plist.Remove(el)
}
}
}
r.matched <- matched
case now := <-timeout.C:
- // notify and remove callbacks whose deadline is in the past.
- i := 0
- for ; i < len(pending) && now.After(pending[i].deadline); i++ {
- pending[i].errc <- errTimeout
- }
- if i > 0 {
- copy(pending, pending[i:])
- pending = pending[:len(pending)-i]
+ nextTimeout = nil
+ // Notify and remove callbacks whose deadline is in the past.
+ for el := plist.Front(); el != nil; el = el.Next() {
+ p := el.Value.(*pending)
+ if now.After(p.deadline) || now.Equal(p.deadline) {
+ p.errc <- errTimeout
+ plist.Remove(el)
+ }
}
- rearmTimeout()
}
}
}
@@ -385,7 +401,7 @@ const (
var (
headSpace = make([]byte, headSize)
- // Neighbors responses are sent across multiple packets to
+ // Neighbors replies are sent across multiple packets to
// stay below the 1280 byte limit. We compute the maximum number
// of entries by stuffing a packet until it grows too large.
maxNeighbors int
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 8d6d3e855..a86d3737b 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -19,10 +19,12 @@ package discover
import (
"bytes"
"crypto/ecdsa"
+ "encoding/binary"
"errors"
"fmt"
"io"
logpkg "log"
+ "math/rand"
"net"
"os"
"path/filepath"
@@ -138,6 +140,77 @@ func TestUDP_pingTimeout(t *testing.T) {
}
}
+func TestUDP_responseTimeouts(t *testing.T) {
+ t.Parallel()
+ test := newUDPTest(t)
+ defer test.table.Close()
+
+ rand.Seed(time.Now().UnixNano())
+ randomDuration := func(max time.Duration) time.Duration {
+ return time.Duration(rand.Int63n(int64(max)))
+ }
+
+ var (
+ nReqs = 200
+ nTimeouts = 0 // number of requests with ptype > 128
+ nilErr = make(chan error, nReqs) // for requests that get a reply
+ timeoutErr = make(chan error, nReqs) // for requests that time out
+ )
+ for i := 0; i < nReqs; i++ {
+ // Create a matcher for a random request in udp.loop. Requests
+ // with ptype <= 128 will not get a reply and should time out.
+ // For all other requests, a reply is scheduled to arrive
+ // within the timeout window.
+ p := &pending{
+ ptype: byte(rand.Intn(255)),
+ callback: func(interface{}) bool { return true },
+ }
+ binary.BigEndian.PutUint64(p.from[:], uint64(i))
+ if p.ptype <= 128 {
+ p.errc = timeoutErr
+ nTimeouts++
+ } else {
+ p.errc = nilErr
+ time.AfterFunc(randomDuration(60*time.Millisecond), func() {
+ if !test.udp.handleReply(p.from, p.ptype, nil) {
+ t.Logf("not matched: %v", p)
+ }
+ })
+ }
+ test.udp.addpending <- p
+ time.Sleep(randomDuration(30 * time.Millisecond))
+ }
+
+ // Check that all timeouts were delivered and that the rest got nil errors.
+ // The replies must be delivered.
+ var (
+ recvDeadline = time.After(20 * time.Second)
+ nTimeoutsRecv, nNil = 0, 0
+ )
+ for i := 0; i < nReqs; i++ {
+ select {
+ case err := <-timeoutErr:
+ if err != errTimeout {
+ t.Fatalf("got non-timeout error on timeoutErr %d: %v", i, err)
+ }
+ nTimeoutsRecv++
+ case err := <-nilErr:
+ if err != nil {
+ t.Fatalf("got non-nil error on nilErr %d: %v", i, err)
+ }
+ nNil++
+ case <-recvDeadline:
+ t.Fatalf("exceeded recv deadline")
+ }
+ }
+ if nTimeoutsRecv != nTimeouts {
+ t.Errorf("wrong number of timeout errors received: got %d, want %d", nTimeoutsRecv, nTimeouts)
+ }
+ if nNil != nReqs-nTimeouts {
+ t.Errorf("wrong number of successful replies: got %d, want %d", nNil, nReqs-nTimeouts)
+ }
+}
+
func TestUDP_findnodeTimeout(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
@@ -167,7 +240,7 @@ func TestUDP_findnode(t *testing.T) {
for i := 0; i < bucketSize; i++ {
nodes.push(nodeAtDistance(test.table.self.sha, i+2), bucketSize)
}
- test.table.add(nodes.entries)
+ test.table.stuff(nodes.entries)
// ensure there's a bond with the test node,
// findnode won't be accepted otherwise.