aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/discover
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-04-24 23:04:41 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-04-24 23:04:41 +0800
commit8646365b42ddae30e596835b4512792ca11196a5 (patch)
tree03977b5bf677fa2a7e57953fba28c0aa21da6a88 /p2p/discover
parent6def110c37d4d43402c4b658ce6b291400f840e5 (diff)
downloadgo-tangerine-8646365b42ddae30e596835b4512792ca11196a5.tar.gz
go-tangerine-8646365b42ddae30e596835b4512792ca11196a5.tar.zst
go-tangerine-8646365b42ddae30e596835b4512792ca11196a5.zip
cmd/bootnode, eth, p2p, p2p/discover: use a fancier db design
Diffstat (limited to 'p2p/discover')
-rw-r--r--p2p/discover/cache.go134
-rw-r--r--p2p/discover/database.go233
-rw-r--r--p2p/discover/table.go36
-rw-r--r--p2p/discover/table_test.go6
-rw-r--r--p2p/discover/udp.go10
-rw-r--r--p2p/discover/udp_test.go10
6 files changed, 273 insertions, 156 deletions
diff --git a/p2p/discover/cache.go b/p2p/discover/cache.go
deleted file mode 100644
index f6bab4591..000000000
--- a/p2p/discover/cache.go
+++ /dev/null
@@ -1,134 +0,0 @@
-// Contains the discovery cache, storing previously seen nodes to act as seed
-// servers during bootstrapping the network.
-
-package discover
-
-import (
- "bytes"
- "encoding/binary"
- "net"
- "os"
-
- "github.com/ethereum/go-ethereum/rlp"
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/storage"
-)
-
-// Cache stores all nodes we know about.
-type Cache struct {
- db *leveldb.DB
-}
-
-// Cache version to allow dumping old data if it changes.
-var cacheVersionKey = []byte("pv")
-
-// NewMemoryCache creates a new in-memory peer cache without a persistent backend.
-func NewMemoryCache() (*Cache, error) {
- db, err := leveldb.Open(storage.NewMemStorage(), nil)
- if err != nil {
- return nil, err
- }
- return &Cache{db: db}, nil
-}
-
-// NewPersistentCache creates/opens a leveldb backed persistent peer cache, also
-// flushing its contents in case of a version mismatch.
-func NewPersistentCache(path string) (*Cache, error) {
- // Try to open the cache, recovering any corruption
- db, err := leveldb.OpenFile(path, nil)
- if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
- db, err = leveldb.RecoverFile(path, nil)
- }
- if err != nil {
- return nil, err
- }
- // The nodes contained in the cache correspond to a certain protocol version.
- // Flush all nodes if the version doesn't match.
- currentVer := make([]byte, binary.MaxVarintLen64)
- currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
-
- blob, err := db.Get(cacheVersionKey, nil)
- switch err {
- case leveldb.ErrNotFound:
- // Version not found (i.e. empty cache), insert it
- err = db.Put(cacheVersionKey, currentVer, nil)
-
- case nil:
- // Version present, flush if different
- if !bytes.Equal(blob, currentVer) {
- db.Close()
- if err = os.RemoveAll(path); err != nil {
- return nil, err
- }
- return NewPersistentCache(path)
- }
- }
- // Clean up in case of an error
- if err != nil {
- db.Close()
- return nil, err
- }
- return &Cache{db: db}, nil
-}
-
-// get retrieves a node with a given id from the seed da
-func (c *Cache) get(id NodeID) *Node {
- blob, err := c.db.Get(id[:], nil)
- if err != nil {
- return nil
- }
- node := new(Node)
- if err := rlp.DecodeBytes(blob, node); err != nil {
- return nil
- }
- return node
-}
-
-// list retrieves a batch of nodes from the database.
-func (c *Cache) list(n int) []*Node {
- it := c.db.NewIterator(nil, nil)
- defer it.Release()
-
- nodes := make([]*Node, 0, n)
- for i := 0; i < n && it.Next(); i++ {
- var id NodeID
- copy(id[:], it.Key())
-
- if node := c.get(id); node != nil {
- nodes = append(nodes, node)
- }
- }
- return nodes
-}
-
-// update inserts - potentially overwriting - a node in the seed database.
-func (c *Cache) update(node *Node) error {
- blob, err := rlp.EncodeToBytes(node)
- if err != nil {
- return err
- }
- return c.db.Put(node.ID[:], blob, nil)
-}
-
-// add inserts a new node into the seed database.
-func (c *Cache) add(id NodeID, addr *net.UDPAddr, tcpPort uint16) *Node {
- node := &Node{
- ID: id,
- IP: addr.IP,
- DiscPort: addr.Port,
- TCPPort: int(tcpPort),
- }
- c.update(node)
-
- return node
-}
-
-// delete removes a node from the database.
-func (c *Cache) delete(id NodeID) error {
- return c.db.Delete(id[:], nil)
-}
-
-// Close flushes and closes the database files.
-func (c *Cache) Close() {
- c.db.Close()
-}
diff --git a/p2p/discover/database.go b/p2p/discover/database.go
new file mode 100644
index 000000000..93a2ded24
--- /dev/null
+++ b/p2p/discover/database.go
@@ -0,0 +1,233 @@
+// Contains the node database, storing previously seen nodes and any collected
+// metadata about them for QoS purposes.
+
+package discover
+
+import (
+ "bytes"
+ "encoding/binary"
+ "os"
+ "time"
+
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+)
+
+// nodeDB stores all nodes we know about.
+type nodeDB struct {
+ lvl *leveldb.DB
+}
+
+// Schema layout for the node database
+var (
+ nodeDBVersionKey = []byte("version") // Version of the database to flush if changes
+ nodeDBStartupKey = []byte("startup") // Time when the node discovery started (seed selection)
+ nodeDBItemPrefix = []byte("n:") // Identifier to prefix node entries with
+
+ nodeDBDiscoverRoot = ":discover"
+ nodeDBDiscoverPing = nodeDBDiscoverRoot + ":lastping"
+ nodeDBDiscoverBond = nodeDBDiscoverRoot + ":lastbond"
+)
+
+// newNodeDB creates a new node database for storing and retrieving infos about
+// known peers in the network. If no path is given, an in-memory, temporary
+// database is constructed.
+func newNodeDB(path string) (*nodeDB, error) {
+ if path == "" {
+ return newMemoryNodeDB()
+ }
+ return newPersistentNodeDB(path)
+}
+
+// newMemoryNodeDB creates a new in-memory node database without a persistent
+// backend.
+func newMemoryNodeDB() (*nodeDB, error) {
+ db, err := leveldb.Open(storage.NewMemStorage(), nil)
+ if err != nil {
+ return nil, err
+ }
+ return &nodeDB{lvl: db}, nil
+}
+
+// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
+// also flushing its contents in case of a version mismatch.
+func newPersistentNodeDB(path string) (*nodeDB, error) {
+ // Try to open the cache, recovering any corruption
+ db, err := leveldb.OpenFile(path, nil)
+ if _, iscorrupted := err.(leveldb.ErrCorrupted); iscorrupted {
+ db, err = leveldb.RecoverFile(path, nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+ // The nodes contained in the cache correspond to a certain protocol version.
+ // Flush all nodes if the version doesn't match.
+ currentVer := make([]byte, binary.MaxVarintLen64)
+ currentVer = currentVer[:binary.PutVarint(currentVer, Version)]
+
+ blob, err := db.Get(nodeDBVersionKey, nil)
+ switch err {
+ case leveldb.ErrNotFound:
+ // Version not found (i.e. empty cache), insert it
+ err = db.Put(nodeDBVersionKey, currentVer, nil)
+
+ case nil:
+ // Version present, flush if different
+ if !bytes.Equal(blob, currentVer) {
+ db.Close()
+ if err = os.RemoveAll(path); err != nil {
+ return nil, err
+ }
+ return newPersistentNodeDB(path)
+ }
+ }
+ // Clean up in case of an error
+ if err != nil {
+ db.Close()
+ return nil, err
+ }
+ return &nodeDB{lvl: db}, nil
+}
+
+// key generates the leveldb key-blob from a node id and its particular field of
+// interest.
+func (db *nodeDB) key(id NodeID, field string) []byte {
+ return append(nodeDBItemPrefix, append(id[:], field...)...)
+}
+
+// splitKey tries to split a database key into a node id and a field part.
+func (db *nodeDB) splitKey(key []byte) (id NodeID, field string) {
+ // If the key is not of a node, return it plainly
+ if !bytes.HasPrefix(key, nodeDBItemPrefix) {
+ return NodeID{}, string(key)
+ }
+ // Otherwise split the id and field
+ item := key[len(nodeDBItemPrefix):]
+ copy(id[:], item[:len(id)])
+ field = string(item[len(id):])
+
+ return id, field
+}
+
+// fetchTime retrieves a time instance (encoded as a unix timestamp) associated
+// with a particular database key.
+func (db *nodeDB) fetchTime(key []byte) time.Time {
+ blob, err := db.lvl.Get(key, nil)
+ if err != nil {
+ return time.Time{}
+ }
+ var unix int64
+ if err := rlp.DecodeBytes(blob, &unix); err != nil {
+ return time.Time{}
+ }
+ return time.Unix(unix, 0)
+}
+
+// storeTime update a specific database entry to the current time instance as a
+// unix timestamp.
+func (db *nodeDB) storeTime(key []byte, instance time.Time) error {
+ blob, err := rlp.EncodeToBytes(instance.Unix())
+ if err != nil {
+ return err
+ }
+ return db.lvl.Put(key, blob, nil)
+}
+
+// startup retrieves the time instance when the bootstrapping last begun. Its
+// purpose is to prevent contacting potential seed nodes multiple times in the
+// same boot cycle.
+func (db *nodeDB) startup() time.Time {
+ return db.fetchTime(nodeDBStartupKey)
+}
+
+// updateStartup updates the bootstrap initiation time to the one specified.
+func (db *nodeDB) updateStartup(instance time.Time) error {
+ return db.storeTime(nodeDBStartupKey, instance)
+}
+
+// node retrieves a node with a given id from the database.
+func (db *nodeDB) node(id NodeID) *Node {
+ blob, err := db.lvl.Get(db.key(id, nodeDBDiscoverRoot), nil)
+ if err != nil {
+ return nil
+ }
+ node := new(Node)
+ if err := rlp.DecodeBytes(blob, node); err != nil {
+ return nil
+ }
+ return node
+}
+
+// updateNode inserts - potentially overwriting - a node into the peer database.
+func (db *nodeDB) updateNode(node *Node) error {
+ blob, err := rlp.EncodeToBytes(node)
+ if err != nil {
+ return err
+ }
+ return db.lvl.Put(db.key(node.ID, nodeDBDiscoverRoot), blob, nil)
+}
+
+// lastPing retrieves the time of the last ping packet send to a remote node,
+// requesting binding.
+func (db *nodeDB) lastPing(id NodeID) time.Time {
+ return db.fetchTime(db.key(id, nodeDBDiscoverPing))
+}
+
+// updateLastPing updates the last time we tried contacting a remote node.
+func (db *nodeDB) updateLastPing(id NodeID, instance time.Time) error {
+ return db.storeTime(db.key(id, nodeDBDiscoverPing), instance)
+}
+
+// lastBond retrieves the time of the last successful bonding with a remote node.
+func (db *nodeDB) lastBond(id NodeID) time.Time {
+ return db.fetchTime(db.key(id, nodeDBDiscoverBond))
+}
+
+// updateLastBond updates the last time we successfully bound to a remote node.
+func (db *nodeDB) updateLastBond(id NodeID, instance time.Time) error {
+ return db.storeTime(db.key(id, nodeDBDiscoverBond), instance)
+}
+
+// querySeeds retrieves a batch of nodes to be used as potential seed servers
+// during bootstrapping the node into the network.
+//
+// Ideal seeds are the most recently seen nodes (highest probability to be still
+// alive), but yet untried. However, since leveldb only supports dumb iteration
+// we will instead start pulling in potential seeds that haven't been yet pinged
+// since the start of the boot procedure.
+//
+// If the database runs out of potential seeds, we restart the startup counter
+// and start iterating over the peers again.
+func (db *nodeDB) querySeeds(n int) []*Node {
+ startup := db.startup()
+
+ it := db.lvl.NewIterator(nil, nil)
+ defer it.Release()
+
+ nodes := make([]*Node, 0, n)
+ for len(nodes) < n && it.Next() {
+ // Iterate until a discovery node is found
+ id, field := db.splitKey(it.Key())
+ if field != nodeDBDiscoverRoot {
+ continue
+ }
+ // Retrieve the last ping time, and if older than startup, query
+ lastPing := db.lastPing(id)
+ if lastPing.Before(startup) {
+ if node := db.node(id); node != nil {
+ nodes = append(nodes, node)
+ }
+ }
+ }
+ // Reset the startup time if no seeds were found
+ if len(nodes) == 0 {
+ db.updateStartup(time.Now())
+ }
+ return nodes
+}
+
+// close flushes and closes the database files.
+func (db *nodeDB) close() {
+ db.lvl.Close()
+}
diff --git a/p2p/discover/table.go b/p2p/discover/table.go
index 98371d6f9..891bbfd05 100644
--- a/p2p/discover/table.go
+++ b/p2p/discover/table.go
@@ -27,7 +27,7 @@ type Table struct {
mutex sync.Mutex // protects buckets, their content, and nursery
buckets [nBuckets]*bucket // index of known nodes by distance
nursery []*Node // bootstrap nodes
- cache *Cache // cache of known nodes
+ db *nodeDB // database of known nodes
bondmu sync.Mutex
bonding map[NodeID]*bondproc
@@ -61,15 +61,17 @@ type bucket struct {
entries []*Node
}
-func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, seeder *Cache) *Table {
+func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string) *Table {
// If no seed cache was given, use an in-memory one
- if seeder == nil {
- seeder, _ = NewMemoryCache()
+ db, err := newNodeDB(nodeDBPath)
+ if err != nil {
+ glog.V(logger.Warn).Infoln("Failed to open node database:", err)
+ db, _ = newNodeDB("")
}
// Create the bootstrap table
tab := &Table{
net: t,
- cache: seeder,
+ db: db,
self: newNode(ourID, ourAddr),
bonding: make(map[NodeID]*bondproc),
bondslots: make(chan struct{}, maxBondingPingPongs),
@@ -91,6 +93,7 @@ func (tab *Table) Self() *Node {
// Close terminates the network listener and flushes the seed cache.
func (tab *Table) Close() {
tab.net.close()
+ tab.db.close()
}
// Bootstrap sets the bootstrap nodes. These nodes are used to connect
@@ -174,11 +177,10 @@ func (tab *Table) refresh() {
result := tab.Lookup(randomID(tab.self.ID, ld))
if len(result) == 0 {
- // Pick a batch of previously know seeds to lookup with and discard them (will come back if they are still live)
- seeds := tab.cache.list(10)
+ // Pick a batch of previously know seeds to lookup with
+ seeds := tab.db.querySeeds(10)
for _, seed := range seeds {
- glog.V(logger.Debug).Infoln("Seeding network with:", seed)
- tab.cache.delete(seed.ID)
+ glog.V(logger.Debug).Infoln("Seeding network with", seed)
}
// Bootstrap the table with a self lookup
all := tab.bondall(append(tab.nursery, seeds...))
@@ -249,7 +251,7 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) {
// of the process can be skipped.
func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) {
var n *Node
- if n = tab.cache.get(id); n == nil {
+ if n = tab.db.node(id); n == nil {
tab.bondmu.Lock()
w := tab.bonding[id]
if w != nil {
@@ -282,8 +284,12 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16
}
func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) {
+ // Request a bonding slot to limit network usage
<-tab.bondslots
defer func() { tab.bondslots <- struct{}{} }()
+
+ // Ping the remote side and wait for a pong
+ tab.db.updateLastPing(id, time.Now())
if w.err = tab.net.ping(id, addr); w.err != nil {
close(w.done)
return
@@ -294,7 +300,15 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd
// waitping will simply time out.
tab.net.waitping(id)
}
- w.n = tab.cache.add(id, addr, tcpPort)
+ // Bonding succeeded, update the node database
+ w.n = &Node{
+ ID: id,
+ IP: addr.IP,
+ DiscPort: addr.Port,
+ TCPPort: int(tcpPort),
+ }
+ tab.db.updateNode(w.n)
+ tab.db.updateLastBond(id, time.Now())
close(w.done)
}
diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go
index 8274731e3..e2bd3c8ad 100644
--- a/p2p/discover/table_test.go
+++ b/p2p/discover/table_test.go
@@ -15,7 +15,7 @@ import (
func TestTable_pingReplace(t *testing.T) {
doit := func(newNodeIsResponding, lastInBucketIsResponding bool) {
transport := newPingRecorder()
- tab := newTable(transport, NodeID{}, &net.UDPAddr{}, nil)
+ tab := newTable(transport, NodeID{}, &net.UDPAddr{}, "")
last := fillBucket(tab, 200)
pingSender := randomID(tab.self.ID, 200)
@@ -145,7 +145,7 @@ func TestTable_closest(t *testing.T) {
test := func(test *closeTest) bool {
// for any node table, Target and N
- tab := newTable(nil, test.Self, &net.UDPAddr{}, nil)
+ tab := newTable(nil, test.Self, &net.UDPAddr{}, "")
tab.add(test.All)
// check that doClosest(Target, N) returns nodes
@@ -217,7 +217,7 @@ func TestTable_Lookup(t *testing.T) {
self := gen(NodeID{}, quickrand).(NodeID)
target := randomID(self, 200)
transport := findnodeOracle{t, target}
- tab := newTable(transport, self, &net.UDPAddr{}, nil)
+ tab := newTable(transport, self, &net.UDPAddr{}, "")
// lookup on empty table returns no nodes
if results := tab.Lookup(target); len(results) > 0 {
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 6805fb686..65741b5f5 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -144,7 +144,7 @@ type reply struct {
}
// ListenUDP returns a new table that listens for UDP packets on laddr.
-func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder *Cache) (*Table, error) {
+func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string) (*Table, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
@@ -153,12 +153,12 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, seeder
if err != nil {
return nil, err
}
- tab, _ := newUDP(priv, conn, natm, seeder)
+ tab, _ := newUDP(priv, conn, natm, nodeDBPath)
glog.V(logger.Info).Infoln("Listening,", tab.self)
return tab, nil
}
-func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (*Table, *udp) {
+func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string) (*Table, *udp) {
udp := &udp{
conn: c,
priv: priv,
@@ -176,7 +176,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, seeder *Cache) (
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
- udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, seeder)
+ udp.Table = newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
go udp.loop()
go udp.readLoop()
return udp.Table, udp
@@ -449,7 +449,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte
if expired(req.Expiration) {
return errExpired
}
- if t.cache.get(fromID) == nil {
+ if t.db.node(fromID) == nil {
// No bond exists, we don't process the packet. This prevents
// an attack vector where the discovery protocol could be used
// to amplify traffic in a DDOS attack. A malicious actor
diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go
index 299f94543..47e04b85a 100644
--- a/p2p/discover/udp_test.go
+++ b/p2p/discover/udp_test.go
@@ -41,7 +41,7 @@ func newUDPTest(t *testing.T) *udpTest {
remotekey: newkey(),
remoteaddr: &net.UDPAddr{IP: net.IP{1, 2, 3, 4}, Port: 30303},
}
- test.table, test.udp = newUDP(test.localkey, test.pipe, nil, nil)
+ test.table, test.udp = newUDP(test.localkey, test.pipe, nil, "")
return test
}
@@ -157,8 +157,12 @@ func TestUDP_findnode(t *testing.T) {
// ensure there's a bond with the test node,
// findnode won't be accepted otherwise.
- test.table.cache.add(PubkeyID(&test.remotekey.PublicKey), test.remoteaddr, 99)
-
+ test.table.db.updateNode(&Node{
+ ID: PubkeyID(&test.remotekey.PublicKey),
+ IP: test.remoteaddr.IP,
+ DiscPort: test.remoteaddr.Port,
+ TCPPort: 99,
+ })
// check that closest neighbors are returned.
test.packetIn(nil, findnodePacket, &findnode{Target: testTarget, Expiration: futureExp})
test.waitPacketOut(func(p *neighbors) {