aboutsummaryrefslogtreecommitdiffstats
path: root/p2p
diff options
context:
space:
mode:
Diffstat (limited to 'p2p')
-rw-r--r--p2p/discover/node.go3
-rw-r--r--p2p/discover/table.go84
-rw-r--r--p2p/discover/table_test.go34
-rw-r--r--p2p/discover/table_util_test.go21
-rw-r--r--p2p/discover/udp.go222
-rw-r--r--p2p/discover/udp_test.go137
-rw-r--r--p2p/enode/nodedb.go210
-rw-r--r--p2p/enode/nodedb_test.go216
8 files changed, 595 insertions, 332 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go
index 7ddf04fe8..8d4af166b 100644
--- a/p2p/discover/node.go
+++ b/p2p/discover/node.go
@@ -33,7 +33,8 @@ import (
// The fields of Node may not be modified.
type node struct {
enode.Node
- addedAt time.Time // time when the node was added to the table
+ addedAt time.Time // time when the node was added to the table
+ livenessChecks uint // how often liveness was checked
}
type encPubkey [64]byte
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 9f7f1d41b..ba4c06327 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -75,8 +75,10 @@ type Table struct {
net transport
refreshReq chan chan struct{}
initDone chan struct{}
- closeReq chan struct{}
- closed chan struct{}
+
+ closeOnce sync.Once
+ closeReq chan struct{}
+ closed chan struct{}
nodeAddedHook func(*node) // for testing
}
@@ -180,16 +182,14 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
// Close terminates the network listener and flushes the node database.
func (tab *Table) Close() {
- if tab.net != nil {
- tab.net.close()
- }
-
- select {
- case <-tab.closed:
- // already closed.
- case tab.closeReq <- struct{}{}:
- <-tab.closed // wait for refreshLoop to end.
- }
+ tab.closeOnce.Do(func() {
+ if tab.net != nil {
+ tab.net.close()
+ }
+ // Wait for loop to end.
+ close(tab.closeReq)
+ <-tab.closed
+ })
}
// setFallbackNodes sets the initial points of contact. These nodes
@@ -290,12 +290,16 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
// we have asked all closest nodes, stop the search
break
}
- // wait for the next reply
- for _, n := range <-reply {
- if n != nil && !seen[n.ID()] {
- seen[n.ID()] = true
- result.push(n, bucketSize)
+ select {
+ case nodes := <-reply:
+ for _, n := range nodes {
+ if n != nil && !seen[n.ID()] {
+ seen[n.ID()] = true
+ result.push(n, bucketSize)
+ }
}
+ case <-tab.closeReq:
+ return nil // shutdown, no need to continue.
}
pendingQueries--
}
@@ -303,18 +307,22 @@ func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
}
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
- fails := tab.db.FindFails(n.ID())
+ fails := tab.db.FindFails(n.ID(), n.IP())
r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
- if err != nil || len(r) == 0 {
+ if err == errClosed {
+ // Avoid recording failures on shutdown.
+ reply <- nil
+ return
+ } else if err != nil || len(r) == 0 {
fails++
- tab.db.UpdateFindFails(n.ID(), fails)
+ tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
if fails >= maxFindnodeFailures {
log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
tab.delete(n)
}
} else if fails > 0 {
- tab.db.UpdateFindFails(n.ID(), fails-1)
+ tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
}
// Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
@@ -329,7 +337,7 @@ func (tab *Table) refresh() <-chan struct{} {
done := make(chan struct{})
select {
case tab.refreshReq <- done:
- case <-tab.closed:
+ case <-tab.closeReq:
close(done)
}
return done
@@ -433,7 +441,7 @@ func (tab *Table) loadSeedNodes() {
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
- age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID())) }}
+ age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
tab.add(seed)
}
@@ -458,16 +466,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
b := tab.buckets[bi]
if err == nil {
// The node responded, move it to the front.
- log.Debug("Revalidated node", "b", bi, "id", last.ID())
+ last.livenessChecks++
+ log.Debug("Revalidated node", "b", bi, "id", last.ID(), "checks", last.livenessChecks)
b.bump(last)
return
}
// No reply received, pick a replacement or delete the node if there aren't
// any replacements.
if r := tab.replace(b, last); r != nil {
- log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "r", r.ID(), "rip", r.IP())
+ log.Debug("Replaced dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks, "r", r.ID(), "rip", r.IP())
} else {
- log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP())
+ log.Debug("Removed dead node", "b", bi, "id", last.ID(), "ip", last.IP(), "checks", last.livenessChecks)
}
}
@@ -502,7 +511,7 @@ func (tab *Table) copyLiveNodes() {
now := time.Now()
for _, b := range &tab.buckets {
for _, n := range b.entries {
- if now.Sub(n.addedAt) >= seedMinTableTime {
+ if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime {
tab.db.UpdateNode(unwrapNode(n))
}
}
@@ -518,7 +527,9 @@ func (tab *Table) closest(target enode.ID, nresults int) *nodesByDistance {
close := &nodesByDistance{target: target}
for _, b := range &tab.buckets {
for _, n := range b.entries {
- close.push(n, nresults)
+ if n.livenessChecks > 0 {
+ close.push(n, nresults)
+ }
}
}
return close
@@ -572,23 +583,6 @@ func (tab *Table) addThroughPing(n *node) {
tab.add(n)
}
-// stuff adds nodes the table to the end of their corresponding bucket
-// if the bucket is not full. The caller must not hold tab.mutex.
-func (tab *Table) stuff(nodes []*node) {
- tab.mutex.Lock()
- defer tab.mutex.Unlock()
-
- for _, n := range nodes {
- if n.ID() == tab.self().ID() {
- continue // don't add self
- }
- b := tab.bucket(n.ID())
- if len(b.entries) < bucketSize {
- tab.bumpOrAdd(b, n)
- }
- }
-}
-
// delete removes an entry from the node table. It is used to evacuate dead nodes.
func (tab *Table) delete(node *node) {
tab.mutex.Lock()
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 6b4cd2d18..b00a93211 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -50,8 +50,8 @@ func TestTable_pingReplace(t *testing.T) {
func testPingReplace(t *testing.T, newNodeIsResponding, lastInBucketIsResponding bool) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
- defer tab.Close()
defer db.Close()
+ defer tab.Close()
<-tab.initDone
@@ -137,8 +137,8 @@ func TestBucket_bumpNoDuplicates(t *testing.T) {
func TestTable_IPLimit(t *testing.T) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
- defer tab.Close()
defer db.Close()
+ defer tab.Close()
for i := 0; i < tableIPLimit+1; i++ {
n := nodeAtDistance(tab.self().ID(), i, net.IP{172, 0, 1, byte(i)})
@@ -153,8 +153,8 @@ func TestTable_IPLimit(t *testing.T) {
func TestTable_BucketIPLimit(t *testing.T) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
- defer tab.Close()
defer db.Close()
+ defer tab.Close()
d := 3
for i := 0; i < bucketIPLimit+1; i++ {
@@ -173,9 +173,9 @@ func TestTable_closest(t *testing.T) {
// for any node table, Target and N
transport := newPingRecorder()
tab, db := newTestTable(transport)
- defer tab.Close()
defer db.Close()
- tab.stuff(test.All)
+ defer tab.Close()
+ fillTable(tab, test.All)
// check that closest(Target, N) returns nodes
result := tab.closest(test.Target, test.N).entries
@@ -234,13 +234,13 @@ func TestTable_ReadRandomNodesGetAll(t *testing.T) {
test := func(buf []*enode.Node) bool {
transport := newPingRecorder()
tab, db := newTestTable(transport)
- defer tab.Close()
defer db.Close()
+ defer tab.Close()
<-tab.initDone
for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
- tab.stuff([]*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
+ fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
@@ -272,16 +272,19 @@ func (*closeTest) Generate(rand *rand.Rand, size int) reflect.Value {
N: rand.Intn(bucketSize),
}
for _, id := range gen([]enode.ID{}, rand).([]enode.ID) {
- n := enode.SignNull(new(enr.Record), id)
- t.All = append(t.All, wrapNode(n))
+ r := new(enr.Record)
+ r.Set(enr.IP(genIP(rand)))
+ n := wrapNode(enode.SignNull(r, id))
+ n.livenessChecks = 1
+ t.All = append(t.All, n)
}
return reflect.ValueOf(t)
}
func TestTable_Lookup(t *testing.T) {
tab, db := newTestTable(lookupTestnet)
- defer tab.Close()
defer db.Close()
+ defer tab.Close()
// lookup on empty table returns no nodes
if results := tab.lookup(lookupTestnet.target, false); len(results) > 0 {
@@ -289,8 +292,9 @@ func TestTable_Lookup(t *testing.T) {
}
// seed table with initial node (otherwise lookup will terminate immediately)
seedKey, _ := decodePubkey(lookupTestnet.dists[256][0])
- seed := wrapNode(enode.NewV4(seedKey, net.IP{}, 0, 256))
- tab.stuff([]*node{seed})
+ seed := wrapNode(enode.NewV4(seedKey, net.IP{127, 0, 0, 1}, 0, 256))
+ seed.livenessChecks = 1
+ fillTable(tab, []*node{seed})
results := tab.lookup(lookupTestnet.target, true)
t.Logf("results:")
@@ -578,6 +582,12 @@ func gen(typ interface{}, rand *rand.Rand) interface{} {
return v.Interface()
}
+func genIP(rand *rand.Rand) net.IP {
+ ip := make(net.IP, 4)
+ rand.Read(ip)
+ return ip
+}
+
func quickcfg() *quick.Config {
return &quick.Config{
MaxCount: 5000,
diff --git a/p2p/discover/table_util_test.go b/p2p/discover/table_util_test.go
index d41519452..3ce582b99 100644
--- a/p2p/discover/table_util_test.go
+++ b/p2p/discover/table_util_test.go
@@ -83,6 +83,23 @@ func fillBucket(tab *Table, n *node) (last *node) {
return b.entries[bucketSize-1]
}
+// fillTable adds nodes the table to the end of their corresponding bucket
+// if the bucket is not full. The caller must not hold tab.mutex.
+func fillTable(tab *Table, nodes []*node) {
+ tab.mutex.Lock()
+ defer tab.mutex.Unlock()
+
+ for _, n := range nodes {
+ if n.ID() == tab.self().ID() {
+ continue // don't add self
+ }
+ b := tab.bucket(n.ID())
+ if len(b.entries) < bucketSize {
+ tab.bumpOrAdd(b, n)
+ }
+ }
+}
+
type pingRecorder struct {
mu sync.Mutex
dead, pinged map[enode.ID]bool
@@ -109,10 +126,6 @@ func (t *pingRecorder) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPu
return nil, nil
}
-func (t *pingRecorder) waitping(from enode.ID) error {
- return nil // remote always pings
-}
-
func (t *pingRecorder) ping(toid enode.ID, toaddr *net.UDPAddr) error {
t.mu.Lock()
defer t.mu.Unlock()
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 37a044902..5ce4c43dc 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -67,6 +67,8 @@ const (
// RPC request structures
type (
ping struct {
+ senderKey *ecdsa.PublicKey // filled in by preverify
+
Version uint
From, To rpcEndpoint
Expiration uint64
@@ -155,8 +157,13 @@ func nodeToRPC(n *node) rpcNode {
return rpcNode{ID: ekey, IP: n.IP(), UDP: uint16(n.UDP()), TCP: uint16(n.TCP())}
}
+// packet is implemented by all protocol messages.
type packet interface {
- handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error
+ // preverify checks whether the packet is valid and should be handled at all.
+ preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error
+ // handle handles the packet.
+ handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte)
+ // name returns the name of the packet for logging purposes.
name() string
}
@@ -177,43 +184,48 @@ type udp struct {
tab *Table
wg sync.WaitGroup
- addpending chan *pending
- gotreply chan reply
- closing chan struct{}
+ addReplyMatcher chan *replyMatcher
+ gotreply chan reply
+ closing chan struct{}
}
// pending represents a pending reply.
//
-// some implementations of the protocol wish to send more than one
-// reply packet to findnode. in general, any neighbors packet cannot
+// Some implementations of the protocol wish to send more than one
+// reply packet to findnode. In general, any neighbors packet cannot
// be matched up with a specific findnode packet.
//
-// our implementation handles this by storing a callback function for
-// each pending reply. incoming packets from a node are dispatched
-// to all the callback functions for that node.
-type pending struct {
+// Our implementation handles this by storing a callback function for
+// each pending reply. Incoming packets from a node are dispatched
+// to all callback functions for that node.
+type replyMatcher struct {
// these fields must match in the reply.
from enode.ID
+ ip net.IP
ptype byte
// time when the request must complete
deadline time.Time
- // callback is called when a matching reply arrives. if it returns
- // true, the callback is removed from the pending reply queue.
- // if it returns false, the reply is considered incomplete and
- // the callback will be invoked again for the next matching reply.
- callback func(resp interface{}) (done bool)
+ // callback is called when a matching reply arrives. If it returns matched == true, the
+ // reply was acceptable. The second return value indicates whether the callback should
+ // be removed from the pending reply queue. If it returns false, the reply is considered
+ // incomplete and the callback will be invoked again for the next matching reply.
+ callback replyMatchFunc
// errc receives nil when the callback indicates completion or an
// error if no further reply is received within the timeout.
errc chan<- error
}
+type replyMatchFunc func(interface{}) (matched bool, requestDone bool)
+
type reply struct {
from enode.ID
+ ip net.IP
ptype byte
- data interface{}
+ data packet
+
// loop indicates whether there was
// a matching request by sending on this channel.
matched chan<- bool
@@ -247,14 +259,14 @@ func ListenUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, error) {
func newUDP(c conn, ln *enode.LocalNode, cfg Config) (*Table, *udp, error) {
udp := &udp{
- conn: c,
- priv: cfg.PrivateKey,
- netrestrict: cfg.NetRestrict,
- localNode: ln,
- db: ln.Database(),
- closing: make(chan struct{}),
- gotreply: make(chan reply),
- addpending: make(chan *pending),
+ conn: c,
+ priv: cfg.PrivateKey,
+ netrestrict: cfg.NetRestrict,
+ localNode: ln,
+ db: ln.Database(),
+ closing: make(chan struct{}),
+ gotreply: make(chan reply),
+ addReplyMatcher: make(chan *replyMatcher),
}
tab, err := newTable(udp, ln.Database(), cfg.Bootnodes)
if err != nil {
@@ -304,35 +316,37 @@ func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-ch
errc <- err
return errc
}
- errc := t.pending(toid, pongPacket, func(p interface{}) bool {
- ok := bytes.Equal(p.(*pong).ReplyTok, hash)
- if ok && callback != nil {
+ // Add a matcher for the reply to the pending reply queue. Pongs are matched if they
+ // reference the ping we're about to send.
+ errc := t.pending(toid, toaddr.IP, pongPacket, func(p interface{}) (matched bool, requestDone bool) {
+ matched = bytes.Equal(p.(*pong).ReplyTok, hash)
+ if matched && callback != nil {
callback()
}
- return ok
+ return matched, matched
})
+ // Send the packet.
t.localNode.UDPContact(toaddr)
- t.write(toaddr, req.name(), packet)
+ t.write(toaddr, toid, req.name(), packet)
return errc
}
-func (t *udp) waitping(from enode.ID) error {
- return <-t.pending(from, pingPacket, func(interface{}) bool { return true })
-}
-
// findnode sends a findnode request to the given node and waits until
// the node has sent up to k neighbors.
func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) {
// If we haven't seen a ping from the destination node for a while, it won't remember
// our endpoint proof and reject findnode. Solicit a ping first.
- if time.Since(t.db.LastPingReceived(toid)) > bondExpiration {
+ if time.Since(t.db.LastPingReceived(toid, toaddr.IP)) > bondExpiration {
t.ping(toid, toaddr)
- t.waitping(toid)
+ // Wait for them to ping back and process our pong.
+ time.Sleep(respTimeout)
}
+ // Add a matcher for 'neighbours' replies to the pending reply queue. The matcher is
+ // active until enough nodes have been received.
nodes := make([]*node, 0, bucketSize)
nreceived := 0
- errc := t.pending(toid, neighborsPacket, func(r interface{}) bool {
+ errc := t.pending(toid, toaddr.IP, neighborsPacket, func(r interface{}) (matched bool, requestDone bool) {
reply := r.(*neighbors)
for _, rn := range reply.Nodes {
nreceived++
@@ -343,22 +357,22 @@ func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]
}
nodes = append(nodes, n)
}
- return nreceived >= bucketSize
+ return true, nreceived >= bucketSize
})
- t.send(toaddr, findnodePacket, &findnode{
+ t.send(toaddr, toid, findnodePacket, &findnode{
Target: target,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
return nodes, <-errc
}
-// pending adds a reply callback to the pending reply queue.
-// see the documentation of type pending for a detailed explanation.
-func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool) <-chan error {
+// pending adds a reply matcher to the pending reply queue.
+// see the documentation of type replyMatcher for a detailed explanation.
+func (t *udp) pending(id enode.ID, ip net.IP, ptype byte, callback replyMatchFunc) <-chan error {
ch := make(chan error, 1)
- p := &pending{from: id, ptype: ptype, callback: callback, errc: ch}
+ p := &replyMatcher{from: id, ip: ip, ptype: ptype, callback: callback, errc: ch}
select {
- case t.addpending <- p:
+ case t.addReplyMatcher <- p:
// loop will handle it
case <-t.closing:
ch <- errClosed
@@ -366,10 +380,12 @@ func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool)
return ch
}
-func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool {
+// handleReply dispatches a reply packet, invoking reply matchers. It returns
+// whether any matcher considered the packet acceptable.
+func (t *udp) handleReply(from enode.ID, fromIP net.IP, ptype byte, req packet) bool {
matched := make(chan bool, 1)
select {
- case t.gotreply <- reply{from, ptype, req, matched}:
+ case t.gotreply <- reply{from, fromIP, ptype, req, matched}:
// loop will handle it
return <-matched
case <-t.closing:
@@ -385,8 +401,8 @@ func (t *udp) loop() {
var (
plist = list.New()
timeout = time.NewTimer(0)
- nextTimeout *pending // head of plist when timeout was last reset
- contTimeouts = 0 // number of continuous timeouts to do NTP checks
+ nextTimeout *replyMatcher // head of plist when timeout was last reset
+ contTimeouts = 0 // number of continuous timeouts to do NTP checks
ntpWarnTime = time.Unix(0, 0)
)
<-timeout.C // ignore first timeout
@@ -399,7 +415,7 @@ func (t *udp) loop() {
// Start the timer so it fires when the next pending reply has expired.
now := time.Now()
for el := plist.Front(); el != nil; el = el.Next() {
- nextTimeout = el.Value.(*pending)
+ nextTimeout = el.Value.(*replyMatcher)
if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout {
timeout.Reset(dist)
return
@@ -420,25 +436,23 @@ func (t *udp) loop() {
select {
case <-t.closing:
for el := plist.Front(); el != nil; el = el.Next() {
- el.Value.(*pending).errc <- errClosed
+ el.Value.(*replyMatcher).errc <- errClosed
}
return
- case p := <-t.addpending:
+ case p := <-t.addReplyMatcher:
p.deadline = time.Now().Add(respTimeout)
plist.PushBack(p)
case r := <-t.gotreply:
- var matched bool
+ var matched bool // whether any replyMatcher considered the reply acceptable.
for el := plist.Front(); el != nil; el = el.Next() {
- p := el.Value.(*pending)
- if p.from == r.from && p.ptype == r.ptype {
- matched = true
- // Remove the matcher if its callback indicates
- // that all replies have been received. This is
- // required for packet types that expect multiple
- // reply packets.
- if p.callback(r.data) {
+ p := el.Value.(*replyMatcher)
+ if p.from == r.from && p.ptype == r.ptype && p.ip.Equal(r.ip) {
+ ok, requestDone := p.callback(r.data)
+ matched = matched || ok
+ // Remove the matcher if callback indicates that all replies have been received.
+ if requestDone {
p.errc <- nil
plist.Remove(el)
}
@@ -453,7 +467,7 @@ func (t *udp) loop() {
// Notify and remove callbacks whose deadline is in the past.
for el := plist.Front(); el != nil; el = el.Next() {
- p := el.Value.(*pending)
+ p := el.Value.(*replyMatcher)
if now.After(p.deadline) || now.Equal(p.deadline) {
p.errc <- errTimeout
plist.Remove(el)
@@ -504,17 +518,17 @@ func init() {
}
}
-func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) {
+func (t *udp) send(toaddr *net.UDPAddr, toid enode.ID, ptype byte, req packet) ([]byte, error) {
packet, hash, err := encodePacket(t.priv, ptype, req)
if err != nil {
return hash, err
}
- return hash, t.write(toaddr, req.name(), packet)
+ return hash, t.write(toaddr, toid, req.name(), packet)
}
-func (t *udp) write(toaddr *net.UDPAddr, what string, packet []byte) error {
+func (t *udp) write(toaddr *net.UDPAddr, toid enode.ID, what string, packet []byte) error {
_, err := t.conn.WriteToUDP(packet, toaddr)
- log.Trace(">> "+what, "addr", toaddr, "err", err)
+ log.Trace(">> "+what, "id", toid, "addr", toaddr, "err", err)
return err
}
@@ -573,13 +587,19 @@ func (t *udp) readLoop(unhandled chan<- ReadPacket) {
}
func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error {
- packet, fromID, hash, err := decodePacket(buf)
+ packet, fromKey, hash, err := decodePacket(buf)
if err != nil {
log.Debug("Bad discv4 packet", "addr", from, "err", err)
return err
}
- err = packet.handle(t, from, fromID, hash)
- log.Trace("<< "+packet.name(), "addr", from, "err", err)
+ fromID := fromKey.id()
+ if err == nil {
+ err = packet.preverify(t, from, fromID, fromKey)
+ }
+ log.Trace("<< "+packet.name(), "id", fromID, "addr", from, "err", err)
+ if err == nil {
+ packet.handle(t, from, fromID, hash)
+ }
return err
}
@@ -615,54 +635,67 @@ func decodePacket(buf []byte) (packet, encPubkey, []byte, error) {
return req, fromKey, hash, err
}
-func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
+// Packet Handlers
+
+func (req *ping) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
if expired(req.Expiration) {
return errExpired
}
key, err := decodePubkey(fromKey)
if err != nil {
- return fmt.Errorf("invalid public key: %v", err)
+ return errors.New("invalid public key")
}
- t.send(from, pongPacket, &pong{
+ req.senderKey = key
+ return nil
+}
+
+func (req *ping) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
+ // Reply.
+ t.send(from, fromID, pongPacket, &pong{
To: makeEndpoint(from, req.From.TCP),
ReplyTok: mac,
Expiration: uint64(time.Now().Add(expiration).Unix()),
})
- n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port))
- t.handleReply(n.ID(), pingPacket, req)
- if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration {
- t.sendPing(n.ID(), from, func() { t.tab.addThroughPing(n) })
+
+ // Ping back if our last pong on file is too far in the past.
+ n := wrapNode(enode.NewV4(req.senderKey, from.IP, int(req.From.TCP), from.Port))
+ if time.Since(t.db.LastPongReceived(n.ID(), from.IP)) > bondExpiration {
+ t.sendPing(fromID, from, func() {
+ t.tab.addThroughPing(n)
+ })
} else {
t.tab.addThroughPing(n)
}
+
+ // Update node database and endpoint predictor.
+ t.db.UpdateLastPingReceived(n.ID(), from.IP, time.Now())
t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
- t.db.UpdateLastPingReceived(n.ID(), time.Now())
- return nil
}
func (req *ping) name() string { return "PING/v4" }
-func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
+func (req *pong) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
if expired(req.Expiration) {
return errExpired
}
- fromID := fromKey.id()
- if !t.handleReply(fromID, pongPacket, req) {
+ if !t.handleReply(fromID, from.IP, pongPacket, req) {
return errUnsolicitedReply
}
- t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
- t.db.UpdateLastPongReceived(fromID, time.Now())
return nil
}
+func (req *pong) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
+ t.localNode.UDPEndpointStatement(from, &net.UDPAddr{IP: req.To.IP, Port: int(req.To.UDP)})
+ t.db.UpdateLastPongReceived(fromID, from.IP, time.Now())
+}
+
func (req *pong) name() string { return "PONG/v4" }
-func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
+func (req *findnode) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
if expired(req.Expiration) {
return errExpired
}
- fromID := fromKey.id()
- if time.Since(t.db.LastPongReceived(fromID)) > bondExpiration {
+ if time.Since(t.db.LastPongReceived(fromID, from.IP)) > bondExpiration {
// No endpoint proof pong exists, we don't process the packet. This prevents an
// attack vector where the discovery protocol could be used to amplify traffic in a
// DDOS attack. A malicious actor would send a findnode request with the IP address
@@ -671,43 +704,50 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []
// findnode) to the victim.
return errUnknownNode
}
+ return nil
+}
+
+func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
+ // Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
t.tab.mutex.Lock()
closest := t.tab.closest(target, bucketSize).entries
t.tab.mutex.Unlock()
- p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
- var sent bool
// Send neighbors in chunks with at most maxNeighbors per packet
// to stay below the 1280 byte limit.
+ p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())}
+ var sent bool
for _, n := range closest {
if netutil.CheckRelayIP(from.IP, n.IP()) == nil {
p.Nodes = append(p.Nodes, nodeToRPC(n))
}
if len(p.Nodes) == maxNeighbors {
- t.send(from, neighborsPacket, &p)
+ t.send(from, fromID, neighborsPacket, &p)
p.Nodes = p.Nodes[:0]
sent = true
}
}
if len(p.Nodes) > 0 || !sent {
- t.send(from, neighborsPacket, &p)
+ t.send(from, fromID, neighborsPacket, &p)
}
- return nil
}
func (req *findnode) name() string { return "FINDNODE/v4" }
-func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error {
+func (req *neighbors) preverify(t *udp, from *net.UDPAddr, fromID enode.ID, fromKey encPubkey) error {
if expired(req.Expiration) {
return errExpired
}
- if !t.handleReply(fromKey.id(), neighborsPacket, req) {
+ if !t.handleReply(fromID, from.IP, neighborsPacket, req) {
return errUnsolicitedReply
}
return nil
}
+func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID enode.ID, mac []byte) {
+}
+
func (req *neighbors) name() string { return "NEIGHBORS/v4" }
func expired(ts uint64) bool {
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index a4ddaf750..3d53c9309 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -19,6 +19,7 @@ package discover
import (
"bytes"
"crypto/ecdsa"
+ crand "crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
@@ -57,6 +58,7 @@ type udpTest struct {
t *testing.T
pipe *dgramPipe
table *Table
+ db *enode.DB
udp *udp
sent [][]byte
localkey, remotekey *ecdsa.PrivateKey
@@ -71,22 +73,32 @@ func newUDPTest(t *testing.T) *udpTest {
remotekey: newkey(),
remoteaddr: &net.UDPAddr{IP: net.IP{10, 0, 1, 99}, Port: 30303},
}
- db, _ := enode.OpenDB("")
- ln := enode.NewLocalNode(db, test.localkey)
+ test.db, _ = enode.OpenDB("")
+ ln := enode.NewLocalNode(test.db, test.localkey)
test.table, test.udp, _ = newUDP(test.pipe, ln, Config{PrivateKey: test.localkey})
// Wait for initial refresh so the table doesn't send unexpected findnode.
<-test.table.initDone
return test
}
+func (test *udpTest) close() {
+ test.table.Close()
+ test.db.Close()
+}
+
// handles a packet as if it had been sent to the transport.
func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
- enc, _, err := encodePacket(test.remotekey, ptype, data)
+ return test.packetInFrom(wantError, test.remotekey, test.remoteaddr, ptype, data)
+}
+
+// handles a packet as if it had been sent to the transport by the key/endpoint.
+func (test *udpTest) packetInFrom(wantError error, key *ecdsa.PrivateKey, addr *net.UDPAddr, ptype byte, data packet) error {
+ enc, _, err := encodePacket(key, ptype, data)
if err != nil {
return test.errorf("packet (%d) encode error: %v", ptype, err)
}
test.sent = append(test.sent, enc)
- if err = test.udp.handlePacket(test.remoteaddr, enc); err != wantError {
+ if err = test.udp.handlePacket(addr, enc); err != wantError {
return test.errorf("error mismatch: got %q, want %q", err, wantError)
}
return nil
@@ -94,19 +106,19 @@ func (test *udpTest) packetIn(wantError error, ptype byte, data packet) error {
// waits for a packet to be sent by the transport.
// validate should have type func(*udpTest, X) error, where X is a packet type.
-func (test *udpTest) waitPacketOut(validate interface{}) ([]byte, error) {
+func (test *udpTest) waitPacketOut(validate interface{}) (*net.UDPAddr, []byte, error) {
dgram := test.pipe.waitPacketOut()
- p, _, hash, err := decodePacket(dgram)
+ p, _, hash, err := decodePacket(dgram.data)
if err != nil {
- return hash, test.errorf("sent packet decode error: %v", err)
+ return &dgram.to, hash, test.errorf("sent packet decode error: %v", err)
}
fn := reflect.ValueOf(validate)
exptype := fn.Type().In(0)
if reflect.TypeOf(p) != exptype {
- return hash, test.errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
+ return &dgram.to, hash, test.errorf("sent packet type mismatch, got: %v, want: %v", reflect.TypeOf(p), exptype)
}
fn.Call([]reflect.Value{reflect.ValueOf(p)})
- return hash, nil
+ return &dgram.to, hash, nil
}
func (test *udpTest) errorf(format string, args ...interface{}) error {
@@ -125,7 +137,7 @@ func (test *udpTest) errorf(format string, args ...interface{}) error {
func TestUDP_packetErrors(t *testing.T) {
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
test.packetIn(errExpired, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4})
test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: []byte{}, Expiration: futureExp})
@@ -136,7 +148,7 @@ func TestUDP_packetErrors(t *testing.T) {
func TestUDP_pingTimeout(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
toid := enode.ID{1, 2, 3, 4}
@@ -148,7 +160,7 @@ func TestUDP_pingTimeout(t *testing.T) {
func TestUDP_responseTimeouts(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
rand.Seed(time.Now().UnixNano())
randomDuration := func(max time.Duration) time.Duration {
@@ -166,20 +178,20 @@ func TestUDP_responseTimeouts(t *testing.T) {
// with ptype <= 128 will not get a reply and should time out.
// For all other requests, a reply is scheduled to arrive
// within the timeout window.
- p := &pending{
+ p := &replyMatcher{
ptype: byte(rand.Intn(255)),
- callback: func(interface{}) bool { return true },
+ callback: func(interface{}) (bool, bool) { return true, true },
}
binary.BigEndian.PutUint64(p.from[:], uint64(i))
if p.ptype <= 128 {
p.errc = timeoutErr
- test.udp.addpending <- p
+ test.udp.addReplyMatcher <- p
nTimeouts++
} else {
p.errc = nilErr
- test.udp.addpending <- p
+ test.udp.addReplyMatcher <- p
time.AfterFunc(randomDuration(60*time.Millisecond), func() {
- if !test.udp.handleReply(p.from, p.ptype, nil) {
+ if !test.udp.handleReply(p.from, p.ip, p.ptype, nil) {
t.Logf("not matched: %v", p)
}
})
@@ -220,7 +232,7 @@ func TestUDP_responseTimeouts(t *testing.T) {
func TestUDP_findnodeTimeout(t *testing.T) {
t.Parallel()
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
toaddr := &net.UDPAddr{IP: net.ParseIP("1.2.3.4"), Port: 2222}
toid := enode.ID{1, 2, 3, 4}
@@ -236,50 +248,65 @@ func TestUDP_findnodeTimeout(t *testing.T) {
func TestUDP_findnode(t *testing.T) {
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
// put a few nodes into the table. their exact
// distribution shouldn't matter much, although we need to
// take care not to overflow any bucket.
nodes := &nodesByDistance{target: testTarget.id()}
- for i := 0; i < bucketSize; i++ {
+ live := make(map[enode.ID]bool)
+ numCandidates := 2 * bucketSize
+ for i := 0; i < numCandidates; i++ {
key := newkey()
- n := wrapNode(enode.NewV4(&key.PublicKey, net.IP{10, 13, 0, 1}, 0, i))
- nodes.push(n, bucketSize)
+ ip := net.IP{10, 13, 0, byte(i)}
+ n := wrapNode(enode.NewV4(&key.PublicKey, ip, 0, 2000))
+ // Ensure half of table content isn't verified live yet.
+ if i > numCandidates/2 {
+ n.livenessChecks = 1
+ live[n.ID()] = true
+ }
+ nodes.push(n, numCandidates)
}
- test.table.stuff(nodes.entries)
+ fillTable(test.table, nodes.entries)
// ensure there's a bond with the test node,
// findnode won't be accepted otherwise.
remoteID := encodePubkey(&test.remotekey.PublicKey).id()
- test.table.db.UpdateLastPongReceived(remoteID, time.Now())
+ test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
// check that closest neighbors are returned.
- test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
expected := test.table.closest(testTarget.id(), bucketSize)
-
+ test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
waitNeighbors := func(want []*node) {
test.waitPacketOut(func(p *neighbors) {
if len(p.Nodes) != len(want) {
t.Errorf("wrong number of results: got %d, want %d", len(p.Nodes), bucketSize)
}
- for i := range p.Nodes {
- if p.Nodes[i].ID.id() != want[i].ID() {
- t.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, p.Nodes[i], expected.entries[i])
+ for i, n := range p.Nodes {
+ if n.ID.id() != want[i].ID() {
+ t.Errorf("result mismatch at %d:\n got: %v\n want: %v", i, n, expected.entries[i])
+ }
+ if !live[n.ID.id()] {
+ t.Errorf("result includes dead node %v", n.ID.id())
}
}
})
}
- waitNeighbors(expected.entries[:maxNeighbors])
- waitNeighbors(expected.entries[maxNeighbors:])
+ // Receive replies.
+ want := expected.entries
+ if len(want) > maxNeighbors {
+ waitNeighbors(want[:maxNeighbors])
+ want = want[maxNeighbors:]
+ }
+ waitNeighbors(want)
}
func TestUDP_findnodeMultiReply(t *testing.T) {
test := newUDPTest(t)
- defer test.table.Close()
+ defer test.close()
rid := enode.PubkeyToIDV4(&test.remotekey.PublicKey)
- test.table.db.UpdateLastPingReceived(rid, time.Now())
+ test.table.db.UpdateLastPingReceived(rid, test.remoteaddr.IP, time.Now())
// queue a pending findnode request
resultc, errc := make(chan []*node), make(chan error)
@@ -329,11 +356,40 @@ func TestUDP_findnodeMultiReply(t *testing.T) {
}
}
+func TestUDP_pingMatch(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.close()
+
+ randToken := make([]byte, 32)
+ crand.Read(randToken)
+
+ test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
+ test.waitPacketOut(func(*pong) error { return nil })
+ test.waitPacketOut(func(*ping) error { return nil })
+ test.packetIn(errUnsolicitedReply, pongPacket, &pong{ReplyTok: randToken, To: testLocalAnnounced, Expiration: futureExp})
+}
+
+func TestUDP_pingMatchIP(t *testing.T) {
+ test := newUDPTest(t)
+ defer test.close()
+
+ test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
+ test.waitPacketOut(func(*pong) error { return nil })
+
+ _, hash, _ := test.waitPacketOut(func(*ping) error { return nil })
+ wrongAddr := &net.UDPAddr{IP: net.IP{33, 44, 1, 2}, Port: 30000}
+ test.packetInFrom(errUnsolicitedReply, test.remotekey, wrongAddr, pongPacket, &pong{
+ ReplyTok: hash,
+ To: testLocalAnnounced,
+ Expiration: futureExp,
+ })
+}
+
func TestUDP_successfulPing(t *testing.T) {
test := newUDPTest(t)
added := make(chan *node, 1)
test.table.nodeAddedHook = func(n *node) { added <- n }
- defer test.table.Close()
+ defer test.close()
// The remote side sends a ping packet to initiate the exchange.
go test.packetIn(nil, pingPacket, &ping{From: testRemote, To: testLocalAnnounced, Version: 4, Expiration: futureExp})
@@ -356,7 +412,7 @@ func TestUDP_successfulPing(t *testing.T) {
})
// remote is unknown, the table pings back.
- hash, _ := test.waitPacketOut(func(p *ping) error {
+ _, hash, _ := test.waitPacketOut(func(p *ping) error {
if !reflect.DeepEqual(p.From, test.udp.ourEndpoint()) {
t.Errorf("got ping.From %#v, want %#v", p.From, test.udp.ourEndpoint())
}
@@ -510,7 +566,12 @@ type dgramPipe struct {
cond *sync.Cond
closing chan struct{}
closed bool
- queue [][]byte
+ queue []dgram
+}
+
+type dgram struct {
+ to net.UDPAddr
+ data []byte
}
func newpipe() *dgramPipe {
@@ -531,7 +592,7 @@ func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) {
if c.closed {
return 0, errors.New("closed")
}
- c.queue = append(c.queue, msg)
+ c.queue = append(c.queue, dgram{*to, b})
c.cond.Signal()
return len(b), nil
}
@@ -556,7 +617,7 @@ func (c *dgramPipe) LocalAddr() net.Addr {
return &net.UDPAddr{IP: testLocal.IP, Port: int(testLocal.UDP)}
}
-func (c *dgramPipe) waitPacketOut() []byte {
+func (c *dgramPipe) waitPacketOut() dgram {
c.mu.Lock()
defer c.mu.Unlock()
for len(c.queue) == 0 {
diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go
index 7ee0c09a9..9353b155c 100644
--- a/p2p/enode/nodedb.go
+++ b/p2p/enode/nodedb.go
@@ -21,11 +21,11 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
+ "net"
"os"
"sync"
"time"
- "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
@@ -37,24 +37,31 @@ import (
// Keys in the node database.
const (
- dbVersionKey = "version" // Version of the database to flush if changes
- dbItemPrefix = "n:" // Identifier to prefix node entries with
-
- dbDiscoverRoot = ":discover"
- dbDiscoverSeq = dbDiscoverRoot + ":seq"
- dbDiscoverPing = dbDiscoverRoot + ":lastping"
- dbDiscoverPong = dbDiscoverRoot + ":lastpong"
- dbDiscoverFindFails = dbDiscoverRoot + ":findfail"
- dbLocalRoot = ":local"
- dbLocalSeq = dbLocalRoot + ":seq"
+ dbVersionKey = "version" // Version of the database to flush if changes
+ dbNodePrefix = "n:" // Identifier to prefix node entries with
+ dbLocalPrefix = "local:"
+ dbDiscoverRoot = "v4"
+
+ // These fields are stored per ID and IP, the full key is "n:<ID>:v4:<IP>:findfail".
+ // Use nodeItemKey to create those keys.
+ dbNodeFindFails = "findfail"
+ dbNodePing = "lastping"
+ dbNodePong = "lastpong"
+ dbNodeSeq = "seq"
+
+ // Local information is keyed by ID only, the full key is "local:<ID>:seq".
+ // Use localItemKey to create those keys.
+ dbLocalSeq = "seq"
)
-var (
+const (
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
dbCleanupCycle = time.Hour // Time period for running the expiration task.
- dbVersion = 7
+ dbVersion = 8
)
+var zeroIP = make(net.IP, 16)
+
// DB is the node database, storing previously seen nodes and any collected metadata about
// them for QoS purposes.
type DB struct {
@@ -119,27 +126,58 @@ func newPersistentDB(path string) (*DB, error) {
return &DB{lvl: db, quit: make(chan struct{})}, nil
}
-// makeKey generates the leveldb key-blob from a node id and its particular
-// field of interest.
-func makeKey(id ID, field string) []byte {
- if (id == ID{}) {
- return []byte(field)
- }
- return append([]byte(dbItemPrefix), append(id[:], field...)...)
+// nodeKey returns the database key for a node record.
+func nodeKey(id ID) []byte {
+ key := append([]byte(dbNodePrefix), id[:]...)
+ key = append(key, ':')
+ key = append(key, dbDiscoverRoot...)
+ return key
}
-// splitKey tries to split a database key into a node id and a field part.
-func splitKey(key []byte) (id ID, field string) {
- // If the key is not of a node, return it plainly
- if !bytes.HasPrefix(key, []byte(dbItemPrefix)) {
- return ID{}, string(key)
+// splitNodeKey returns the node ID of a key created by nodeKey.
+func splitNodeKey(key []byte) (id ID, rest []byte) {
+ if !bytes.HasPrefix(key, []byte(dbNodePrefix)) {
+ return ID{}, nil
}
- // Otherwise split the id and field
- item := key[len(dbItemPrefix):]
+ item := key[len(dbNodePrefix):]
copy(id[:], item[:len(id)])
- field = string(item[len(id):])
+ return id, item[len(id)+1:]
+}
- return id, field
+// nodeItemKey returns the database key for a node metadata field.
+func nodeItemKey(id ID, ip net.IP, field string) []byte {
+ ip16 := ip.To16()
+ if ip16 == nil {
+ panic(fmt.Errorf("invalid IP (length %d)", len(ip)))
+ }
+ return bytes.Join([][]byte{nodeKey(id), ip16, []byte(field)}, []byte{':'})
+}
+
+// splitNodeItemKey returns the components of a key created by nodeItemKey.
+func splitNodeItemKey(key []byte) (id ID, ip net.IP, field string) {
+ id, key = splitNodeKey(key)
+ // Skip discover root.
+ if string(key) == dbDiscoverRoot {
+ return id, nil, ""
+ }
+ key = key[len(dbDiscoverRoot)+1:]
+ // Split out the IP.
+ ip = net.IP(key[:16])
+ if ip4 := ip.To4(); ip4 != nil {
+ ip = ip4
+ }
+ key = key[16+1:]
+ // Field is the remainder of key.
+ field = string(key)
+ return id, ip, field
+}
+
+// localItemKey returns the key of a local node item.
+func localItemKey(id ID, field string) []byte {
+ key := append([]byte(dbLocalPrefix), id[:]...)
+ key = append(key, ':')
+ key = append(key, field...)
+ return key
}
// fetchInt64 retrieves an integer associated with a particular key.
@@ -181,7 +219,7 @@ func (db *DB) storeUint64(key []byte, n uint64) error {
// Node retrieves a node with a given id from the database.
func (db *DB) Node(id ID) *Node {
- blob, err := db.lvl.Get(makeKey(id, dbDiscoverRoot), nil)
+ blob, err := db.lvl.Get(nodeKey(id), nil)
if err != nil {
return nil
}
@@ -207,15 +245,15 @@ func (db *DB) UpdateNode(node *Node) error {
if err != nil {
return err
}
- if err := db.lvl.Put(makeKey(node.ID(), dbDiscoverRoot), blob, nil); err != nil {
+ if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil {
return err
}
- return db.storeUint64(makeKey(node.ID(), dbDiscoverSeq), node.Seq())
+ return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq())
}
// NodeSeq returns the stored record sequence number of the given node.
func (db *DB) NodeSeq(id ID) uint64 {
- return db.fetchUint64(makeKey(id, dbDiscoverSeq))
+ return db.fetchUint64(nodeItemKey(id, zeroIP, dbNodeSeq))
}
// Resolve returns the stored record of the node if it has a larger sequence
@@ -227,15 +265,17 @@ func (db *DB) Resolve(n *Node) *Node {
return db.Node(n.ID())
}
-// DeleteNode deletes all information/keys associated with a node.
-func (db *DB) DeleteNode(id ID) error {
- deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil)
- for deleter.Next() {
- if err := db.lvl.Delete(deleter.Key(), nil); err != nil {
- return err
- }
+// DeleteNode deletes all information associated with a node.
+func (db *DB) DeleteNode(id ID) {
+ deleteRange(db.lvl, nodeKey(id))
+}
+
+func deleteRange(db *leveldb.DB, prefix []byte) {
+ it := db.NewIterator(util.BytesPrefix(prefix), nil)
+ defer it.Release()
+ for it.Next() {
+ db.Delete(it.Key(), nil)
}
- return nil
}
// ensureExpirer is a small helper method ensuring that the data expiration
@@ -259,9 +299,7 @@ func (db *DB) expirer() {
for {
select {
case <-tick.C:
- if err := db.expireNodes(); err != nil {
- log.Error("Failed to expire nodedb items", "err", err)
- }
+ db.expireNodes()
case <-db.quit:
return
}
@@ -269,71 +307,85 @@ func (db *DB) expirer() {
}
// expireNodes iterates over the database and deletes all nodes that have not
-// been seen (i.e. received a pong from) for some allotted time.
-func (db *DB) expireNodes() error {
- threshold := time.Now().Add(-dbNodeExpiration)
-
- // Find discovered nodes that are older than the allowance
- it := db.lvl.NewIterator(nil, nil)
+// been seen (i.e. received a pong from) for some time.
+func (db *DB) expireNodes() {
+ it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil)
defer it.Release()
+ if !it.Next() {
+ return
+ }
- for it.Next() {
- // Skip the item if not a discovery node
- id, field := splitKey(it.Key())
- if field != dbDiscoverRoot {
- continue
+ var (
+ threshold = time.Now().Add(-dbNodeExpiration).Unix()
+ youngestPong int64
+ atEnd = false
+ )
+ for !atEnd {
+ id, ip, field := splitNodeItemKey(it.Key())
+ if field == dbNodePong {
+ time, _ := binary.Varint(it.Value())
+ if time > youngestPong {
+ youngestPong = time
+ }
+ if time < threshold {
+ // Last pong from this IP older than threshold, remove fields belonging to it.
+ deleteRange(db.lvl, nodeItemKey(id, ip, ""))
+ }
}
- // Skip the node if not expired yet (and not self)
- if seen := db.LastPongReceived(id); seen.After(threshold) {
- continue
+ atEnd = !it.Next()
+ nextID, _ := splitNodeKey(it.Key())
+ if atEnd || nextID != id {
+ // We've moved beyond the last entry of the current ID.
+ // Remove everything if there was no recent enough pong.
+ if youngestPong > 0 && youngestPong < threshold {
+ deleteRange(db.lvl, nodeKey(id))
+ }
+ youngestPong = 0
}
- // Otherwise delete all associated information
- db.DeleteNode(id)
}
- return nil
}
// LastPingReceived retrieves the time of the last ping packet received from
// a remote node.
-func (db *DB) LastPingReceived(id ID) time.Time {
- return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPing)), 0)
+func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time {
+ return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0)
}
// UpdateLastPingReceived updates the last time we tried contacting a remote node.
-func (db *DB) UpdateLastPingReceived(id ID, instance time.Time) error {
- return db.storeInt64(makeKey(id, dbDiscoverPing), instance.Unix())
+func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error {
+ return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix())
}
// LastPongReceived retrieves the time of the last successful pong from remote node.
-func (db *DB) LastPongReceived(id ID) time.Time {
+func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time {
// Launch expirer
db.ensureExpirer()
- return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPong)), 0)
+ return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0)
}
// UpdateLastPongReceived updates the last pong time of a node.
-func (db *DB) UpdateLastPongReceived(id ID, instance time.Time) error {
- return db.storeInt64(makeKey(id, dbDiscoverPong), instance.Unix())
+func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error {
+ return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix())
}
// FindFails retrieves the number of findnode failures since bonding.
-func (db *DB) FindFails(id ID) int {
- return int(db.fetchInt64(makeKey(id, dbDiscoverFindFails)))
+func (db *DB) FindFails(id ID, ip net.IP) int {
+ return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails)))
}
// UpdateFindFails updates the number of findnode failures since bonding.
-func (db *DB) UpdateFindFails(id ID, fails int) error {
- return db.storeInt64(makeKey(id, dbDiscoverFindFails), int64(fails))
+func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error {
+ return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails))
}
// LocalSeq retrieves the local record sequence counter.
func (db *DB) localSeq(id ID) uint64 {
- return db.fetchUint64(makeKey(id, dbLocalSeq))
+ return db.fetchUint64(nodeItemKey(id, zeroIP, dbLocalSeq))
}
// storeLocalSeq stores the local record sequence counter.
func (db *DB) storeLocalSeq(id ID, n uint64) {
- db.storeUint64(makeKey(id, dbLocalSeq), n)
+ db.storeUint64(nodeItemKey(id, zeroIP, dbLocalSeq), n)
}
// QuerySeeds retrieves random nodes to be used as potential seed nodes
@@ -355,14 +407,14 @@ seek:
ctr := id[0]
rand.Read(id[:])
id[0] = ctr + id[0]%16
- it.Seek(makeKey(id, dbDiscoverRoot))
+ it.Seek(nodeKey(id))
n := nextNode(it)
if n == nil {
id[0] = 0
continue seek // iterator exhausted
}
- if now.Sub(db.LastPongReceived(n.ID())) > maxAge {
+ if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge {
continue seek
}
for i := range nodes {
@@ -379,8 +431,8 @@ seek:
// database entries.
func nextNode(it iterator.Iterator) *Node {
for end := false; !end; end = !it.Next() {
- id, field := splitKey(it.Key())
- if field != dbDiscoverRoot {
+ id, rest := splitNodeKey(it.Key())
+ if string(rest) != dbDiscoverRoot {
continue
}
return mustDecodeNode(id[:], it.Value())
diff --git a/p2p/enode/nodedb_test.go b/p2p/enode/nodedb_test.go
index 96794827c..341b61a28 100644
--- a/p2p/enode/nodedb_test.go
+++ b/p2p/enode/nodedb_test.go
@@ -28,42 +28,54 @@ import (
"time"
)
-var nodeDBKeyTests = []struct {
- id ID
- field string
- key []byte
-}{
- {
- id: ID{},
- field: "version",
- key: []byte{0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e}, // field
- },
- {
- id: HexID("51232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"),
- field: ":discover",
- key: []byte{
- 0x6e, 0x3a, // prefix
- 0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
- 0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
- 0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
- 0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
- 0x3a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, // field
- },
- },
+var keytestID = HexID("51232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439")
+
+func TestDBNodeKey(t *testing.T) {
+ enc := nodeKey(keytestID)
+ want := []byte{
+ 'n', ':',
+ 0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
+ 0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
+ 0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
+ 0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
+ ':', 'v', '4',
+ }
+ if !bytes.Equal(enc, want) {
+ t.Errorf("wrong encoded key:\ngot %q\nwant %q", enc, want)
+ }
+ id, _ := splitNodeKey(enc)
+ if id != keytestID {
+ t.Errorf("wrong ID from splitNodeKey")
+ }
}
-func TestDBKeys(t *testing.T) {
- for i, tt := range nodeDBKeyTests {
- if key := makeKey(tt.id, tt.field); !bytes.Equal(key, tt.key) {
- t.Errorf("make test %d: key mismatch: have 0x%x, want 0x%x", i, key, tt.key)
- }
- id, field := splitKey(tt.key)
- if !bytes.Equal(id[:], tt.id[:]) {
- t.Errorf("split test %d: id mismatch: have 0x%x, want 0x%x", i, id, tt.id)
- }
- if field != tt.field {
- t.Errorf("split test %d: field mismatch: have 0x%x, want 0x%x", i, field, tt.field)
- }
+func TestDBNodeItemKey(t *testing.T) {
+ wantIP := net.IP{127, 0, 0, 3}
+ wantField := "foobar"
+ enc := nodeItemKey(keytestID, wantIP, wantField)
+ want := []byte{
+ 'n', ':',
+ 0x51, 0x23, 0x2b, 0x8d, 0x78, 0x21, 0x61, 0x7d, // node id
+ 0x2b, 0x29, 0xb5, 0x4b, 0x81, 0xcd, 0xef, 0xb9, //
+ 0xb3, 0xe9, 0xc3, 0x7d, 0x7f, 0xd5, 0xf6, 0x32, //
+ 0x70, 0xbc, 0xc9, 0xe1, 0xa6, 0xf6, 0xa4, 0x39, //
+ ':', 'v', '4', ':',
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // IP
+ 0x00, 0x00, 0xff, 0xff, 0x7f, 0x00, 0x00, 0x03, //
+ ':', 'f', 'o', 'o', 'b', 'a', 'r',
+ }
+ if !bytes.Equal(enc, want) {
+ t.Errorf("wrong encoded key:\ngot %q\nwant %q", enc, want)
+ }
+ id, ip, field := splitNodeItemKey(enc)
+ if id != keytestID {
+ t.Errorf("splitNodeItemKey returned wrong ID: %v", id)
+ }
+ if !bytes.Equal(ip, wantIP) {
+ t.Errorf("splitNodeItemKey returned wrong IP: %v", ip)
+ }
+ if field != wantField {
+ t.Errorf("splitNodeItemKey returned wrong field: %q", field)
}
}
@@ -113,33 +125,33 @@ func TestDBFetchStore(t *testing.T) {
defer db.Close()
// Check fetch/store operations on a node ping object
- if stored := db.LastPingReceived(node.ID()); stored.Unix() != 0 {
+ if stored := db.LastPingReceived(node.ID(), node.IP()); stored.Unix() != 0 {
t.Errorf("ping: non-existing object: %v", stored)
}
- if err := db.UpdateLastPingReceived(node.ID(), inst); err != nil {
+ if err := db.UpdateLastPingReceived(node.ID(), node.IP(), inst); err != nil {
t.Errorf("ping: failed to update: %v", err)
}
- if stored := db.LastPingReceived(node.ID()); stored.Unix() != inst.Unix() {
+ if stored := db.LastPingReceived(node.ID(), node.IP()); stored.Unix() != inst.Unix() {
t.Errorf("ping: value mismatch: have %v, want %v", stored, inst)
}
// Check fetch/store operations on a node pong object
- if stored := db.LastPongReceived(node.ID()); stored.Unix() != 0 {
+ if stored := db.LastPongReceived(node.ID(), node.IP()); stored.Unix() != 0 {
t.Errorf("pong: non-existing object: %v", stored)
}
- if err := db.UpdateLastPongReceived(node.ID(), inst); err != nil {
+ if err := db.UpdateLastPongReceived(node.ID(), node.IP(), inst); err != nil {
t.Errorf("pong: failed to update: %v", err)
}
- if stored := db.LastPongReceived(node.ID()); stored.Unix() != inst.Unix() {
+ if stored := db.LastPongReceived(node.ID(), node.IP()); stored.Unix() != inst.Unix() {
t.Errorf("pong: value mismatch: have %v, want %v", stored, inst)
}
// Check fetch/store operations on a node findnode-failure object
- if stored := db.FindFails(node.ID()); stored != 0 {
+ if stored := db.FindFails(node.ID(), node.IP()); stored != 0 {
t.Errorf("find-node fails: non-existing object: %v", stored)
}
- if err := db.UpdateFindFails(node.ID(), num); err != nil {
+ if err := db.UpdateFindFails(node.ID(), node.IP(), num); err != nil {
t.Errorf("find-node fails: failed to update: %v", err)
}
- if stored := db.FindFails(node.ID()); stored != num {
+ if stored := db.FindFails(node.ID(), node.IP()); stored != num {
t.Errorf("find-node fails: value mismatch: have %v, want %v", stored, num)
}
// Check fetch/store operations on an actual node object
@@ -256,7 +268,7 @@ func testSeedQuery() error {
if err := db.UpdateNode(seed.node); err != nil {
return fmt.Errorf("node %d: failed to insert: %v", i, err)
}
- if err := db.UpdateLastPongReceived(seed.node.ID(), seed.pong); err != nil {
+ if err := db.UpdateLastPongReceived(seed.node.ID(), seed.node.IP(), seed.pong); err != nil {
return fmt.Errorf("node %d: failed to insert bondTime: %v", i, err)
}
}
@@ -321,10 +333,12 @@ func TestDBPersistency(t *testing.T) {
}
var nodeDBExpirationNodes = []struct {
- node *Node
- pong time.Time
- exp bool
+ node *Node
+ pong time.Time
+ storeNode bool
+ exp bool
}{
+ // Node has new enough pong time and isn't expired:
{
node: NewV4(
hexPubkey("8d110e2ed4b446d9b5fb50f117e5f37fb7597af455e1dab0e6f045a6eeaa786a6781141659020d38bdc5e698ed3d4d2bafa8b5061810dfa63e8ac038db2e9b67"),
@@ -332,17 +346,79 @@ var nodeDBExpirationNodes = []struct {
30303,
30303,
),
- pong: time.Now().Add(-dbNodeExpiration + time.Minute),
- exp: false,
- }, {
+ storeNode: true,
+ pong: time.Now().Add(-dbNodeExpiration + time.Minute),
+ exp: false,
+ },
+ // Node with pong time before expiration is removed:
+ {
node: NewV4(
hexPubkey("913a205579c32425b220dfba999d215066e5bdbf900226b11da1907eae5e93eb40616d47412cf819664e9eacbdfcca6b0c6e07e09847a38472d4be46ab0c3672"),
net.IP{127, 0, 0, 2},
30303,
30303,
),
- pong: time.Now().Add(-dbNodeExpiration - time.Minute),
- exp: true,
+ storeNode: true,
+ pong: time.Now().Add(-dbNodeExpiration - time.Minute),
+ exp: true,
+ },
+ // Just pong time, no node stored:
+ {
+ node: NewV4(
+ hexPubkey("b56670e0b6bad2c5dab9f9fe6f061a16cf78d68b6ae2cfda3144262d08d97ce5f46fd8799b6d1f709b1abe718f2863e224488bd7518e5e3b43809ac9bd1138ca"),
+ net.IP{127, 0, 0, 3},
+ 30303,
+ 30303,
+ ),
+ storeNode: false,
+ pong: time.Now().Add(-dbNodeExpiration - time.Minute),
+ exp: true,
+ },
+ // Node with multiple pong times, all older than expiration.
+ {
+ node: NewV4(
+ hexPubkey("29f619cebfd32c9eab34aec797ed5e3fe15b9b45be95b4df3f5fe6a9ae892f433eb08d7698b2ef3621568b0fb70d57b515ab30d4e72583b798298e0f0a66b9d1"),
+ net.IP{127, 0, 0, 4},
+ 30303,
+ 30303,
+ ),
+ storeNode: true,
+ pong: time.Now().Add(-dbNodeExpiration - time.Minute),
+ exp: true,
+ },
+ {
+ node: NewV4(
+ hexPubkey("29f619cebfd32c9eab34aec797ed5e3fe15b9b45be95b4df3f5fe6a9ae892f433eb08d7698b2ef3621568b0fb70d57b515ab30d4e72583b798298e0f0a66b9d1"),
+ net.IP{127, 0, 0, 5},
+ 30303,
+ 30303,
+ ),
+ storeNode: false,
+ pong: time.Now().Add(-dbNodeExpiration - 2*time.Minute),
+ exp: true,
+ },
+ // Node with multiple pong times, one newer, one older than expiration.
+ {
+ node: NewV4(
+ hexPubkey("3b73a9e5f4af6c4701c57c73cc8cfa0f4802840b24c11eba92aac3aef65644a3728b4b2aec8199f6d72bd66be2c65861c773129039bd47daa091ca90a6d4c857"),
+ net.IP{127, 0, 0, 6},
+ 30303,
+ 30303,
+ ),
+ storeNode: true,
+ pong: time.Now().Add(-dbNodeExpiration + time.Minute),
+ exp: false,
+ },
+ {
+ node: NewV4(
+ hexPubkey("3b73a9e5f4af6c4701c57c73cc8cfa0f4802840b24c11eba92aac3aef65644a3728b4b2aec8199f6d72bd66be2c65861c773129039bd47daa091ca90a6d4c857"),
+ net.IP{127, 0, 0, 7},
+ 30303,
+ 30303,
+ ),
+ storeNode: false,
+ pong: time.Now().Add(-dbNodeExpiration - time.Minute),
+ exp: true,
},
}
@@ -350,23 +426,39 @@ func TestDBExpiration(t *testing.T) {
db, _ := OpenDB("")
defer db.Close()
- // Add all the test nodes and set their last pong time
+ // Add all the test nodes and set their last pong time.
for i, seed := range nodeDBExpirationNodes {
- if err := db.UpdateNode(seed.node); err != nil {
- t.Fatalf("node %d: failed to insert: %v", i, err)
+ if seed.storeNode {
+ if err := db.UpdateNode(seed.node); err != nil {
+ t.Fatalf("node %d: failed to insert: %v", i, err)
+ }
}
- if err := db.UpdateLastPongReceived(seed.node.ID(), seed.pong); err != nil {
+ if err := db.UpdateLastPongReceived(seed.node.ID(), seed.node.IP(), seed.pong); err != nil {
t.Fatalf("node %d: failed to update bondTime: %v", i, err)
}
}
- // Expire some of them, and check the rest
- if err := db.expireNodes(); err != nil {
- t.Fatalf("failed to expire nodes: %v", err)
- }
+
+ db.expireNodes()
+
+ // Check that expired entries have been removed.
+ unixZeroTime := time.Unix(0, 0)
for i, seed := range nodeDBExpirationNodes {
node := db.Node(seed.node.ID())
- if (node == nil && !seed.exp) || (node != nil && seed.exp) {
- t.Errorf("node %d: expiration mismatch: have %v, want %v", i, node, seed.exp)
+ pong := db.LastPongReceived(seed.node.ID(), seed.node.IP())
+ if seed.exp {
+ if seed.storeNode && node != nil {
+ t.Errorf("node %d (%s) shouldn't be present after expiration", i, seed.node.ID().TerminalString())
+ }
+ if !pong.Equal(unixZeroTime) {
+ t.Errorf("pong time %d (%s %v) shouldn't be present after expiration", i, seed.node.ID().TerminalString(), seed.node.IP())
+ }
+ } else {
+ if seed.storeNode && node == nil {
+ t.Errorf("node %d (%s) should be present after expiration", i, seed.node.ID().TerminalString())
+ }
+ if !pong.Equal(seed.pong.Truncate(1 * time.Second)) {
+ t.Errorf("pong time %d (%s) should be %v after expiration, but is %v", i, seed.node.ID().TerminalString(), seed.pong, pong)
+ }
}
}
}