aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-04-01 23:10:10 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-04-01 23:10:10 +0800
commitfd171eff7f5bb6822c927d982dcf018abaa9ae3c (patch)
tree1ec9ee7788ccd040f46e2bf50873b7db258f1d92 /p2p
parent101ea1a1e8e8f7b0592dbd06e4eca8432e2e2527 (diff)
parent76218959ab36828f340a113e3b3b7dffabb1f36a (diff)
downloaddexon-fd171eff7f5bb6822c927d982dcf018abaa9ae3c.tar.gz
dexon-fd171eff7f5bb6822c927d982dcf018abaa9ae3c.tar.zst
dexon-fd171eff7f5bb6822c927d982dcf018abaa9ae3c.zip
Merge pull request #592 from fjl/disco-ping-pong
Discovery bonding protocol
Diffstat (limited to 'p2p')
-rw-r--r--p2p/discover/node.go43
-rw-r--r--p2p/discover/table.go185
-rw-r--r--p2p/discover/table_test.go202
-rw-r--r--p2p/discover/udp.go218
-rw-r--r--p2p/discover/udp_test.go422
5 files changed, 693 insertions, 377 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index e1130e0b5..99cb549a5 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -13,6 +13,8 @@ import (
"net/url"
"strconv"
"strings"
+ "sync"
+ "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/crypto"
@@ -30,7 +32,8 @@ type Node struct {
DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx
- active time.Time
+ // this must be set/read using atomic load and store.
+ activeStamp int64
}
func newNode(id NodeID, addr *net.UDPAddr) *Node {
@@ -39,7 +42,6 @@ func newNode(id NodeID, addr *net.UDPAddr) *Node {
IP: addr.IP,
DiscPort: addr.Port,
TCPPort: addr.Port,
- active: time.Now(),
}
}
@@ -48,6 +50,20 @@ func (n *Node) isValid() bool {
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0
}
+func (n *Node) bumpActive() {
+ stamp := time.Now().Unix()
+ atomic.StoreInt64(&n.activeStamp, stamp)
+}
+
+func (n *Node) active() time.Time {
+ stamp := atomic.LoadInt64(&n.activeStamp)
+ return time.Unix(stamp, 0)
+}
+
+func (n *Node) addr() *net.UDPAddr {
+ return &net.UDPAddr{IP: n.IP, Port: n.DiscPort}
+}
+
// The string representation of a Node is a URL.
// Please see ParseNode for a description of the format.
func (n *Node) String() string {
@@ -304,3 +320,26 @@ func randomID(a NodeID, n int) (b NodeID) {
}
return b
}
+
+// nodeDB stores all nodes we know about.
+type nodeDB struct {
+ mu sync.RWMutex
+ byID map[NodeID]*Node
+}
+
+func (db *nodeDB) get(id NodeID) *Node {
+ db.mu.RLock()
+ defer db.mu.RUnlock()
+ return db.byID[id]
+}
+
+func (db *nodeDB) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
+ db.mu.Lock()
+ defer db.mu.Unlock()
+ if db.byID == nil {
+ db.byID = make(map[NodeID]*Node)
+ }
+ n := &Node{ID: id, IP: addr.IP, DiscPort: addr.Port, TCPPort: int(tcpPort)}
+ db.byID[n.ID] = n
+ return n
+}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 33b705a12..dbf86c084 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -14,9 +14,10 @@ import (
)
const (
- alpha = 3 // Kademlia concurrency factor
- bucketSize = 16 // Kademlia bucket size
- nBuckets = nodeIDBits + 1 // Number of buckets
+ alpha = 3 // Kademlia concurrency factor
+ bucketSize = 16 // Kademlia bucket size
+ nBuckets = nodeIDBits + 1 // Number of buckets
+ maxBondingPingPongs = 10
)
type Table struct {
@@ -24,27 +25,50 @@ type Table struct {
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
+ bondmu sync.Mutex
+ bonding map[NodeID]*bondproc
+ bondslots chan struct{} // limits total number of active bonding processes
+
net transport
self *Node // metadata of the local node
+ db *nodeDB
+}
+
+type bondproc struct {
+ err error
+ n *Node
+ done chan struct{}
}
// transport is implemented by the UDP transport.
// it is an interface so we can test without opening lots of UDP
// sockets and without generating a private key.
type transport interface {
- ping(*Node) error
- findnode(e *Node, target NodeID) ([]*Node, error)
+ ping(NodeID, *net.UDPAddr) error
+ waitping(NodeID) error
+ findnode(toid NodeID, addr *net.UDPAddr, target NodeID) ([]*Node, error)
close()
}
// bucket contains nodes, ordered by their last activity.
+// the entry that was most recently active is the last element
+// in entries.
type bucket struct {
lastLookup time.Time
entries []*Node
}
func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr) *Table {
- tab := &Table{net: t, self: newNode(ourID, ourAddr)}
+ tab := &Table{
+ net: t,
+ db: new(nodeDB),
+ self: newNode(ourID, ourAddr),
+ bonding: make(map[NodeID]*bondproc),
+ bondslots: make(chan struct{}, maxBondingPingPongs),
+ }
+ for i := 0; i < cap(tab.bondslots); i++ {
+ tab.bondslots <- struct{}{}
+ }
for i := range tab.buckets {
tab.buckets[i] = new(bucket)
}
@@ -107,8 +131,8 @@ func (tab *Table) Lookup(target NodeID) []*Node {
asked[n.ID] = true
pendingQueries++
go func() {
- result, _ := tab.net.findnode(n, target)
- reply <- result
+ r, _ := tab.net.findnode(n.ID, n.addr(), target)
+ reply <- tab.bondall(r)
}()
}
}
@@ -116,13 +140,11 @@ func (tab *Table) Lookup(target NodeID) []*Node {
// we have asked all closest nodes, stop the search
break
}
-
// wait for the next reply
for _, n := range <-reply {
- cn := n
- if !seen[n.ID] {
+ if n != nil && !seen[n.ID] {
seen[n.ID] = true
- result.push(cn, bucketSize)
+ result.push(n, bucketSize)
}
}
pendingQueries--
@@ -145,8 +167,9 @@ func (tab *Table) refresh() {
result := tab.Lookup(randomID(tab.self.ID, ld))
if len(result) == 0 {
// bootstrap the table with a self lookup
+ all := tab.bondall(tab.nursery)
tab.mutex.Lock()
- tab.add(tab.nursery)
+ tab.add(all)
tab.mutex.Unlock()
tab.Lookup(tab.self.ID)
// TODO: the Kademlia paper says that we're supposed to perform
@@ -176,45 +199,105 @@ func (tab *Table) len() (n int) {
return n
}
-// bumpOrAdd updates the activity timestamp for the given node and
-// attempts to insert the node into a bucket. The returned Node might
-// not be part of the table. The caller must hold tab.mutex.
-func (tab *Table) bumpOrAdd(node NodeID, from *net.UDPAddr) (n *Node) {
- b := tab.buckets[logdist(tab.self.ID, node)]
- if n = b.bump(node); n == nil {
- n = newNode(node, from)
- if len(b.entries) == bucketSize {
- tab.pingReplace(n, b)
- } else {
- b.entries = append(b.entries, n)
+// bondall bonds with all given nodes concurrently and returns
+// those nodes for which bonding has probably succeeded.
+func (tab *Table) bondall(nodes []*Node) (result []*Node) {
+ rc := make(chan *Node, len(nodes))
+ for i := range nodes {
+ go func(n *Node) {
+ nn, _ := tab.bond(false, n.ID, n.addr(), uint16(n.TCPPort))
+ rc <- nn
+ }(nodes[i])
+ }
+ for _ = range nodes {
+ if n := <-rc; n != nil {
+ result = append(result, n)
}
}
- return n
+ return result
}
-func (tab *Table) pingReplace(n *Node, b *bucket) {
- old := b.entries[bucketSize-1]
- go func() {
- if err := tab.net.ping(old); err == nil {
- // it responded, we don't need to replace it.
- return
+// bond ensures the local node has a bond with the given remote node.
+// It also attempts to insert the node into the table if bonding succeeds.
+// The caller must not hold tab.mutex.
+//
+// A bond is must be established before sending findnode requests.
+// Both sides must have completed a ping/pong exchange for a bond to
+// exist. The total number of active bonding processes is limited in
+// order to restrain network use.
+//
+// bond is meant to operate idempotently in that bonding with a remote
+// node which still remembers a previously established bond will work.
+// The remote node will simply not send a ping back, causing waitping
+// to time out.
+//
+// If pinged is true, the remote node has just pinged us and one half
+// of the process can be skipped.
+func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
+ var n *Node
+ if n = tab.db.get(id); n == nil {
+ tab.bondmu.Lock()
+ w := tab.bonding[id]
+ if w != nil {
+ // Wait for an existing bonding process to complete.
+ tab.bondmu.Unlock()
+ <-w.done
+ } else {
+ // Register a new bonding process.
+ w = &bondproc{done: make(chan struct{})}
+ tab.bonding[id] = w
+ tab.bondmu.Unlock()
+ // Do the ping/pong. The result goes into w.
+ tab.pingpong(w, pinged, id, addr, tcpPort)
+ // Unregister the process after it's done.
+ tab.bondmu.Lock()
+ delete(tab.bonding, id)
+ tab.bondmu.Unlock()
}
- // it didn't respond, replace the node if it is still the oldest node.
- tab.mutex.Lock()
- if len(b.entries) > 0 && b.entries[len(b.entries)-1] == old {
- // slide down other entries and put the new one in front.
- // TODO: insert in correct position to keep the order
- copy(b.entries[1:], b.entries)
- b.entries[0] = n
+ n = w.n
+ if w.err != nil {
+ return nil, w.err
}
- tab.mutex.Unlock()
- }()
+ }
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+ if b := tab.buckets[logdist(tab.self.ID, n.ID)]; !b.bump(n) {
+ tab.pingreplace(n, b)
+ }
+ return n, nil
+}
+
+func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
+ <-tab.bondslots
+ defer func() { tab.bondslots <- struct{}{} }()
+ if w.err = tab.net.ping(id, addr); w.err != nil {
+ close(w.done)
+ return
+ }
+ if !pinged {
+ // Give the remote node a chance to ping us before we start
+ // sending findnode requests. If they still remember us,
+ // waitping will simply time out.
+ tab.net.waitping(id)
+ }
+ w.n = tab.db.add(id, addr, tcpPort)
+ close(w.done)
}
-// bump updates the activity timestamp for the given node.
-// The caller must hold tab.mutex.
-func (tab *Table) bump(node NodeID) {
- tab.buckets[logdist(tab.self.ID, node)].bump(node)
+func (tab *Table) pingreplace(new *Node, b *bucket) {
+ if len(b.entries) == bucketSize {
+ oldest := b.entries[bucketSize-1]
+ if err := tab.net.ping(oldest.ID, oldest.addr()); err == nil {
+ // The node responded, we don't need to replace it.
+ return
+ }
+ } else {
+ // Add a slot at the end so the last entry doesn't
+ // fall off when adding the new node.
+ b.entries = append(b.entries, nil)
+ }
+ copy(b.entries[1:], b.entries)
+ b.entries[0] = new
}
// add puts the entries into the table if their corresponding
@@ -240,17 +323,17 @@ outer:
}
}
-func (b *bucket) bump(id NodeID) *Node {
- for i, n := range b.entries {
- if n.ID == id {
- n.active = time.Now()
+func (b *bucket) bump(n *Node) bool {
+ for i := range b.entries {
+ if b.entries[i].ID == n.ID {
+ n.bumpActive()
// move it to the front
- copy(b.entries[1:], b.entries[:i+1])
+ copy(b.entries[1:], b.entries[:i])
b.entries[0] = n
- return n
+ return true
}
}
- return nil
+ return false
}
// nodesByDistance is a list of nodes, ordered by
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 08faea68e..a98376bca 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -2,78 +2,109 @@ package discover
import (
"crypto/ecdsa"
- "errors"
"fmt"
"math/rand"
"net"
"reflect"
"testing"
"testing/quick"
- "time"
"github.com/ethereum/go-ethereum/crypto"
)
-func TestTable_bumpOrAddBucketAssign(t *testing.T) {
- tab := newTable(nil, NodeID{}, &net.UDPAddr{})
- for i := 1; i < len(tab.buckets); i++ {
- tab.bumpOrAdd(randomID(tab.self.ID, i), &net.UDPAddr{})
- }
- for i, b := range tab.buckets {
- if i > 0 && len(b.entries) != 1 {
- t.Errorf("bucket %d has %d entries, want 1", i, len(b.entries))
+func TestTable_pingReplace(t *testing.T) {
+ doit := func(newNodeIsResponding, lastInBucketIsResponding bool) {
+ transport := newPingRecorder()
+ tab := newTable(transport, NodeID{}, &net.UDPAddr{})
+ last := fillBucket(tab, 200)
+ pingSender := randomID(tab.self.ID, 200)
+
+ // this gotPing should replace the last node
+ // if the last node is not responding.
+ transport.responding[last.ID] = lastInBucketIsResponding
+ transport.responding[pingSender] = newNodeIsResponding
+ tab.bond(true, pingSender, &net.UDPAddr{}, 0)
+
+ // first ping goes to sender (bonding pingback)
+ if !transport.pinged[pingSender] {
+ t.Error("table did not ping back sender")
+ }
+ if newNodeIsResponding {
+ // second ping goes to oldest node in bucket
+ // to see whether it is still alive.
+ if !transport.pinged[last.ID] {
+ t.Error("table did not ping last node in bucket")
+ }
}
- }
-}
-
-func TestTable_bumpOrAddPingReplace(t *testing.T) {
- pingC := make(pingC)
- tab := newTable(pingC, NodeID{}, &net.UDPAddr{})
- last := fillBucket(tab, 200)
- // this bumpOrAdd should not replace the last node
- // because the node replies to ping.
- new := tab.bumpOrAdd(randomID(tab.self.ID, 200), &net.UDPAddr{})
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+ if l := len(tab.buckets[200].entries); l != bucketSize {
+ t.Errorf("wrong bucket size after gotPing: got %d, want %d", bucketSize, l)
+ }
- pinged := <-pingC
- if pinged != last.ID {
- t.Fatalf("pinged wrong node: %v\nwant %v", pinged, last.ID)
+ if lastInBucketIsResponding || !newNodeIsResponding {
+ if !contains(tab.buckets[200].entries, last.ID) {
+ t.Error("last entry was removed")
+ }
+ if contains(tab.buckets[200].entries, pingSender) {
+ t.Error("new entry was added")
+ }
+ } else {
+ if contains(tab.buckets[200].entries, last.ID) {
+ t.Error("last entry was not removed")
+ }
+ if !contains(tab.buckets[200].entries, pingSender) {
+ t.Error("new entry was not added")
+ }
+ }
}
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
- if l := len(tab.buckets[200].entries); l != bucketSize {
- t.Errorf("wrong bucket size after bumpOrAdd: got %d, want %d", bucketSize, l)
- }
- if !contains(tab.buckets[200].entries, last.ID) {
- t.Error("last entry was removed")
- }
- if contains(tab.buckets[200].entries, new.ID) {
- t.Error("new entry was added")
- }
+ doit(true, true)
+ doit(false, true)
+ doit(false, true)
+ doit(false, false)
}
-func TestTable_bumpOrAddPingTimeout(t *testing.T) {
- tab := newTable(pingC(nil), NodeID{}, &net.UDPAddr{})
- last := fillBucket(tab, 200)
-
- // this bumpOrAdd should replace the last node
- // because the node does not reply to ping.
- new := tab.bumpOrAdd(randomID(tab.self.ID, 200), &net.UDPAddr{})
-
- // wait for async bucket update. damn. this needs to go away.
- time.Sleep(2 * time.Millisecond)
-
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
- if l := len(tab.buckets[200].entries); l != bucketSize {
- t.Errorf("wrong bucket size after bumpOrAdd: got %d, want %d", bucketSize, l)
+func TestBucket_bumpNoDuplicates(t *testing.T) {
+ t.Parallel()
+ cfg := &quick.Config{
+ MaxCount: 1000,
+ Rand: quickrand,
+ Values: func(args []reflect.Value, rand *rand.Rand) {
+ // generate a random list of nodes. this will be the content of the bucket.
+ n := rand.Intn(bucketSize-1) + 1
+ nodes := make([]*Node, n)
+ for i := range nodes {
+ nodes[i] = &Node{ID: randomID(NodeID{}, 200)}
+ }
+ args[0] = reflect.ValueOf(nodes)
+ // generate random bump positions.
+ bumps := make([]int, rand.Intn(100))
+ for i := range bumps {
+ bumps[i] = rand.Intn(len(nodes))
+ }
+ args[1] = reflect.ValueOf(bumps)
+ },
}
- if contains(tab.buckets[200].entries, last.ID) {
- t.Error("last entry was not removed")
+
+ prop := func(nodes []*Node, bumps []int) (ok bool) {
+ b := &bucket{entries: make([]*Node, len(nodes))}
+ copy(b.entries, nodes)
+ for i, pos := range bumps {
+ b.bump(b.entries[pos])
+ if hasDuplicates(b.entries) {
+ t.Logf("bucket has duplicates after %d/%d bumps:", i+1, len(bumps))
+ for _, n := range b.entries {
+ t.Logf(" %p", n)
+ }
+ return false
+ }
+ }
+ return true
}
- if !contains(tab.buckets[200].entries, new.ID) {
- t.Error("new entry was not added")
+ if err := quick.Check(prop, cfg); err != nil {
+ t.Error(err)
}
}
@@ -85,44 +116,27 @@ func fillBucket(tab *Table, ld int) (last *Node) {
return b.entries[bucketSize-1]
}
-type pingC chan NodeID
+type pingRecorder struct{ responding, pinged map[NodeID]bool }
-func (t pingC) findnode(n *Node, target NodeID) ([]*Node, error) {
+func newPingRecorder() *pingRecorder {
+ return &pingRecorder{make(map[NodeID]bool), make(map[NodeID]bool)}
+}
+
+func (t *pingRecorder) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
panic("findnode called on pingRecorder")
}
-func (t pingC) close() {
+func (t *pingRecorder) close() {
panic("close called on pingRecorder")
}
-func (t pingC) ping(n *Node) error {
- if t == nil {
- return errTimeout
- }
- t <- n.ID
- return nil
+func (t *pingRecorder) waitping(from NodeID) error {
+ return nil // remote always pings
}
-
-func TestTable_bump(t *testing.T) {
- tab := newTable(nil, NodeID{}, &net.UDPAddr{})
-
- // add an old entry and two recent ones
- oldactive := time.Now().Add(-2 * time.Minute)
- old := &Node{ID: randomID(tab.self.ID, 200), active: oldactive}
- others := []*Node{
- &Node{ID: randomID(tab.self.ID, 200), active: time.Now()},
- &Node{ID: randomID(tab.self.ID, 200), active: time.Now()},
- }
- tab.add(append(others, old))
- if tab.buckets[200].entries[0] == old {
- t.Fatal("old entry is at front of bucket")
- }
-
- // bumping the old entry should move it to the front
- tab.bump(old.ID)
- if old.active == oldactive {
- t.Error("activity timestamp not updated")
- }
- if tab.buckets[200].entries[0] != old {
- t.Errorf("bumped entry did not move to the front of bucket")
+func (t *pingRecorder) ping(toid NodeID, toaddr *net.UDPAddr) error {
+ t.pinged[toid] = true
+ if t.responding[toid] {
+ return nil
+ } else {
+ return errTimeout
}
}
@@ -210,7 +224,7 @@ func TestTable_Lookup(t *testing.T) {
t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results)
}
// seed table with initial node (otherwise lookup will terminate immediately)
- tab.bumpOrAdd(randomID(target, 200), &net.UDPAddr{Port: 200})
+ tab.add([]*Node{newNode(randomID(target, 200), &net.UDPAddr{Port: 200})})
results := tab.Lookup(target)
t.Logf("results:")
@@ -238,16 +252,16 @@ type findnodeOracle struct {
target NodeID
}
-func (t findnodeOracle) findnode(n *Node, target NodeID) ([]*Node, error) {
- t.t.Logf("findnode query at dist %d", n.DiscPort)
+func (t findnodeOracle) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
+ t.t.Logf("findnode query at dist %d", toaddr.Port)
// current log distance is encoded in port number
var result []*Node
- switch n.DiscPort {
+ switch toaddr.Port {
case 0:
panic("query to node at distance 0")
default:
// TODO: add more randomness to distances
- next := n.DiscPort - 1
+ next := toaddr.Port - 1
for i := 0; i < bucketSize; i++ {
result = append(result, &Node{ID: randomID(t.target, next), DiscPort: next})
}
@@ -255,11 +269,9 @@ func (t findnodeOracle) findnode(n *Node, target NodeID) ([]*Node, error) {
return result, nil
}
-func (t findnodeOracle) close() {}
-
-func (t findnodeOracle) ping(n *Node) error {
- return errors.New("ping is not supported by this transport")
-}
+func (t findnodeOracle) close() {}
+func (t findnodeOracle) waitping(from NodeID) error { return nil }
+func (t findnodeOracle) ping(toid NodeID, toaddr *net.UDPAddr) error { return nil }
func hasDuplicates(slice []*Node) bool {
seen := make(map[NodeID]bool)
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 69e9f3c2e..e9ede1397 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -16,13 +16,18 @@ import (
var log = logger.NewLogger("P2P Discovery")
+const Version = 3
+
// Errors
var (
- errPacketTooSmall = errors.New("too small")
- errBadHash = errors.New("bad hash")
- errExpired = errors.New("expired")
- errTimeout = errors.New("RPC timeout")
- errClosed = errors.New("socket closed")
+ errPacketTooSmall = errors.New("too small")
+ errBadHash = errors.New("bad hash")
+ errExpired = errors.New("expired")
+ errBadVersion = errors.New("version mismatch")
+ errUnsolicitedReply = errors.New("unsolicited reply")
+ errUnknownNode = errors.New("unknown node")
+ errTimeout = errors.New("RPC timeout")
+ errClosed = errors.New("socket closed")
)
// Timeouts
@@ -45,6 +50,7 @@ const (
// RPC request structures
type (
ping struct {
+ Version uint // must match Version
IP string // our IP
Port uint16 // our port
Expiration uint64
@@ -76,14 +82,27 @@ type rpcNode struct {
ID NodeID
}
+type packet interface {
+ handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
+}
+
+type conn interface {
+ ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error)
+ WriteToUDP(b []byte, addr *net.UDPAddr) (n int, err error)
+ Close() error
+ LocalAddr() net.Addr
+}
+
// udp implements the RPC protocol.
type udp struct {
- conn *net.UDPConn
- priv *ecdsa.PrivateKey
+ conn conn
+ priv *ecdsa.PrivateKey
+
addpending chan *pending
- replies chan reply
- closing chan struct{}
- nat nat.Interface
+ gotreply chan reply
+
+ closing chan struct{}
+ nat nat.Interface
*Table
}
@@ -120,6 +139,9 @@ type reply struct {
from NodeID
ptype byte
data interface{}
+ // loop indicates whether there was
+ // a matching request by sending on this channel.
+ matched chan<- bool
}
// ListenUDP returns a new table that listens for UDP packets on laddr.
@@ -132,15 +154,20 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table
if err != nil {
return nil, err
}
+ tab, _ := newUDP(priv, conn, natm)
+ log.Infoln("Listening,", tab.self)
+ return tab, nil
+}
+
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface) (*Table, *udp) {
udp := &udp{
- conn: conn,
+ conn: c,
priv: priv,
closing: make(chan struct{}),
+ gotreply: make(chan reply),
addpending: make(chan *pending),
- replies: make(chan reply),
}
-
- realaddr := conn.LocalAddr().(*net.UDPAddr)
+ realaddr := c.LocalAddr().(*net.UDPAddr)
if natm != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
@@ -151,11 +178,9 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface) (*Table
}
}
udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr)
-
go udp.loop()
go udp.readLoop()
- log.Infoln("Listening, ", udp.self)
- return udp.Table, nil
+ return udp.Table, udp
}
func (t *udp) close() {
@@ -165,10 +190,11 @@ func (t *udp) close() {
}
// ping sends a ping message to the given node and waits for a reply.
-func (t *udp) ping(e *Node) error {
+func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error {
// TODO: maybe check for ReplyTo field in callback to measure RTT
- errc := t.pending(e.ID, pongPacket, func(interface{}) bool { return true })
- t.send(e, pingPacket, ping{
+ errc := t.pending(toid, pongPacket, func(interface{}) bool { return true })
+ t.send(toaddr, pingPacket, ping{
+ Version: Version,
IP: t.self.IP.String(),
Port: uint16(t.self.TCPPort),
Expiration: uint64(time.Now().Add(expiration).Unix()),
@@ -176,12 +202,16 @@ func (t *udp) ping(e *Node) error {
return <-errc
}
+func (t *udp) waitping(from NodeID) error {
+ return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
+}
+
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
-func (t *udp) findnode(to *Node, target NodeID) ([]*Node, error) {
+func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) {
nodes := make([]*Node, 0, bucketSize)
nreceived := 0
- errc := t.pending(to.ID, neighborsPacket, func(r interface{}) bool {
+ errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
reply := r.(*neighbors)
for _, n := range reply.Nodes {
nreceived++
@@ -191,8 +221,7 @@ func (t *udp) findnode(to *Node, target NodeID) ([]*Node, error) {
}
return nreceived >= bucketSize
})
-
- t.send(to, findnodePacket, findnode{
+ t.send(toaddr, findnodePacket, findnode{
Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
@@ -214,6 +243,17 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-
return ch
}
+func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool {
+ matched := make(chan bool)
+ select {
+ case t.gotreply <- reply{from, ptype, req, matched}:
+ // loop will handle it
+ return <-matched
+ case <-t.closing:
+ return false
+ }
+}
+
// loop runs in its own goroutin. it keeps track of
// the refresh timer and the pending reply queue.
func (t *udp) loop() {
@@ -244,6 +284,7 @@ func (t *udp) loop() {
for _, p := range pending {
p.errc <- errClosed
}
+ pending = nil
return
case p := <-t.addpending:
@@ -251,18 +292,21 @@ func (t *udp) loop() {
pending = append(pending, p)
rearmTimeout()
- case reply := <-t.replies:
- // run matching callbacks, remove if they return false.
+ case r := <-t.gotreply:
+ var matched bool
for i := 0; i < len(pending); i++ {
- p := pending[i]
- if reply.from == p.from && reply.ptype == p.ptype && p.callback(reply.data) {
- p.errc <- nil
- copy(pending[i:], pending[i+1:])
- pending = pending[:len(pending)-1]
- i--
+ if p := pending[i]; p.from == r.from && p.ptype == r.ptype {
+ matched = true
+ if p.callback(r.data) {
+ // callback indicates the request is done, remove it.
+ p.errc <- nil
+ copy(pending[i:], pending[i+1:])
+ pending = pending[:len(pending)-1]
+ i--
+ }
}
}
- rearmTimeout()
+ r.matched <- matched
case now := <-timeout.C:
// notify and remove callbacks whose deadline is in the past.
@@ -287,33 +331,38 @@ const (
var headSpace = make([]byte, headSize)
-func (t *udp) send(to *Node, ptype byte, req interface{}) error {
+func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error {
+ packet, err := encodePacket(t.priv, ptype, req)
+ if err != nil {
+ return err
+ }
+ log.DebugDetailf(">>> %v %T %v\n", toaddr, req, req)
+ if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
+ log.DebugDetailln("UDP send failed:", err)
+ }
+ return err
+}
+
+func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) {
b := new(bytes.Buffer)
b.Write(headSpace)
b.WriteByte(ptype)
if err := rlp.Encode(b, req); err != nil {
log.Errorln("error encoding packet:", err)
- return err
+ return nil, err
}
-
packet := b.Bytes()
- sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), t.priv)
+ sig, err := crypto.Sign(crypto.Sha3(packet[headSize:]), priv)
if err != nil {
log.Errorln("could not sign packet:", err)
- return err
+ return nil, err
}
copy(packet[macSize:], sig)
// add the hash to the front. Note: this doesn't protect the
// packet in any way. Our public key will be part of this hash in
- // the future.
+ // The future.
copy(packet, crypto.Sha3(packet[macSize:]))
-
- toaddr := &net.UDPAddr{IP: to.IP, Port: to.DiscPort}
- log.DebugDetailf(">>> %v %T %v\n", toaddr, req, req)
- if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil {
- log.DebugDetailln("UDP send failed:", err)
- }
- return err
+ return packet, nil
}
// readLoop runs in its own goroutine. it handles incoming UDP packets.
@@ -325,29 +374,34 @@ func (t *udp) readLoop() {
if err != nil {
return
}
- if err := t.packetIn(from, buf[:nbytes]); err != nil {
+ packet, fromID, hash, err := decodePacket(buf[:nbytes])
+ if err != nil {
log.Debugf("Bad packet from %v: %v\n", from, err)
+ continue
}
+ log.DebugDetailf("<<< %v %T %v\n", from, packet, packet)
+ go func() {
+ if err := packet.handle(t, from, fromID, hash); err != nil {
+ log.Debugf("error handling %T from %v: %v", packet, from, err)
+ }
+ }()
}
}
-func (t *udp) packetIn(from *net.UDPAddr, buf []byte) error {
+func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
if len(buf) < headSize+1 {
- return errPacketTooSmall
+ return nil, NodeID{}, nil, errPacketTooSmall
}
hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
shouldhash := crypto.Sha3(buf[macSize:])
if !bytes.Equal(hash, shouldhash) {
- return errBadHash
+ return nil, NodeID{}, nil, errBadHash
}
fromID, err := recoverNodeID(crypto.Sha3(buf[headSize:]), sig)
if err != nil {
- return err
- }
-
- var req interface {
- handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error
+ return nil, NodeID{}, hash, err
}
+ var req packet
switch ptype := sigdata[0]; ptype {
case pingPacket:
req = new(ping)
@@ -358,31 +412,27 @@ func (t *udp) packetIn(from *net.UDPAddr, buf []byte) error {
case neighborsPacket:
req = new(neighbors)
default:
- return fmt.Errorf("unknown type: %d", ptype)
+ return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
}
- if err := rlp.Decode(bytes.NewReader(sigdata[1:]), req); err != nil {
- return err
- }
- log.DebugDetailf("<<< %v %T %v\n", from, req, req)
- return req.handle(t, from, fromID, hash)
+ err = rlp.Decode(bytes.NewReader(sigdata[1:]), req)
+ return req, fromID, hash, err
}
func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error {
if expired(req.Expiration) {
return errExpired
}
- t.mutex.Lock()
- // Note: we're ignoring the provided IP address right now
- n := t.bumpOrAdd(fromID, from)
- if req.Port != 0 {
- n.TCPPort = int(req.Port)
+ if req.Version != Version {
+ return errBadVersion
}
- t.mutex.Unlock()
-
- t.send(n, pongPacket, pong{
+ t.send(from, pongPacket, pong{
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
+ if !t.handleReply(fromID, pingPacket, req) {
+ // Note: we're ignoring the provided IP address right now
+ t.bond(true, fromID, from, req.Port)
+ }
return nil
}
@@ -390,11 +440,9 @@ func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er
if expired(req.Expiration) {
return errExpired
}
- t.mutex.Lock()
- t.bump(fromID)
- t.mutex.Unlock()
-
- t.replies <- reply{fromID, pongPacket, req}
+ if !t.handleReply(fromID, pongPacket, req) {
+ return errUnsolicitedReply
+ }
return nil
}
@@ -402,12 +450,21 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
if expired(req.Expiration) {
return errExpired
}
+ if t.db.get(fromID) == nil {
+ // No bond exists, we don't process the packet. This prevents
+ // an attack vector where the discovery protocol could be used
+ // to amplify traffic in a DDOS attack. A malicious actor
+ // would send a findnode request with the IP address and UDP
+ // port of the target as the source address. The recipient of
+ // the findnode packet would then send a neighbors packet
+ // (which is a much bigger packet than findnode) to the victim.
+ return errUnknownNode
+ }
t.mutex.Lock()
- e := t.bumpOrAdd(fromID, from)
closest := t.closest(req.Target, bucketSize).entries
t.mutex.Unlock()
- t.send(e, neighborsPacket, neighbors{
+ t.send(from, neighborsPacket, neighbors{
Nodes: closest,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
@@ -418,12 +475,9 @@ func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byt
if expired(req.Expiration) {
return errExpired
}
- t.mutex.Lock()
- t.bump(fromID)
- t.add(req.Nodes)
- t.mutex.Unlock()
-
- t.replies <- reply{fromID, neighborsPacket, req}
+ if !t.handleReply(fromID, neighborsPacket, req) {
+ return errUnsolicitedReply
+ }
return nil
}
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 0a8ff6358..c6c4d78e3 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -1,10 +1,18 @@
package discover
import (
+ "bytes"
+ "crypto/ecdsa"
+ "errors"
"fmt"
+ "io"
logpkg "log"
"net"
"os"
+ "path"
+ "reflect"
+ "runtime"
+ "sync"
"testing"
"time"
@@ -15,197 +23,317 @@ func init() {
logger.AddLogSystem(logger.NewStdLogSystem(os.Stdout, logpkg.LstdFlags, logger.ErrorLevel))
}
-func TestUDP_ping(t *testing.T) {
- t.Parallel()
-
- n1, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- n2, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- defer n1.Close()
- defer n2.Close()
+type udpTest struct {
+ t *testing.T
+ pipe *dgramPipe
+ table *Table
+ udp *udp
+ sent [][]byte
+ localkey, remotekey *ecdsa.PrivateKey
+ remoteaddr *net.UDPAddr
+}
- if err := n1.net.ping(n2.self); err != nil {
- t.Fatalf("ping error: %v", err)
+func newUDPTest(t *testing.T) *udpTest {
+ test := &udpTest{
+ t: t,
+ pipe: newpipe(),
+ localkey: newkey(),
+ remotekey: newkey(),
+ remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303},
}
- if find(n2, n1.self.ID) == nil {
- t.Errorf("node 2 does not contain id of node 1")
+ test.table, test.udp = newUDP(test.localkey, test.pipe, nil)
+ return test
+}
+
+// handles a packet as if it had been sent to the transport.
+func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
+ enc, err := encodePacket(test.remotekey, ptype, data)
+ if err != nil {
+ return test.errorf("packet (%d) encode error: %v", err)
}
- if e := find(n1, n2.self.ID); e != nil {
- t.Errorf("node 1 does contains id of node 2: %v", e)
+ test.sent = append(test.sent, enc)
+ err = data.handle(test.udp, test.remoteaddr, PubkeyID(&test.remotekey.PublicKey), enc[:macSize])
+ if err != wantError {
+ return test.errorf("error mismatch: got %q, want %q", err, wantError)
}
+ return nil
}
-func find(tab *Table, id NodeID) *Node {
- for _, b := range tab.buckets {
- for _, e := range b.entries {
- if e.ID == id {
- return e
- }
- }
+// waits for a packet to be sent by the transport.
+// validate should have type func(*udpTest, X) error, where X is a packet type.
+func (test *udpTest) waitPacketOut(validate interface{}) error {
+ dgram := test.pipe.waitPacketOut()
+ p, _, _, err := decodePacket(dgram)
+ if err != nil {
+ return test.errorf("sent packet decode error: %v", err)
}
+ fn := reflect.ValueOf(validate)
+ exptype := fn.Type().In(0)
+ if reflect.TypeOf(p) != exptype {
+ return test.errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
+ }
+ fn.Call([]reflect.Value{reflect.ValueOf(p)})
return nil
}
-func TestUDP_findnode(t *testing.T) {
+func (test *udpTest) errorf(format string, args ...interface{}) error {
+ _, file, line, ok := runtime.Caller(2) // errorf + waitPacketOut
+ if ok {
+ file = path.Base(file)
+ } else {
+ file = "???"
+ line = 1
+ }
+ err := fmt.Errorf(format, args...)
+ fmt.Printf("\t%s:%d: %v\n", file, line, err)
+ test.t.Fail()
+ return err
+}
+
+// shared test variables
+var (
+ futureExp = uint64(time.Now().Add(10 * time.Hour).Unix())
+ testTarget = MustHexID("01010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101010101")
+)
+
+func TestUDP_packetErrors(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.table.Close()
+
+ test.packetIn(errExpired, pingPacket, &ping{IP: "foo", Port: 99, Version: Version})
+ test.packetIn(errBadVersion, pingPacket, &ping{IP: "foo", Port: 99, Version: 99, Expiration: futureExp})
+ test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
+ test.packetIn(errUnknownNode, findnodePacket, &findnode{Expiration: futureExp})
+ test.packetIn(errUnsolicitedReply, neighborsPacket, &neighbors{Expiration: futureExp})
+}
+
+func TestUDP_pingTimeout(t *testing.T) {
+ t.Parallel()
+ test := newUDPTest(t)
+ defer test.table.Close()
+
+ toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
+ toid := NodeID{1, 2, 3, 4}
+ if err := test.udp.ping(toid, toaddr); err != errTimeout {
+ t.Error("expected timeout error, got", err)
+ }
+}
+
+func TestUDP_findnodeTimeout(t *testing.T) {
t.Parallel()
+ test := newUDPTest(t)
+ defer test.table.Close()
- n1, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- n2, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- defer n1.Close()
- defer n2.Close()
+ toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
+ toid := NodeID{1, 2, 3, 4}
+ target := NodeID{4, 5, 6, 7}
+ result, err := test.udp.findnode(toid, toaddr, target)
+ if err != errTimeout {
+ t.Error("expected timeout error, got", err)
+ }
+ if len(result) > 0 {
+ t.Error("expected empty result, got", result)
+ }
+}
- // put a few nodes into n2. the exact distribution shouldn't
- // matter much, altough we need to take care not to overflow
- // any bucket.
- target := randomID(n1.self.ID, 100)
+func TestUDP_findnode(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.table.Close()
+
+ // put a few nodes into the table. their exact
+ // distribution shouldn't matter much, altough we need to
+ // take care not to overflow any bucket.
+ target := testTarget
nodes := &nodesByDistance{target: target}
for i := 0; i < bucketSize; i++ {
- n2.add([]*Node{&Node{
+ nodes.push(&Node{
IP: net.IP{1, 2, 3, byte(i)},
DiscPort: i + 2,
TCPPort: i + 2,
- ID: randomID(n2.self.ID, i+2),
- }})
+ ID: randomID(test.table.self.ID, i+2),
+ }, bucketSize)
}
- n2.add(nodes.entries)
- n2.bumpOrAdd(n1.self.ID, &net.UDPAddr{IP: n1.self.IP, Port: n1.self.DiscPort})
- expected := n2.closest(target, bucketSize)
+ test.table.add(nodes.entries)
+
+ // ensure there's a bond with the test node,
+ // findnode won't be accepted otherwise.
+ test.table.db.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
- err := runUDP(10, func() error {
- result, _ := n1.net.findnode(n2.self, target)
- if len(result) != bucketSize {
- return fmt.Errorf("wrong number of results: got %d, want %d", len(result), bucketSize)
+ // check that closest neighbors are returned.
+ test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
+ test.waitPacketOut(func(p *neighbors) {
+ expected := test.table.closest(testTarget, bucketSize)
+ if len(p.Nodes) != bucketSize {
+ t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
}
- for i := range result {
- if result[i].ID != expected.entries[i].ID {
- return fmt.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, result[i], expected.entries[i])
+ for i := range p.Nodes {
+ if p.Nodes[i].ID != expected.entries[i].ID {
+ t.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, p.Nodes[i], expected.entries[i])
}
}
- return nil
})
- if err != nil {
- t.Error(err)
- }
}
-func TestUDP_replytimeout(t *testing.T) {
- t.Parallel()
+func TestUDP_findnodeMultiReply(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.table.Close()
- // reserve a port so we don't talk to an existing service by accident
- addr, _ := net.ResolveUDPAddr("udp", "127.0.0.1:0")
- fd, err := net.ListenUDP("udp", addr)
- if err != nil {
- t.Fatal(err)
- }
- defer fd.Close()
+ // queue a pending findnode request
+ resultc, errc := make(chan []*Node), make(chan error)
+ go func() {
+ rid := PubkeyID(&test.remotekey.PublicKey)
+ ns, err := test.udp.findnode(rid, test.remoteaddr, testTarget)
+ if err != nil && len(ns) == 0 {
+ errc <- err
+ } else {
+ resultc <- ns
+ }
+ }()
- n1, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- defer n1.Close()
- n2 := n1.bumpOrAdd(randomID(n1.self.ID, 10), fd.LocalAddr().(*net.UDPAddr))
+ // wait for the findnode to be sent.
+ // after it is sent, the transport is waiting for a reply
+ test.waitPacketOut(func(p *findnode) {
+ if p.Target != testTarget {
+ t.Errorf("wrong target: got %v, want %v", p.Target, testTarget)
+ }
+ })
- if err := n1.net.ping(n2); err != errTimeout {
- t.Error("expected timeout error, got", err)
+ // send the reply as two packets.
+ list := []*Node{
+ MustParseNode("enode://ba85011c70bcc5c04d8607d3a0ed29aa6179c092cbdda10d5d32684fb33ed01bd94f588ca8f91ac48318087dcb02eaf36773a7a453f0eedd6742af668097b29c@10.0.1.16:30303"),
+ MustParseNode("enode://81fa361d25f157cd421c60dcc28d8dac5ef6a89476633339c5df30287474520caca09627da18543d9079b5b288698b542d56167aa5c09111e55acdbbdf2ef799@10.0.1.16:30303"),
+ MustParseNode("enode://9bffefd833d53fac8e652415f4973bee289e8b1a5c6c4cbe70abf817ce8a64cee11b823b66a987f51aaa9fba0d6a91b3e6bf0d5a5d1042de8e9eeea057b217f8@10.0.1.36:30301"),
+ MustParseNode("enode://1b5b4aa662d7cb44a7221bfba67302590b643028197a7d5214790f3bac7aaa4a3241be9e83c09cf1f6c69d007c634faae3dc1b1221793e8446c0b3a09de65960@10.0.1.16:30303"),
}
+ test.packetIn(nil, neighborsPacket, &neighbors{Expiration: futureExp, Nodes: list[:2]})
+ test.packetIn(nil, neighborsPacket, &neighbors{Expiration: futureExp, Nodes: list[2:]})
- if result, err := n1.net.findnode(n2, n1.self.ID); err != errTimeout {
- t.Error("expected timeout error, got", err)
- } else if len(result) > 0 {
- t.Error("expected empty result, got", result)
+ // check that the sent neighbors are all returned by findnode
+ select {
+ case result := <-resultc:
+ if !reflect.DeepEqual(result, list) {
+ t.Errorf("neighbors mismatch:\n got: %v\n want: %v", result, list)
+ }
+ case err := <-errc:
+ t.Errorf("findnode error: %v", err)
+ case <-time.After(5 * time.Second):
+ t.Error("findnode did not return within 5 seconds")
}
}
-func TestUDP_findnodeMultiReply(t *testing.T) {
- t.Parallel()
+func TestUDP_successfulPing(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.table.Close()
- n1, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- n2, _ := ListenUDP(newkey(), "127.0.0.1:0", nil)
- udp2 := n2.net.(*udp)
- defer n1.Close()
- defer n2.Close()
-
- err := runUDP(10, func() error {
- nodes := make([]*Node, bucketSize)
- for i := range nodes {
- nodes[i] = &Node{
- IP: net.IP{1, 2, 3, 4},
- DiscPort: i + 1,
- TCPPort: i + 1,
- ID: randomID(n2.self.ID, i+1),
- }
- }
+ done := make(chan struct{})
+ go func() {
+ test.packetIn(nil, pingPacket, &ping{IP: "foo", Port: 99, Version: Version, Expiration: futureExp})
+ close(done)
+ }()
- // ask N2 for neighbors. it will send an empty reply back.
- // the request will wait for up to bucketSize replies.
- resultc := make(chan []*Node)
- errc := make(chan error)
- go func() {
- ns, err := n1.net.findnode(n2.self, n1.self.ID)
- if err != nil {
- errc <- err
- } else {
- resultc <- ns
- }
- }()
-
- // send a few more neighbors packets to N1.
- // it should collect those.
- for end := 0; end < len(nodes); {
- off := end
- if end = end + 5; end > len(nodes) {
- end = len(nodes)
- }
- udp2.send(n1.self, neighborsPacket, neighbors{
- Nodes: nodes[off:end],
- Expiration: uint64(time.Now().Add(10 * time.Second).Unix()),
- })
+ // the ping is replied to.
+ test.waitPacketOut(func(p *pong) {
+ pinghash := test.sent[0][:macSize]
+ if !bytes.Equal(p.ReplyTok, pinghash) {
+ t.Errorf("got ReplyTok %x, want %x", p.ReplyTok, pinghash)
}
+ })
- // check that they are all returned. we cannot just check for
- // equality because they might not be returned in the order they
- // were sent.
- var result []*Node
- select {
- case result = <-resultc:
- case err := <-errc:
- return err
- }
- if hasDuplicates(result) {
- return fmt.Errorf("result slice contains duplicates")
- }
- if len(result) != len(nodes) {
- return fmt.Errorf("wrong number of nodes returned: got %d, want %d", len(result), len(nodes))
- }
- matched := make(map[NodeID]bool)
- for _, n := range result {
- for _, expn := range nodes {
- if n.ID == expn.ID { // && bytes.Equal(n.Addr.IP, expn.Addr.IP) && n.Addr.Port == expn.Addr.Port {
- matched[n.ID] = true
- }
+ // remote is unknown, the table pings back.
+ test.waitPacketOut(func(p *ping) error { return nil })
+ test.packetIn(nil, pongPacket, &pong{Expiration: futureExp})
+
+ // ping should return shortly after getting the pong packet.
+ <-done
+
+ // check that the node was added.
+ rid := PubkeyID(&test.remotekey.PublicKey)
+ rnode := find(test.table, rid)
+ if rnode == nil {
+ t.Fatalf("node %v not found in table", rid)
+ }
+ if !bytes.Equal(rnode.IP, test.remoteaddr.IP) {
+ t.Errorf("node has wrong IP: got %v, want: %v", rnode.IP, test.remoteaddr.IP)
+ }
+ if rnode.DiscPort != test.remoteaddr.Port {
+ t.Errorf("node has wrong Port: got %v, want: %v", rnode.DiscPort, test.remoteaddr.Port)
+ }
+ if rnode.TCPPort != 99 {
+ t.Errorf("node has wrong Port: got %v, want: %v", rnode.TCPPort, 99)
+ }
+}
+
+func find(tab *Table, id NodeID) *Node {
+ for _, b := range tab.buckets {
+ for _, e := range b.entries {
+ if e.ID == id {
+ return e
}
}
- if len(matched) != len(nodes) {
- return fmt.Errorf("wrong number of matching nodes: got %d, want %d", len(matched), len(nodes))
- }
- return nil
- })
- if err != nil {
- t.Error(err)
}
+ return nil
}
-// runUDP runs a test n times and returns an error if the test failed
-// in all n runs. This is necessary because UDP is unreliable even for
-// connections on the local machine, causing test failures.
-func runUDP(n int, test func() error) error {
- errcount := 0
- errors := ""
- for i := 0; i < n; i++ {
- if err := test(); err != nil {
- errors += fmt.Sprintf("\n#%d: %v", i, err)
- errcount++
- }
+// dgramPipe is a fake UDP socket. It queues all sent datagrams.
+type dgramPipe struct {
+ mu *sync.Mutex
+ cond *sync.Cond
+ closing chan struct{}
+ closed bool
+ queue [][]byte
+}
+
+func newpipe() *dgramPipe {
+ mu := new(sync.Mutex)
+ return &dgramPipe{
+ closing: make(chan struct{}),
+ cond: &sync.Cond{L: mu},
+ mu: mu,
}
- if errcount == n {
- return fmt.Errorf("failed on all %d iterations:%s", n, errors)
+}
+
+// WriteToUDP queues a datagram.
+func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) {
+ msg := make([]byte, len(b))
+ copy(msg, b)
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if c.closed {
+ return 0, errors.New("closed")
+ }
+ c.queue = append(c.queue, msg)
+ c.cond.Signal()
+ return len(b), nil
+}
+
+// ReadFromUDP just hangs until the pipe is closed.
+func (c *dgramPipe) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
+ <-c.closing
+ return 0, nil, io.EOF
+}
+
+func (c *dgramPipe) Close() error {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if !c.closed {
+ close(c.closing)
+ c.closed = true
}
return nil
}
+
+func (c *dgramPipe) LocalAddr() net.Addr {
+ return &net.UDPAddr{}
+}
+
+func (c *dgramPipe) waitPacketOut() []byte {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ for len(c.queue) == 0 {
+ c.cond.Wait()
+ }
+ p := c.queue[0]
+ copy(c.queue, c.queue[1:])
+ c.queue = c.queue[:len(c.queue)-1]
+ return p
+}