diff options
author | Felix Lange <fjl@users.noreply.github.com> | 2018-02-12 20:36:09 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-02-12 20:36:09 +0800 |
commit | 9123eceb0f78f69e88d909a56ad7fadb75570198 (patch) | |
tree | f3f02d178a7b8452e29e8902490bc308ebae0bde /p2p/discover/udp.go | |
parent | 1d39912a9b9ec710460c82c403cc8f72ab7d30d9 (diff) | |
download | dexon-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.gz dexon-9123eceb0f78f69e88d909a56ad7fadb75570198.tar.zst dexon-9123eceb0f78f69e88d909a56ad7fadb75570198.zip |
p2p, p2p/discover: misc connectivity improvements (#16069)
* p2p: add DialRatio for configuration of inbound vs. dialed connections
* p2p: add connection flags to PeerInfo
* p2p/netutil: add SameNet, DistinctNetSet
* p2p/discover: improve revalidation and seeding
This changes node revalidation to be periodic instead of on-demand. This
should prevent issues where dead nodes get stuck in closer buckets
because no other node will ever come along to replace them.
Every 5 seconds (on average), the last node in a random bucket is
checked and moved to the front of the bucket if it is still responding.
If revalidation fails, the last node is replaced by an entry of the
'replacement list' containing recently-seen nodes.
Most close buckets are removed because it's very unlikely we'll ever
encounter a node that would fall into any of those buckets.
Table seeding is also improved: we now require a few minutes of table
membership before considering a node as a potential seed node. This
should make it less likely to store short-lived nodes as potential
seeds.
* p2p/discover: fix nits in UDP transport
We would skip sending neighbors replies if there were fewer than
maxNeighbors results and CheckRelayIP returned an error for the last
one. While here, also resolve a TODO about pong reply tokens.
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 86 |
1 files changed, 59 insertions, 27 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 60436952d..e40de2c36 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -216,9 +216,22 @@ type ReadPacket struct { Addr *net.UDPAddr } +// Config holds Table-related settings. +type Config struct { + // These settings are required and configure the UDP listener: + PrivateKey *ecdsa.PrivateKey + + // These settings are optional: + AnnounceAddr *net.UDPAddr // local address announced in the DHT + NodeDBPath string // if set, the node database is stored at this filesystem location + NetRestrict *netutil.Netlist // network whitelist + Bootnodes []*Node // list of bootstrap nodes + Unhandled chan<- ReadPacket // unhandled packets are sent on this channel +} + // ListenUDP returns a new table that listens for UDP packets on laddr. -func ListenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr, unhandled chan ReadPacket, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, error) { - tab, _, err := newUDP(priv, conn, realaddr, unhandled, nodeDBPath, netrestrict) +func ListenUDP(c conn, cfg Config) (*Table, error) { + tab, _, err := newUDP(c, cfg) if err != nil { return nil, err } @@ -226,25 +239,29 @@ func ListenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr, unhandl return tab, nil } -func newUDP(priv *ecdsa.PrivateKey, c conn, realaddr *net.UDPAddr, unhandled chan ReadPacket, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, *udp, error) { +func newUDP(c conn, cfg Config) (*Table, *udp, error) { udp := &udp{ conn: c, - priv: priv, - netrestrict: netrestrict, + priv: cfg.PrivateKey, + netrestrict: cfg.NetRestrict, closing: make(chan struct{}), gotreply: make(chan reply), addpending: make(chan *pending), } + realaddr := c.LocalAddr().(*net.UDPAddr) + if cfg.AnnounceAddr != nil { + realaddr = cfg.AnnounceAddr + } // TODO: separate TCP port udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port)) - tab, err := newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath) + tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes) if err != nil { return nil, nil, err } udp.Table = tab go udp.loop() - go udp.readLoop(unhandled) + go udp.readLoop(cfg.Unhandled) return udp.Table, udp, nil } @@ -256,14 +273,20 @@ func (t *udp) close() { // ping sends a ping message to the given node and waits for a reply. func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error { - // TODO: maybe check for ReplyTo field in callback to measure RTT - errc := t.pending(toid, pongPacket, func(interface{}) bool { return true }) - t.send(toaddr, pingPacket, &ping{ + req := &ping{ Version: Version, From: t.ourEndpoint, To: makeEndpoint(toaddr, 0), // TODO: maybe use known TCP port from DB Expiration: uint64(time.Now().Add(expiration).Unix()), + } + packet, hash, err := encodePacket(t.priv, pingPacket, req) + if err != nil { + return err + } + errc := t.pending(toid, pongPacket, func(p interface{}) bool { + return bytes.Equal(p.(*pong).ReplyTok, hash) }) + t.write(toaddr, req.name(), packet) return <-errc } @@ -447,40 +470,45 @@ func init() { } } -func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) error { - packet, err := encodePacket(t.priv, ptype, req) +func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req packet) ([]byte, error) { + packet, hash, err := encodePacket(t.priv, ptype, req) if err != nil { - return err + return hash, err } - _, err = t.conn.WriteToUDP(packet, toaddr) - log.Trace(">> "+req.name(), "addr", toaddr, "err", err) + return hash, t.write(toaddr, req.name(), packet) +} + +func (t *udp) write(toaddr *net.UDPAddr, what string, packet []byte) error { + _, err := t.conn.WriteToUDP(packet, toaddr) + log.Trace(">> "+what, "addr", toaddr, "err", err) return err } -func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, error) { +func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (packet, hash []byte, err error) { b := new(bytes.Buffer) b.Write(headSpace) b.WriteByte(ptype) if err := rlp.Encode(b, req); err != nil { log.Error("Can't encode discv4 packet", "err", err) - return nil, err + return nil, nil, err } - packet := b.Bytes() + packet = b.Bytes() sig, err := crypto.Sign(crypto.Keccak256(packet[headSize:]), priv) if err != nil { log.Error("Can't sign discv4 packet", "err", err) - return nil, err + return nil, nil, err } copy(packet[macSize:], sig) // add the hash to the front. Note: this doesn't protect the // packet in any way. Our public key will be part of this hash in // The future. - copy(packet, crypto.Keccak256(packet[macSize:])) - return packet, nil + hash = crypto.Keccak256(packet[macSize:]) + copy(packet, hash) + return packet, hash, nil } // readLoop runs in its own goroutine. it handles incoming UDP packets. -func (t *udp) readLoop(unhandled chan ReadPacket) { +func (t *udp) readLoop(unhandled chan<- ReadPacket) { defer t.conn.Close() if unhandled != nil { defer close(unhandled) @@ -601,18 +629,22 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte t.mutex.Unlock() p := neighbors{Expiration: uint64(time.Now().Add(expiration).Unix())} + var sent bool // Send neighbors in chunks with at most maxNeighbors per packet // to stay below the 1280 byte limit. - for i, n := range closest { - if netutil.CheckRelayIP(from.IP, n.IP) != nil { - continue + for _, n := range closest { + if netutil.CheckRelayIP(from.IP, n.IP) == nil { + p.Nodes = append(p.Nodes, nodeToRPC(n)) } - p.Nodes = append(p.Nodes, nodeToRPC(n)) - if len(p.Nodes) == maxNeighbors || i == len(closest)-1 { + if len(p.Nodes) == maxNeighbors { t.send(from, neighborsPacket, &p) p.Nodes = p.Nodes[:0] + sent = true } } + if len(p.Nodes) > 0 || !sent { + t.send(from, neighborsPacket, &p) + } return nil } |