diff options
Diffstat (limited to 'p2p/discover/udp.go')
-rw-r--r-- | p2p/discover/udp.go | 122 |
1 files changed, 72 insertions, 50 deletions
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 0ff47c5e4..45fcce282 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/nat" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/rlp" @@ -46,8 +47,9 @@ var ( // Timeouts const ( - respTimeout = 500 * time.Millisecond - expiration = 20 * time.Second + respTimeout = 500 * time.Millisecond + expiration = 20 * time.Second + bondExpiration = 24 * time.Hour ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP ntpWarningCooldown = 10 * time.Minute // Minimum amount of time to pass before repeating NTP warning @@ -87,7 +89,7 @@ type ( // findnode is a query for nodes close to the given target. findnode struct { - Target NodeID // doesn't need to be an actual public key + Target encPubkey Expiration uint64 // Ignore additional fields (for forward compatibility). Rest []rlp.RawValue `rlp:"tail"` @@ -105,7 +107,7 @@ type ( IP net.IP // len 4 for IPv4 or 16 for IPv6 UDP uint16 // for discovery protocol TCP uint16 // for RLPx protocol - ID NodeID + ID encPubkey } rpcEndpoint struct { @@ -123,7 +125,7 @@ func makeEndpoint(addr *net.UDPAddr, tcpPort uint16) rpcEndpoint { return rpcEndpoint{IP: ip, UDP: uint16(addr.Port), TCP: tcpPort} } -func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) { +func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*node, error) { if rn.UDP <= 1024 { return nil, errors.New("low port") } @@ -133,17 +135,26 @@ func (t *udp) nodeFromRPC(sender *net.UDPAddr, rn rpcNode) (*Node, error) { if t.netrestrict != nil && !t.netrestrict.Contains(rn.IP) { return nil, errors.New("not contained in netrestrict whitelist") } - n := NewNode(rn.ID, rn.IP, rn.UDP, rn.TCP) - err := n.validateComplete() + key, err := decodePubkey(rn.ID) + if err != nil { + return nil, err + } + n := wrapNode(enode.NewV4(key, rn.IP, int(rn.TCP), int(rn.UDP))) + err = n.ValidateComplete() return n, err } -func nodeToRPC(n *Node) rpcNode { - return rpcNode{ID: n.ID, IP: n.IP, UDP: n.UDP, TCP: n.TCP} +func nodeToRPC(n *node) rpcNode { + var key ecdsa.PublicKey + var ekey encPubkey + if err := n.Load((*enode.Secp256k1)(&key)); err == nil { + ekey = encodePubkey(&key) + } + return rpcNode{ID: ekey, IP: n.IP(), UDP: uint16(n.UDP()), TCP: uint16(n.TCP())} } type packet interface { - handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error + handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error name() string } @@ -181,7 +192,7 @@ type udp struct { // to all the callback functions for that node. type pending struct { // these fields must match in the reply. - from NodeID + from enode.ID ptype byte // time when the request must complete @@ -199,7 +210,7 @@ type pending struct { } type reply struct { - from NodeID + from enode.ID ptype byte data interface{} // loop indicates whether there was @@ -222,7 +233,7 @@ type Config struct { AnnounceAddr *net.UDPAddr // local address announced in the DHT NodeDBPath string // if set, the node database is stored at this filesystem location NetRestrict *netutil.Netlist // network whitelist - Bootnodes []*Node // list of bootstrap nodes + Bootnodes []*enode.Node // list of bootstrap nodes Unhandled chan<- ReadPacket // unhandled packets are sent on this channel } @@ -237,6 +248,16 @@ func ListenUDP(c conn, cfg Config) (*Table, error) { } func newUDP(c conn, cfg Config) (*Table, *udp, error) { + realaddr := c.LocalAddr().(*net.UDPAddr) + if cfg.AnnounceAddr != nil { + realaddr = cfg.AnnounceAddr + } + self := enode.NewV4(&cfg.PrivateKey.PublicKey, realaddr.IP, realaddr.Port, realaddr.Port) + db, err := enode.OpenDB(cfg.NodeDBPath) + if err != nil { + return nil, nil, err + } + udp := &udp{ conn: c, priv: cfg.PrivateKey, @@ -245,13 +266,9 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) { gotreply: make(chan reply), addpending: make(chan *pending), } - realaddr := c.LocalAddr().(*net.UDPAddr) - if cfg.AnnounceAddr != nil { - realaddr = cfg.AnnounceAddr - } // TODO: separate TCP port udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port)) - tab, err := newTable(udp, PubkeyID(&cfg.PrivateKey.PublicKey), realaddr, cfg.NodeDBPath, cfg.Bootnodes) + tab, err := newTable(udp, self, db, cfg.Bootnodes) if err != nil { return nil, nil, err } @@ -265,17 +282,18 @@ func newUDP(c conn, cfg Config) (*Table, *udp, error) { func (t *udp) close() { close(t.closing) t.conn.Close() + t.db.Close() // TODO: wait for the loops to end. } // ping sends a ping message to the given node and waits for a reply. -func (t *udp) ping(toid NodeID, toaddr *net.UDPAddr) error { +func (t *udp) ping(toid enode.ID, toaddr *net.UDPAddr) error { return <-t.sendPing(toid, toaddr, nil) } // sendPing sends a ping message to the given node and invokes the callback // when the reply arrives. -func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan error { +func (t *udp) sendPing(toid enode.ID, toaddr *net.UDPAddr, callback func()) <-chan error { req := &ping{ Version: 4, From: t.ourEndpoint, @@ -299,21 +317,21 @@ func (t *udp) sendPing(toid NodeID, toaddr *net.UDPAddr, callback func()) <-chan return errc } -func (t *udp) waitping(from NodeID) error { +func (t *udp) waitping(from enode.ID) error { return <-t.pending(from, pingPacket, func(interface{}) bool { return true }) } // findnode sends a findnode request to the given node and waits until // the node has sent up to k neighbors. -func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node, error) { +func (t *udp) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) { // If we haven't seen a ping from the destination node for a while, it won't remember // our endpoint proof and reject findnode. Solicit a ping first. - if time.Since(t.db.lastPingReceived(toid)) > nodeDBNodeExpiration { + if time.Since(t.db.LastPingReceived(toid)) > bondExpiration { t.ping(toid, toaddr) t.waitping(toid) } - nodes := make([]*Node, 0, bucketSize) + nodes := make([]*node, 0, bucketSize) nreceived := 0 errc := t.pending(toid, neighborsPacket, func(r interface{}) bool { reply := r.(*neighbors) @@ -337,7 +355,7 @@ func (t *udp) findnode(toid NodeID, toaddr *net.UDPAddr, target NodeID) ([]*Node // pending adds a reply callback to the pending reply queue. // see the documentation of type pending for a detailed explanation. -func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <-chan error { +func (t *udp) pending(id enode.ID, ptype byte, callback func(interface{}) bool) <-chan error { ch := make(chan error, 1) p := &pending{from: id, ptype: ptype, callback: callback, errc: ch} select { @@ -349,7 +367,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <- return ch } -func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { +func (t *udp) handleReply(from enode.ID, ptype byte, req packet) bool { matched := make(chan bool, 1) select { case t.gotreply <- reply{from, ptype, req, matched}: @@ -563,19 +581,20 @@ func (t *udp) handlePacket(from *net.UDPAddr, buf []byte) error { return err } -func decodePacket(buf []byte) (packet, NodeID, []byte, error) { +func decodePacket(buf []byte) (packet, encPubkey, []byte, error) { if len(buf) < headSize+1 { - return nil, NodeID{}, nil, errPacketTooSmall + return nil, encPubkey{}, nil, errPacketTooSmall } hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:] shouldhash := crypto.Keccak256(buf[macSize:]) if !bytes.Equal(hash, shouldhash) { - return nil, NodeID{}, nil, errBadHash + return nil, encPubkey{}, nil, errBadHash } - fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig) + fromKey, err := recoverNodeKey(crypto.Keccak256(buf[headSize:]), sig) if err != nil { - return nil, NodeID{}, hash, err + return nil, fromKey, hash, err } + var req packet switch ptype := sigdata[0]; ptype { case pingPacket: @@ -587,56 +606,59 @@ func decodePacket(buf []byte) (packet, NodeID, []byte, error) { case neighborsPacket: req = new(neighbors) default: - return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype) + return nil, fromKey, hash, fmt.Errorf("unknown type: %d", ptype) } s := rlp.NewStream(bytes.NewReader(sigdata[1:]), 0) err = s.Decode(req) - return req, fromID, hash, err + return req, fromKey, hash, err } -func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { +func (req *ping) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error { if expired(req.Expiration) { return errExpired } + key, err := decodePubkey(fromKey) + if err != nil { + return fmt.Errorf("invalid public key: %v", err) + } t.send(from, pongPacket, &pong{ To: makeEndpoint(from, req.From.TCP), ReplyTok: mac, Expiration: uint64(time.Now().Add(expiration).Unix()), }) - t.handleReply(fromID, pingPacket, req) - - // Add the node to the table. Before doing so, ensure that we have a recent enough pong - // recorded in the database so their findnode requests will be accepted later. - n := NewNode(fromID, from.IP, uint16(from.Port), req.From.TCP) - if time.Since(t.db.lastPongReceived(fromID)) > nodeDBNodeExpiration { - t.sendPing(fromID, from, func() { t.addThroughPing(n) }) + n := wrapNode(enode.NewV4(key, from.IP, int(req.From.TCP), from.Port)) + t.handleReply(n.ID(), pingPacket, req) + if time.Since(t.db.LastPongReceived(n.ID())) > bondExpiration { + t.sendPing(n.ID(), from, func() { t.addThroughPing(n) }) } else { t.addThroughPing(n) } - t.db.updateLastPingReceived(fromID, time.Now()) + t.db.UpdateLastPingReceived(n.ID(), time.Now()) return nil } func (req *ping) name() string { return "PING/v4" } -func (req *pong) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { +func (req *pong) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error { if expired(req.Expiration) { return errExpired } + fromID := fromKey.id() if !t.handleReply(fromID, pongPacket, req) { return errUnsolicitedReply } - t.db.updateLastPongReceived(fromID, time.Now()) + t.db.UpdateLastPongReceived(fromID, time.Now()) return nil } func (req *pong) name() string { return "PONG/v4" } -func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { +func (req *findnode) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error { if expired(req.Expiration) { return errExpired } - if !t.db.hasBond(fromID) { + fromID := fromKey.id() + if time.Since(t.db.LastPongReceived(fromID)) > bondExpiration { // No endpoint proof pong exists, we don't process the packet. This prevents an // attack vector where the discovery protocol could be used to amplify traffic in a // DDOS attack. A malicious actor would send a findnode request with the IP address @@ -645,7 +667,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte // findnode) to the victim. return errUnknownNode } - target := crypto.Keccak256Hash(req.Target[:]) + target := enode.ID(crypto.Keccak256Hash(req.Target[:])) t.mutex.Lock() closest := t.closest(target, bucketSize).entries t.mutex.Unlock() @@ -655,7 +677,7 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte // Send neighbors in chunks with at most maxNeighbors per packet // to stay below the 1280 byte limit. for _, n := range closest { - if netutil.CheckRelayIP(from.IP, n.IP) == nil { + if netutil.CheckRelayIP(from.IP, n.IP()) == nil { p.Nodes = append(p.Nodes, nodeToRPC(n)) } if len(p.Nodes) == maxNeighbors { @@ -672,11 +694,11 @@ func (req *findnode) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte func (req *findnode) name() string { return "FINDNODE/v4" } -func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) error { +func (req *neighbors) handle(t *udp, from *net.UDPAddr, fromKey encPubkey, mac []byte) error { if expired(req.Expiration) { return errExpired } - if !t.handleReply(fromID, neighborsPacket, req) { + if !t.handleReply(fromKey.id(), neighborsPacket, req) { return errUnsolicitedReply } return nil |