aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@users.noreply.github.com>2018-10-12 17:47:24 +0800
committerGitHub <noreply@github.com>2018-10-12 17:47:24 +0800
commit6f607de5d590ff2fbe8798b04e5924be3b7ca0b4 (patch)
tree2905b3462c0d4f162914a948dac6d1836ace4b77 /p2p/server.go
parentdcae0d348bb7f5d9052e50a83383a33538ce376a (diff)
downloaddexon-6f607de5d590ff2fbe8798b04e5924be3b7ca0b4.tar.gz
dexon-6f607de5d590ff2fbe8798b04e5924be3b7ca0b4.tar.zst
dexon-6f607de5d590ff2fbe8798b04e5924be3b7ca0b4.zip
p2p, p2p/discover: add signed ENR generation (#17753)
This PR adds enode.LocalNode and integrates it into the p2p subsystem. This new object is the keeper of the local node record. For now, a new version of the record is produced every time the client restarts. We'll make it smarter to avoid that in the future. There are a couple of other changes in this commit: discovery now waits for all of its goroutines at shutdown and the p2p server now closes the node database after discovery has shut down. This fixes a leveldb crash in tests. p2p server startup is faster because it doesn't need to wait for the external IP query anymore.
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go232
1 files changed, 122 insertions, 110 deletions
diff --git a/p2p/server.go b/p2p/server.go
index 40db758e2..6482c0401 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -20,9 +20,11 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
+ "encoding/hex"
"errors"
"fmt"
"net"
+ "sort"
"sync"
"sync/atomic"
"time"
@@ -35,8 +37,10 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
+ "github.com/ethereum/go-ethereum/rlp"
)
const (
@@ -160,6 +164,8 @@ type Server struct {
lock sync.Mutex // protects running
running bool
+ nodedb *enode.DB
+ localnode *enode.LocalNode
ntab discoverTable
listener net.Listener
ourHandshake *protoHandshake
@@ -347,43 +353,13 @@ func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
// Self returns the local node's endpoint information.
func (srv *Server) Self() *enode.Node {
srv.lock.Lock()
- running, listener, ntab := srv.running, srv.listener, srv.ntab
+ ln := srv.localnode
srv.lock.Unlock()
- if !running {
+ if ln == nil {
return enode.NewV4(&srv.PrivateKey.PublicKey, net.ParseIP("0.0.0.0"), 0, 0)
}
- return srv.makeSelf(listener, ntab)
-}
-
-func (srv *Server) makeSelf(listener net.Listener, ntab discoverTable) *enode.Node {
- // If the node is running but discovery is off, manually assemble the node infos.
- if ntab == nil {
- addr := srv.tcpAddr(listener)
- return enode.NewV4(&srv.PrivateKey.PublicKey, addr.IP, addr.Port, 0)
- }
- // Otherwise return the discovery node.
- return ntab.Self()
-}
-
-func (srv *Server) tcpAddr(listener net.Listener) net.TCPAddr {
- addr := net.TCPAddr{IP: net.IP{0, 0, 0, 0}}
- if listener == nil {
- return addr // Inbound connections disabled, use zero address.
- }
- // Otherwise inject the listener address too.
- if a, ok := listener.Addr().(*net.TCPAddr); ok {
- addr = *a
- }
- if srv.NAT != nil {
- if ip, err := srv.NAT.ExternalIP(); err == nil {
- addr.IP = ip
- }
- }
- if addr.IP.IsUnspecified() {
- addr.IP = net.IP{127, 0, 0, 1}
- }
- return addr
+ return ln.Node()
}
// Stop terminates the server and all active peer connections.
@@ -443,7 +419,9 @@ func (srv *Server) Start() (err error) {
if srv.log == nil {
srv.log = log.New()
}
- srv.log.Info("Starting P2P networking")
+ if srv.NoDial && srv.ListenAddr == "" {
+ srv.log.Warn("P2P server will be useless, neither dialing nor listening")
+ }
// static fields
if srv.PrivateKey == nil {
@@ -466,65 +444,120 @@ func (srv *Server) Start() (err error) {
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})
- var (
- conn *net.UDPConn
- sconn *sharedUDPConn
- realaddr *net.UDPAddr
- unhandled chan discover.ReadPacket
- )
-
- if !srv.NoDiscovery || srv.DiscoveryV5 {
- addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
- if err != nil {
+ if err := srv.setupLocalNode(); err != nil {
+ return err
+ }
+ if srv.ListenAddr != "" {
+ if err := srv.setupListening(); err != nil {
return err
}
- conn, err = net.ListenUDP("udp", addr)
- if err != nil {
- return err
+ }
+ if err := srv.setupDiscovery(); err != nil {
+ return err
+ }
+
+ dynPeers := srv.maxDialedConns()
+ dialer := newDialState(srv.localnode.ID(), srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
+ srv.loopWG.Add(1)
+ go srv.run(dialer)
+ return nil
+}
+
+func (srv *Server) setupLocalNode() error {
+ // Create the devp2p handshake.
+ pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
+ srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]}
+ for _, p := range srv.Protocols {
+ srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
+ }
+ sort.Sort(capsByNameAndVersion(srv.ourHandshake.Caps))
+
+ // Create the local node.
+ db, err := enode.OpenDB(srv.Config.NodeDatabase)
+ if err != nil {
+ return err
+ }
+ srv.nodedb = db
+ srv.localnode = enode.NewLocalNode(db, srv.PrivateKey)
+ srv.localnode.SetFallbackIP(net.IP{127, 0, 0, 1})
+ srv.localnode.Set(capsByNameAndVersion(srv.ourHandshake.Caps))
+ // TODO: check conflicts
+ for _, p := range srv.Protocols {
+ for _, e := range p.Attributes {
+ srv.localnode.Set(e)
}
- realaddr = conn.LocalAddr().(*net.UDPAddr)
- if srv.NAT != nil {
- if !realaddr.IP.IsLoopback() {
- go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
- }
- // TODO: react to external IP changes over time.
- if ext, err := srv.NAT.ExternalIP(); err == nil {
- realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
+ }
+ switch srv.NAT.(type) {
+ case nil:
+ // No NAT interface, do nothing.
+ case nat.ExtIP:
+ // ExtIP doesn't block, set the IP right away.
+ ip, _ := srv.NAT.ExternalIP()
+ srv.localnode.SetStaticIP(ip)
+ default:
+ // Ask the router about the IP. This takes a while and blocks startup,
+ // do it in the background.
+ srv.loopWG.Add(1)
+ go func() {
+ defer srv.loopWG.Done()
+ if ip, err := srv.NAT.ExternalIP(); err == nil {
+ srv.localnode.SetStaticIP(ip)
}
- }
+ }()
+ }
+ return nil
+}
+
+func (srv *Server) setupDiscovery() error {
+ if srv.NoDiscovery && !srv.DiscoveryV5 {
+ return nil
}
- if !srv.NoDiscovery && srv.DiscoveryV5 {
- unhandled = make(chan discover.ReadPacket, 100)
- sconn = &sharedUDPConn{conn, unhandled}
+ addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
+ if err != nil {
+ return err
+ }
+ conn, err := net.ListenUDP("udp", addr)
+ if err != nil {
+ return err
+ }
+ realaddr := conn.LocalAddr().(*net.UDPAddr)
+ srv.log.Debug("UDP listener up", "addr", realaddr)
+ if srv.NAT != nil {
+ if !realaddr.IP.IsLoopback() {
+ go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
+ }
}
+ srv.localnode.SetFallbackUDP(realaddr.Port)
- // node table
+ // Discovery V4
+ var unhandled chan discover.ReadPacket
+ var sconn *sharedUDPConn
if !srv.NoDiscovery {
+ if srv.DiscoveryV5 {
+ unhandled = make(chan discover.ReadPacket, 100)
+ sconn = &sharedUDPConn{conn, unhandled}
+ }
cfg := discover.Config{
- PrivateKey: srv.PrivateKey,
- AnnounceAddr: realaddr,
- NodeDBPath: srv.NodeDatabase,
- NetRestrict: srv.NetRestrict,
- Bootnodes: srv.BootstrapNodes,
- Unhandled: unhandled,
+ PrivateKey: srv.PrivateKey,
+ NetRestrict: srv.NetRestrict,
+ Bootnodes: srv.BootstrapNodes,
+ Unhandled: unhandled,
}
- ntab, err := discover.ListenUDP(conn, cfg)
+ ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
if err != nil {
return err
}
srv.ntab = ntab
}
-
+ // Discovery V5
if srv.DiscoveryV5 {
- var (
- ntab *discv5.Network
- err error
- )
+ var ntab *discv5.Network
+ var err error
if sconn != nil {
- ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
+ ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, "", srv.NetRestrict)
} else {
- ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
+ ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, "", srv.NetRestrict)
}
if err != nil {
return err
@@ -534,32 +567,10 @@ func (srv *Server) Start() (err error) {
}
srv.DiscV5 = ntab
}
-
- dynPeers := srv.maxDialedConns()
- dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
-
- // handshake
- pubkey := crypto.FromECDSAPub(&srv.PrivateKey.PublicKey)
- srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: pubkey[1:]}
- for _, p := range srv.Protocols {
- srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
- }
- // listen/dial
- if srv.ListenAddr != "" {
- if err := srv.startListening(); err != nil {
- return err
- }
- }
- if srv.NoDial && srv.ListenAddr == "" {
- srv.log.Warn("P2P server will be useless, neither dialing nor listening")
- }
-
- srv.loopWG.Add(1)
- go srv.run(dialer)
return nil
}
-func (srv *Server) startListening() error {
+func (srv *Server) setupListening() error {
// Launch the TCP listener.
listener, err := net.Listen("tcp", srv.ListenAddr)
if err != nil {
@@ -568,8 +579,11 @@ func (srv *Server) startListening() error {
laddr := listener.Addr().(*net.TCPAddr)
srv.ListenAddr = laddr.String()
srv.listener = listener
+ srv.localnode.Set(enr.TCP(laddr.Port))
+
srv.loopWG.Add(1)
go srv.listenLoop()
+
// Map the TCP listening port if NAT is configured.
if !laddr.IP.IsLoopback() && srv.NAT != nil {
srv.loopWG.Add(1)
@@ -589,7 +603,10 @@ type dialer interface {
}
func (srv *Server) run(dialstate dialer) {
+ srv.log.Info("Started P2P networking", "self", srv.localnode.Node())
defer srv.loopWG.Done()
+ defer srv.nodedb.Close()
+
var (
peers = make(map[enode.ID]*Peer)
inboundCount = 0
@@ -781,7 +798,7 @@ func (srv *Server) encHandshakeChecks(peers map[enode.ID]*Peer, inboundCount int
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
- case c.node.ID() == srv.Self().ID():
+ case c.node.ID() == srv.localnode.ID():
return DiscSelf
default:
return nil
@@ -802,15 +819,11 @@ func (srv *Server) maxDialedConns() int {
return srv.MaxPeers / r
}
-type tempError interface {
- Temporary() bool
-}
-
// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
- srv.log.Info("RLPx listener up", "self", srv.Self())
+ srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())
tokens := defaultMaxPendingPeers
if srv.MaxPendingPeers > 0 {
@@ -831,7 +844,7 @@ func (srv *Server) listenLoop() {
)
for {
fd, err = srv.listener.Accept()
- if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
+ if netutil.IsTemporaryError(err) {
srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
@@ -864,10 +877,6 @@ func (srv *Server) listenLoop() {
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node) error {
- self := srv.Self()
- if self == nil {
- return errors.New("shutdown")
- }
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
err := srv.setupConn(c, flags, dialDest)
if err != nil {
@@ -1003,6 +1012,7 @@ type NodeInfo struct {
ID string `json:"id"` // Unique node identifier (also the encryption key)
Name string `json:"name"` // Name of the node, including client type, version, OS, custom data
Enode string `json:"enode"` // Enode URL for adding this peer from remote peers
+ ENR string `json:"enr"` // Ethereum Node Record
IP string `json:"ip"` // IP address of the node
Ports struct {
Discovery int `json:"discovery"` // UDP listening port for discovery protocol
@@ -1014,9 +1024,8 @@ type NodeInfo struct {
// NodeInfo gathers and returns a collection of metadata known about the host.
func (srv *Server) NodeInfo() *NodeInfo {
- node := srv.Self()
-
// Gather and assemble the generic node infos
+ node := srv.Self()
info := &NodeInfo{
Name: srv.Name,
Enode: node.String(),
@@ -1027,6 +1036,9 @@ func (srv *Server) NodeInfo() *NodeInfo {
}
info.Ports.Discovery = node.UDP()
info.Ports.Listener = node.TCP()
+ if enc, err := rlp.EncodeToBytes(node.Record()); err == nil {
+ info.ENR = "0x" + hex.EncodeToString(enc)
+ }
// Gather all the running protocol infos (only once per protocol type)
for _, proto := range srv.Protocols {