diff options
Diffstat (limited to 'p2p/enode/nodedb.go')
-rw-r--r-- | p2p/enode/nodedb.go | 210 |
1 files changed, 131 insertions, 79 deletions
diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go index 7ee0c09a9..9353b155c 100644 --- a/p2p/enode/nodedb.go +++ b/p2p/enode/nodedb.go @@ -21,11 +21,11 @@ import ( "crypto/rand" "encoding/binary" "fmt" + "net" "os" "sync" "time" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" @@ -37,24 +37,31 @@ import ( // Keys in the node database. const ( - dbVersionKey = "version" // Version of the database to flush if changes - dbItemPrefix = "n:" // Identifier to prefix node entries with - - dbDiscoverRoot = ":discover" - dbDiscoverSeq = dbDiscoverRoot + ":seq" - dbDiscoverPing = dbDiscoverRoot + ":lastping" - dbDiscoverPong = dbDiscoverRoot + ":lastpong" - dbDiscoverFindFails = dbDiscoverRoot + ":findfail" - dbLocalRoot = ":local" - dbLocalSeq = dbLocalRoot + ":seq" + dbVersionKey = "version" // Version of the database to flush if changes + dbNodePrefix = "n:" // Identifier to prefix node entries with + dbLocalPrefix = "local:" + dbDiscoverRoot = "v4" + + // These fields are stored per ID and IP, the full key is "n:<ID>:v4:<IP>:findfail". + // Use nodeItemKey to create those keys. + dbNodeFindFails = "findfail" + dbNodePing = "lastping" + dbNodePong = "lastpong" + dbNodeSeq = "seq" + + // Local information is keyed by ID only, the full key is "local:<ID>:seq". + // Use localItemKey to create those keys. + dbLocalSeq = "seq" ) -var ( +const ( dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped. dbCleanupCycle = time.Hour // Time period for running the expiration task. - dbVersion = 7 + dbVersion = 8 ) +var zeroIP = make(net.IP, 16) + // DB is the node database, storing previously seen nodes and any collected metadata about // them for QoS purposes. type DB struct { @@ -119,27 +126,58 @@ func newPersistentDB(path string) (*DB, error) { return &DB{lvl: db, quit: make(chan struct{})}, nil } -// makeKey generates the leveldb key-blob from a node id and its particular -// field of interest. -func makeKey(id ID, field string) []byte { - if (id == ID{}) { - return []byte(field) - } - return append([]byte(dbItemPrefix), append(id[:], field...)...) +// nodeKey returns the database key for a node record. +func nodeKey(id ID) []byte { + key := append([]byte(dbNodePrefix), id[:]...) + key = append(key, ':') + key = append(key, dbDiscoverRoot...) + return key } -// splitKey tries to split a database key into a node id and a field part. -func splitKey(key []byte) (id ID, field string) { - // If the key is not of a node, return it plainly - if !bytes.HasPrefix(key, []byte(dbItemPrefix)) { - return ID{}, string(key) +// splitNodeKey returns the node ID of a key created by nodeKey. +func splitNodeKey(key []byte) (id ID, rest []byte) { + if !bytes.HasPrefix(key, []byte(dbNodePrefix)) { + return ID{}, nil } - // Otherwise split the id and field - item := key[len(dbItemPrefix):] + item := key[len(dbNodePrefix):] copy(id[:], item[:len(id)]) - field = string(item[len(id):]) + return id, item[len(id)+1:] +} - return id, field +// nodeItemKey returns the database key for a node metadata field. +func nodeItemKey(id ID, ip net.IP, field string) []byte { + ip16 := ip.To16() + if ip16 == nil { + panic(fmt.Errorf("invalid IP (length %d)", len(ip))) + } + return bytes.Join([][]byte{nodeKey(id), ip16, []byte(field)}, []byte{':'}) +} + +// splitNodeItemKey returns the components of a key created by nodeItemKey. +func splitNodeItemKey(key []byte) (id ID, ip net.IP, field string) { + id, key = splitNodeKey(key) + // Skip discover root. + if string(key) == dbDiscoverRoot { + return id, nil, "" + } + key = key[len(dbDiscoverRoot)+1:] + // Split out the IP. + ip = net.IP(key[:16]) + if ip4 := ip.To4(); ip4 != nil { + ip = ip4 + } + key = key[16+1:] + // Field is the remainder of key. + field = string(key) + return id, ip, field +} + +// localItemKey returns the key of a local node item. +func localItemKey(id ID, field string) []byte { + key := append([]byte(dbLocalPrefix), id[:]...) + key = append(key, ':') + key = append(key, field...) + return key } // fetchInt64 retrieves an integer associated with a particular key. @@ -181,7 +219,7 @@ func (db *DB) storeUint64(key []byte, n uint64) error { // Node retrieves a node with a given id from the database. func (db *DB) Node(id ID) *Node { - blob, err := db.lvl.Get(makeKey(id, dbDiscoverRoot), nil) + blob, err := db.lvl.Get(nodeKey(id), nil) if err != nil { return nil } @@ -207,15 +245,15 @@ func (db *DB) UpdateNode(node *Node) error { if err != nil { return err } - if err := db.lvl.Put(makeKey(node.ID(), dbDiscoverRoot), blob, nil); err != nil { + if err := db.lvl.Put(nodeKey(node.ID()), blob, nil); err != nil { return err } - return db.storeUint64(makeKey(node.ID(), dbDiscoverSeq), node.Seq()) + return db.storeUint64(nodeItemKey(node.ID(), zeroIP, dbNodeSeq), node.Seq()) } // NodeSeq returns the stored record sequence number of the given node. func (db *DB) NodeSeq(id ID) uint64 { - return db.fetchUint64(makeKey(id, dbDiscoverSeq)) + return db.fetchUint64(nodeItemKey(id, zeroIP, dbNodeSeq)) } // Resolve returns the stored record of the node if it has a larger sequence @@ -227,15 +265,17 @@ func (db *DB) Resolve(n *Node) *Node { return db.Node(n.ID()) } -// DeleteNode deletes all information/keys associated with a node. -func (db *DB) DeleteNode(id ID) error { - deleter := db.lvl.NewIterator(util.BytesPrefix(makeKey(id, "")), nil) - for deleter.Next() { - if err := db.lvl.Delete(deleter.Key(), nil); err != nil { - return err - } +// DeleteNode deletes all information associated with a node. +func (db *DB) DeleteNode(id ID) { + deleteRange(db.lvl, nodeKey(id)) +} + +func deleteRange(db *leveldb.DB, prefix []byte) { + it := db.NewIterator(util.BytesPrefix(prefix), nil) + defer it.Release() + for it.Next() { + db.Delete(it.Key(), nil) } - return nil } // ensureExpirer is a small helper method ensuring that the data expiration @@ -259,9 +299,7 @@ func (db *DB) expirer() { for { select { case <-tick.C: - if err := db.expireNodes(); err != nil { - log.Error("Failed to expire nodedb items", "err", err) - } + db.expireNodes() case <-db.quit: return } @@ -269,71 +307,85 @@ func (db *DB) expirer() { } // expireNodes iterates over the database and deletes all nodes that have not -// been seen (i.e. received a pong from) for some allotted time. -func (db *DB) expireNodes() error { - threshold := time.Now().Add(-dbNodeExpiration) - - // Find discovered nodes that are older than the allowance - it := db.lvl.NewIterator(nil, nil) +// been seen (i.e. received a pong from) for some time. +func (db *DB) expireNodes() { + it := db.lvl.NewIterator(util.BytesPrefix([]byte(dbNodePrefix)), nil) defer it.Release() + if !it.Next() { + return + } - for it.Next() { - // Skip the item if not a discovery node - id, field := splitKey(it.Key()) - if field != dbDiscoverRoot { - continue + var ( + threshold = time.Now().Add(-dbNodeExpiration).Unix() + youngestPong int64 + atEnd = false + ) + for !atEnd { + id, ip, field := splitNodeItemKey(it.Key()) + if field == dbNodePong { + time, _ := binary.Varint(it.Value()) + if time > youngestPong { + youngestPong = time + } + if time < threshold { + // Last pong from this IP older than threshold, remove fields belonging to it. + deleteRange(db.lvl, nodeItemKey(id, ip, "")) + } } - // Skip the node if not expired yet (and not self) - if seen := db.LastPongReceived(id); seen.After(threshold) { - continue + atEnd = !it.Next() + nextID, _ := splitNodeKey(it.Key()) + if atEnd || nextID != id { + // We've moved beyond the last entry of the current ID. + // Remove everything if there was no recent enough pong. + if youngestPong > 0 && youngestPong < threshold { + deleteRange(db.lvl, nodeKey(id)) + } + youngestPong = 0 } - // Otherwise delete all associated information - db.DeleteNode(id) } - return nil } // LastPingReceived retrieves the time of the last ping packet received from // a remote node. -func (db *DB) LastPingReceived(id ID) time.Time { - return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPing)), 0) +func (db *DB) LastPingReceived(id ID, ip net.IP) time.Time { + return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePing)), 0) } // UpdateLastPingReceived updates the last time we tried contacting a remote node. -func (db *DB) UpdateLastPingReceived(id ID, instance time.Time) error { - return db.storeInt64(makeKey(id, dbDiscoverPing), instance.Unix()) +func (db *DB) UpdateLastPingReceived(id ID, ip net.IP, instance time.Time) error { + return db.storeInt64(nodeItemKey(id, ip, dbNodePing), instance.Unix()) } // LastPongReceived retrieves the time of the last successful pong from remote node. -func (db *DB) LastPongReceived(id ID) time.Time { +func (db *DB) LastPongReceived(id ID, ip net.IP) time.Time { // Launch expirer db.ensureExpirer() - return time.Unix(db.fetchInt64(makeKey(id, dbDiscoverPong)), 0) + return time.Unix(db.fetchInt64(nodeItemKey(id, ip, dbNodePong)), 0) } // UpdateLastPongReceived updates the last pong time of a node. -func (db *DB) UpdateLastPongReceived(id ID, instance time.Time) error { - return db.storeInt64(makeKey(id, dbDiscoverPong), instance.Unix()) +func (db *DB) UpdateLastPongReceived(id ID, ip net.IP, instance time.Time) error { + return db.storeInt64(nodeItemKey(id, ip, dbNodePong), instance.Unix()) } // FindFails retrieves the number of findnode failures since bonding. -func (db *DB) FindFails(id ID) int { - return int(db.fetchInt64(makeKey(id, dbDiscoverFindFails))) +func (db *DB) FindFails(id ID, ip net.IP) int { + return int(db.fetchInt64(nodeItemKey(id, ip, dbNodeFindFails))) } // UpdateFindFails updates the number of findnode failures since bonding. -func (db *DB) UpdateFindFails(id ID, fails int) error { - return db.storeInt64(makeKey(id, dbDiscoverFindFails), int64(fails)) +func (db *DB) UpdateFindFails(id ID, ip net.IP, fails int) error { + return db.storeInt64(nodeItemKey(id, ip, dbNodeFindFails), int64(fails)) } // LocalSeq retrieves the local record sequence counter. func (db *DB) localSeq(id ID) uint64 { - return db.fetchUint64(makeKey(id, dbLocalSeq)) + return db.fetchUint64(nodeItemKey(id, zeroIP, dbLocalSeq)) } // storeLocalSeq stores the local record sequence counter. func (db *DB) storeLocalSeq(id ID, n uint64) { - db.storeUint64(makeKey(id, dbLocalSeq), n) + db.storeUint64(nodeItemKey(id, zeroIP, dbLocalSeq), n) } // QuerySeeds retrieves random nodes to be used as potential seed nodes @@ -355,14 +407,14 @@ seek: ctr := id[0] rand.Read(id[:]) id[0] = ctr + id[0]%16 - it.Seek(makeKey(id, dbDiscoverRoot)) + it.Seek(nodeKey(id)) n := nextNode(it) if n == nil { id[0] = 0 continue seek // iterator exhausted } - if now.Sub(db.LastPongReceived(n.ID())) > maxAge { + if now.Sub(db.LastPongReceived(n.ID(), n.IP())) > maxAge { continue seek } for i := range nodes { @@ -379,8 +431,8 @@ seek: // database entries. func nextNode(it iterator.Iterator) *Node { for end := false; !end; end = !it.Next() { - id, field := splitKey(it.Key()) - if field != dbDiscoverRoot { + id, rest := splitNodeKey(it.Key()) + if string(rest) != dbDiscoverRoot { continue } return mustDecodeNode(id[:], it.Value()) |