author     Péter Szilágyi <peterke@gmail.com>  2015-04-24 23:04:41 +0800
committer  Péter Szilágyi <peterke@gmail.com>  2015-04-24 23:04:41 +0800
commit     8646365b42ddae30e596835b4512792ca11196a5
tree       03977b5bf677fa2a7e57953fba28c0aa21da6a88 /p2p/discover
parent     6def110c37d4d43402c4b658ce6b291400f840e5
cmd/bootnode, eth, p2p, p2p/discover: use a fancier db design
Diffstat (limited to 'p2p/discover')
-rw-r--r--  p2p/discover/cache.go      | 134
-rw-r--r--  p2p/discover/database.go   | 233
-rw-r--r--  p2p/discover/table.go      |  36
-rw-r--r--  p2p/discover/table_test.go |   6
-rw-r--r--  p2p/discover/udp.go        |  10
-rw-r--r--  p2p/discover/udp_test.go   |  10

6 files changed, 273 insertions(+), 156 deletions(-)
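The redesign replaces the flat layout of cache.go, where a node's RLP blob sat directly under its raw ID, with a namespaced schema in database.go: every entry is keyed as "n:" + <node id> + <field>, so the node record itself (field ":discover") and its QoS timestamps (":discover:lastping", ":discover:lastbond") live side by side under one prefix. A minimal, self-contained sketch of the key round-trip this layout relies on follows; nodeID, itemPrefix, and the two helpers are illustrative re-declarations, not the discover package's API.

```go
package main

import (
	"bytes"
	"fmt"
)

// Stand-ins for discover.NodeID and the nodeDB key helpers; the scheme
// mirrors database.go below: "n:" + <32-byte id> + <field suffix>.
type nodeID [32]byte

var itemPrefix = []byte("n:")

// key builds the leveldb key for one field of one node.
func key(id nodeID, field string) []byte {
	return append(itemPrefix, append(id[:], field...)...)
}

// splitKey reverses key, recovering the id and field from a raw database key.
func splitKey(k []byte) (id nodeID, field string) {
	if !bytes.HasPrefix(k, itemPrefix) {
		return nodeID{}, string(k) // not a node entry (e.g. the "version" key)
	}
	item := k[len(itemPrefix):]
	copy(id[:], item[:len(id)])
	return id, string(item[len(id):])
}

func main() {
	var id nodeID
	id[0] = 0xab // made-up identity

	k := key(id, ":discover:lastping")
	back, field := splitKey(k)
	fmt.Println(bytes.Equal(back[:], id[:]), field) // true :discover:lastping
}
```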
diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go
deleted file mode 100644
index f6bab4591..000000000
--- a/p2p/discover/cache.go
+++ /dev/null
@@ -1,134 +0,0 @@
-// Contains the discovery cache, storing previously seen nodes to act as seed
-// servers during bootstrapping the network.
-
-package discover
-
-import (
-    "bytes"
-    "encoding/binary"
-    "net"
-    "os"
-
-    "github.com/ethereum/go-ethereum/rlp"
-    "github.com/syndtr/goleveldb/leveldb"
-    "github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-// Cache stores all nodes we know about.
-type Cache struct {
-    db *leveldb.DB
-}
-
-// Cache version to allow dumping old data if it changes.
-var cacheVersionKey = []byte("pv")
-
-// NewMemoryCache creates a new in-memory peer cache without a persistent backend.
-func NewMemoryCache() (*Cache, error) {
-    db, err := leveldb.Open(storage.NewMemStorage(), nil)
-    if err != nil {
-        return nil, err
-    }
-    return &Cache{db: db}, nil
-}
-
-// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also
-// flushing its contents in case of a version mismatch.
-func NewPersistentCache(path string) (*Cache, error) {
-    // Try to open the cache, recovering any corruption
-    db, err := leveldb.OpenFile(path, nil)
-    if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
-        db, err = leveldb.RecoverFile(path, nil)
-    }
-    if err != nil {
-        return nil, err
-    }
-    // The nodes contained in the cache correspond to a certain protocol version.
-    // Flush all nodes if the version doesn't match.
-    currentVer := make([]byte, binary.MaxVarintLen64)
-    currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
-
-    blob, err := db.Get(cacheVersionKey, nil)
-    switch err {
-    case leveldb.ErrNotFound:
-        // Version not found (i.e. empty cache), insert it
-        err = db.Put(cacheVersionKey, currentVer, nil)
-
-    case nil:
-        // Version present, flush if different
-        if !bytes.Equal(blob, currentVer) {
-            db.Close()
-            if err = os.RemoveAll(path); err != nil {
-                return nil, err
-            }
-            return NewPersistentCache(path)
-        }
-    }
-    // Clean up in case of an error
-    if err != nil {
-        db.Close()
-        return nil, err
-    }
-    return &Cache{db: db}, nil
-}
-
-// get retrieves a node with a given id from the seed database.
-func (c *Cache) get(id NodeID) *Node {
-    blob, err := c.db.Get(id[:], nil)
-    if err != nil {
-        return nil
-    }
-    node := new(Node)
-    if err := rlp.DecodeBytes(blob, node); err != nil {
-        return nil
-    }
-    return node
-}
-
-// list retrieves a batch of nodes from the database.
-func (c *Cache) list(n int) []*Node {
-    it := c.db.NewIterator(nil, nil)
-    defer it.Release()
-
-    nodes := make([]*Node, 0, n)
-    for i := 0; i < n && it.Next(); i++ {
-        var id NodeID
-        copy(id[:], it.Key())
-
-        if node := c.get(id); node != nil {
-            nodes = append(nodes, node)
-        }
-    }
-    return nodes
-}
-
-// update inserts - potentially overwriting - a node in the seed database.
-func (c *Cache) update(node *Node) error {
-    blob, err := rlp.EncodeToBytes(node)
-    if err != nil {
-        return err
-    }
-    return c.db.Put(node.ID[:], blob, nil)
-}
-
-// add inserts a new node into the seed database.
-func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
-    node := &Node{
-        ID:       id,
-        IP:       addr.IP,
-        DiscPort: addr.Port,
-        TCPPort:  int(tcpPort),
-    }
-    c.update(node)
-
-    return node
-}
-
-// delete removes a node from the database.
-func (c *Cache) delete(id NodeID) error {
-    return c.db.Delete(id[:], nil)
-}
-
-// Close flushes and closes the database files.
-func (c *Cache) Close() {
-    c.db.Close()
-}
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
new file mode 100644
index 000000000..93a2ded24
--- /dev/null
+++ b/p2p/discover/database.go
@@ -0,0 +1,233 @@
+// Contains the node database, storing previously seen nodes and any collected
+// metadata about them for QoS purposes.
+
+package discover
+
+import (
+    "bytes"
+    "encoding/binary"
+    "os"
+    "time"
+
+    "github.com/ethereum/go-ethereum/rlp"
+    "github.com/syndtr/goleveldb/leveldb"
+    "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+// nodeDB stores all nodes we know about.
+type nodeDB struct {
+    lvl *leveldb.DB
+}
+
+// Schema layout for the node database
+var (
+    nodeDBVersionKey = []byte("version") // Version of the database to flush if changes
+    nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection)
+    nodeDBItemPrefix = []byte("n:")      // Identifier to prefix node entries with
+
+    nodeDBDiscoverRoot = ":discover"
+    nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping"
+    nodeDBDiscoverBond = nodeDBDiscoverRoot + ":lastbond"
+)
+
+// newNodeDB creates a new node database for storing and retrieving infos about
+// known peers in the network. If no path is given, an in-memory, temporary
+// database is constructed.
+func newNodeDB(path string) (*nodeDB, error) {
+    if path == "" {
+        return newMemoryNodeDB()
+    }
+    return newPersistentNodeDB(path)
+}
+
+// newMemoryNodeDB creates a new in-memory node database without a persistent
+// backend.
+func newMemoryNodeDB() (*nodeDB, error) {
+    db, err := leveldb.Open(storage.NewMemStorage(), nil)
+    if err != nil {
+        return nil, err
+    }
+    return &nodeDB{lvl: db}, nil
+}
+
+// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
+// also flushing its contents in case of a version mismatch.
+func newPersistentNodeDB(path string) (*nodeDB, error) {
+    // Try to open the cache, recovering any corruption
+    db, err := leveldb.OpenFile(path, nil)
+    if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
+        db, err = leveldb.RecoverFile(path, nil)
+    }
+    if err != nil {
+        return nil, err
+    }
+    // The nodes contained in the cache correspond to a certain protocol version.
+    // Flush all nodes if the version doesn't match.
+    currentVer := make([]byte, binary.MaxVarintLen64)
+    currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
+
+    blob, err := db.Get(nodeDBVersionKey, nil)
+    switch err {
+    case leveldb.ErrNotFound:
+        // Version not found (i.e. empty cache), insert it
+        err = db.Put(nodeDBVersionKey, currentVer, nil)
+
+    case nil:
+        // Version present, flush if different
+        if !bytes.Equal(blob, currentVer) {
+            db.Close()
+            if err = os.RemoveAll(path); err != nil {
+                return nil, err
+            }
+            return newPersistentNodeDB(path)
+        }
+    }
+    // Clean up in case of an error
+    if err != nil {
+        db.Close()
+        return nil, err
+    }
+    return &nodeDB{lvl: db}, nil
+}
+
+// key generates the leveldb key-blob from a node id and its particular field of
+// interest.
+func (db *nodeDB) key(id NodeID, field string) []byte {
+    return append(nodeDBItemPrefix, append(id[:], field...)...)
+}
+
+// splitKey tries to split a database key into a node id and a field part.
+func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) {
+    // If the key is not of a node, return it plainly
+    if !bytes.HasPrefix(key, nodeDBItemPrefix) {
+        return NodeID{}, string(key)
+    }
+    // Otherwise split the id and field
+    item := key[len(nodeDBItemPrefix):]
+    copy(id[:], item[:len(id)])
+    field = string(item[len(id):])
+
+    return id, field
+}
+
+// fetchTime retrieves a time instance (encoded as a unix timestamp) associated
+// with a particular database key.
+func (db *nodeDB) fetchTime(key []byte) time.Time {
+    blob, err := db.lvl.Get(key, nil)
+    if err != nil {
+        return time.Time{}
+    }
+    var unix int64
+    if err := rlp.DecodeBytes(blob, &unix); err != nil {
+        return time.Time{}
+    }
+    return time.Unix(unix, 0)
+}
+
+// storeTime updates a specific database entry to the current time instance as a
+// unix timestamp.
+func (db *nodeDB) storeTime(key []byte, instance time.Time) error {
+    blob, err := rlp.EncodeToBytes(instance.Unix())
+    if err != nil {
+        return err
+    }
+    return db.lvl.Put(key, blob, nil)
+}
+
+// startup retrieves the time instance when the bootstrapping last began. Its
+// purpose is to prevent contacting potential seed nodes multiple times in the
+// same boot cycle.
+func (db *nodeDB) startup() time.Time {
+    return db.fetchTime(nodeDBStartupKey)
+}
+
+// updateStartup updates the bootstrap initiation time to the one specified.
+func (db *nodeDB) updateStartup(instance time.Time) error {
+    return db.storeTime(nodeDBStartupKey, instance)
+}
+
+// node retrieves a node with a given id from the database.
+func (db *nodeDB) node(id NodeID) *Node {
+    blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil)
+    if err != nil {
+        return nil
+    }
+    node := new(Node)
+    if err := rlp.DecodeBytes(blob, node); err != nil {
+        return nil
+    }
+    return node
+}
+
+// updateNode inserts - potentially overwriting - a node into the peer database.
+func (db *nodeDB) updateNode(node *Node) error {
+    blob, err := rlp.EncodeToBytes(node)
+    if err != nil {
+        return err
+    }
+    return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil)
+}
+
+// lastPing retrieves the time of the last ping packet sent to a remote node,
+// requesting bonding.
+func (db *nodeDB) lastPing(id NodeID) time.Time {
+    return db.fetchTime(db.key(id, nodeDBDiscoverPing))
+}
+
+// updateLastPing updates the last time we tried contacting a remote node.
+func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error {
+    return db.storeTime(db.key(id, nodeDBDiscoverPing), instance)
+}
+
+// lastBond retrieves the time of the last successful bonding with a remote node.
+func (db *nodeDB) lastBond(id NodeID) time.Time {
+    return db.fetchTime(db.key(id, nodeDBDiscoverBond))
+}
+
+// updateLastBond updates the last time we successfully bonded with a remote node.
+func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error {
+    return db.storeTime(db.key(id, nodeDBDiscoverBond), instance)
+}
+
+// querySeeds retrieves a batch of nodes to be used as potential seed servers
+// during bootstrapping the node into the network.
+//
+// Ideal seeds are the most recently seen nodes (highest probability to be still
+// alive), but yet untried. However, since leveldb only supports dumb iteration
+// we will instead start pulling in potential seeds that haven't yet been pinged
+// since the start of the boot procedure.
+//
+// If the database runs out of potential seeds, we restart the startup counter
+// and start iterating over the peers again.
+func (db *nodeDB) querySeeds(n int) []*Node {
+    startup := db.startup()
+
+    it := db.lvl.NewIterator(nil, nil)
+    defer it.Release()
+
+    nodes := make([]*Node, 0, n)
+    for len(nodes) < n && it.Next() {
+        // Iterate until a discovery node is found
+        id, field := db.splitKey(it.Key())
+        if field != nodeDBDiscoverRoot {
+            continue
+        }
+        // Retrieve the last ping time, and if older than startup, query
+        lastPing := db.lastPing(id)
+        if lastPing.Before(startup) {
+            if node := db.node(id); node != nil {
+                nodes = append(nodes, node)
+            }
+        }
+    }
+    // Reset the startup time if no seeds were found
+    if len(nodes) == 0 {
+        db.updateStartup(time.Now())
+    }
+    return nodes
+}
+
+// close flushes and closes the database files.
+func (db *nodeDB) close() {
+    db.lvl.Close()
+}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 98371d6f9..891bbfd05 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -27,7 +27,7 @@ type Table struct {
     mutex   sync.Mutex        // protects buckets, their content, and nursery
     buckets [nBuckets]*bucket // index of known nodes by distance
     nursery []*Node           // bootstrap nodes
-    cache   *Cache            // cache of known nodes
+    db      *nodeDB           // database of known nodes

     bondmu  sync.Mutex
     bonding map[NodeID]*bondproc
@@ -61,15 +61,17 @@ type bucket struct {
     entries []*Node
 }

-func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table {
+func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
     // If no seed cache was given, use an in-memory one
-    if seeder == nil {
-        seeder, _ = NewMemoryCache()
+    db, err := newNodeDB(nodeDBPath)
+    if err != nil {
+        glog.V(logger.Warn).Infoln("Failed to open node database:", err)
+        db, _ = newNodeDB("")
     }
     // Create the bootstrap table
     tab := &Table{
         net:       t,
-        cache:     seeder,
+        db:        db,
         self:      newNode(ourID, ourAddr),
         bonding:   make(map[NodeID]*bondproc),
         bondslots: make(chan struct{}, maxBondingPingPongs),
@@ -91,6 +93,7 @@ func (tab *Table) Self() *Node {
 // Close terminates the network listener and flushes the seed cache.
 func (tab *Table) Close() {
     tab.net.close()
+    tab.db.close()
 }

 // Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -174,11 +177,10 @@ func (tab *Table) refresh() {
     result := tab.Lookup(randomID(tab.self.ID, ld))

     if len(result) == 0 {
-        // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live)
-        seeds := tab.cache.list(10)
+        // Pick a batch of previously known seeds to lookup with
+        seeds := tab.db.querySeeds(10)
         for _, seed := range seeds {
-            glog.V(logger.Debug).Infoln("Seeding network with:", seed)
-            tab.cache.delete(seed.ID)
+            glog.V(logger.Debug).Infoln("Seeding network with", seed)
         }
         // Bootstrap the table with a self lookup
         all := tab.bondall(append(tab.nursery, seeds...))
@@ -249,7 +251,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
 // of the process can be skipped.
 func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
     var n *Node
-    if n = tab.cache.get(id); n == nil {
+    if n = tab.db.node(id); n == nil {
         tab.bondmu.Lock()
         w := tab.bonding[id]
         if w != nil {
@@ -282,8 +284,12 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
 }

 func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
+    // Request a bonding slot to limit network usage
     <-tab.bondslots
     defer func() { tab.bondslots <- struct{}{} }()
+
+    // Ping the remote side and wait for a pong
+    tab.db.updateLastPing(id, time.Now())
     if w.err = tab.net.ping(id, addr); w.err != nil {
         close(w.done)
         return
@@ -294,7 +300,15 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
         // waitping will simply time out.
         tab.net.waitping(id)
     }
-    w.n = tab.cache.add(id, addr, tcpPort)
+    // Bonding succeeded, update the node database
+    w.n = &Node{
+        ID:       id,
+        IP:       addr.IP,
+        DiscPort: addr.Port,
+        TCPPort:  int(tcpPort),
+    }
+    tab.db.updateNode(w.n)
+    tab.db.updateLastBond(id, time.Now())
     close(w.done)
 }
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 8274731e3..e2bd3c8ad 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -15,7 +15,7 @@ import (
 func TestTable_pingReplace(t *testing.T) {
     doit := func(newNodeIsResponding, lastInBucketIsResponding bool) {
         transport := newPingRecorder()
-        tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil)
+        tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "")
         last := fillBucket(tab, 200)
         pingSender := randomID(tab.self.ID, 200)
@@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) {
     test := func(test *closeTest) bool {
         // for any node table, Target and N
-        tab := newTable(nil, test.Self, &net.UDPAddr{}, nil)
+        tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
         tab.add(test.All)

         // check that doClosest(Target, N) returns nodes
@@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) {
     self := gen(NodeID{}, quickrand).(NodeID)
     target := randomID(self, 200)
     transport := findnodeOracle{t, target}
-    tab := newTable(transport, self, &net.UDPAddr{}, nil)
+    tab := newTable(transport, self, &net.UDPAddr{}, "")

     // lookup on empty table returns no nodes
     if results := tab.Lookup(target); len(results) > 0 {
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 6805fb686..65741b5f5 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -144,7 +144,7 @@ type reply struct {
 }

 // ListenUDP returns a new table that listens for UDP packets on laddr.
-func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) {
+func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) {
     addr, err := net.ResolveUDPAddr("udp", laddr)
     if err != nil {
         return nil, err
@@ -153,12 +153,12 @@
     if err != nil {
         return nil, err
     }
-    tab, _ := newUDP(priv, conn, natm, seeder)
+    tab, _ := newUDP(priv, conn, natm, nodeDBPath)
     glog.V(logger.Info).Infoln("Listening,", tab.self)
     return tab, nil
 }

-func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) {
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) {
     udp := &udp{
         conn: c,
         priv: priv,
@@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (
             realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
         }
     }
-    udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder)
+    udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
     go udp.loop()
     go udp.readLoop()
     return udp.Table, udp
@@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
     if expired(req.Expiration) {
         return errExpired
     }
-    if t.cache.get(fromID) == nil {
+    if t.db.node(fromID) == nil {
         // No bond exists, we don't process the packet. This prevents
         // an attack vector where the discovery protocol could be used
         // to amplify traffic in a DDOS attack. A malicious actor
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 299f94543..47e04b85a 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest {
         remotekey:  newkey(),
         remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303},
     }
-    test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil)
+    test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "")
     return test
 }

@@ -157,8 +157,12 @@ func TestUDP_findnode(t *testing.T) {
     // ensure there's a bond with the test node,
     // findnode won't be accepted otherwise.
-    test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
-
+    test.table.db.updateNode(&Node{
+        ID:       PubkeyID(&test.remotekey.PublicKey),
+        IP:       test.remoteaddr.IP,
+        DiscPort: test.remoteaddr.Port,
+        TCPPort:  99,
+    })
     // check that closest neighbors are returned.
     test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
     test.waitPacketOut(func(p *neighbors) {
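As the querySeeds comment notes, leveldb supports only dumb iteration, so seed selection scans the full keyspace and filters by field suffix via splitKey. A minimal sketch of that scan against the same in-memory backend newMemoryNodeDB opens; the human-readable keys here are simplified stand-ins for the real fixed-width 32-byte node IDs.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
)

func main() {
	// In-memory store, the same backend newMemoryNodeDB uses.
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Simplified keys: the real schema embeds a fixed 32-byte node ID.
	db.Put([]byte("n:alice:discover"), []byte("<node rlp>"), nil)
	db.Put([]byte("n:alice:discover:lastping"), []byte("<timestamp>"), nil)
	db.Put([]byte("n:bob:discover"), []byte("<node rlp>"), nil)

	it := db.NewIterator(nil, nil)
	defer it.Release()
	for it.Next() {
		// querySeeds does the same full scan, then keeps only keys whose
		// field part is exactly ":discover" (the node record itself).
		if bytes.HasSuffix(it.Key(), []byte(":discover")) {
			fmt.Printf("seed candidate: %s\n", it.Key())
		}
	}
}
```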
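The pingpong hunk also shows how concurrent bonding exchanges are throttled: bondslots is a buffered channel acting as a counting semaphore, drained on entry and refilled on return. A self-contained sketch of that pattern; the 3-slot limit and names are illustrative stand-ins for maxBondingPingPongs and the surrounding Table plumbing.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxSlots = 3 // stand-in for maxBondingPingPongs

	// Fill the semaphore with free slots up front; pingpong's acquire/release
	// pattern implies the table's bondslots channel starts full the same way.
	slots := make(chan struct{}, maxSlots)
	for i := 0; i < maxSlots; i++ {
		slots <- struct{}{}
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			<-slots                                // acquire a slot (blocks when all are taken)
			defer func() { slots <- struct{}{} }() // release it on return
			time.Sleep(10 * time.Millisecond)      // stand-in for the ping/pong exchange
			fmt.Println("bond done:", n)
		}(i)
	}
	wg.Wait()
}
```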