aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/server.go
diff options
context:
space:
mode:
authorLewis Marshall <lewis@lmars.net>2017-12-01 19:49:04 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-12-01 19:49:04 +0800
commit54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d (patch)
tree07bd996822874272ef163bedb56a2ade537cf658 /p2p/server.go
parent73067fd24f39cb7d2cdf63a99f6fdac661f7a8bf (diff)
downloaddexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.gz
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.tar.zst
dexon-54aeb8e4c0bb9f0e7a6c67258af67df3b266af3d.zip
p2p/simulations: various stability fixes (#15198)
p2p/simulations: introduce dialBan - Refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). - The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network. p2p: add Inbound public method to p2p.Peer p2p/simulations: Add server id to logs to support debugging in-memory network simulations when multiple peers are logging. p2p: SetupConn now returns error. The dialer checks the error and only calls resolve if the actual TCP dial fails.
Diffstat (limited to 'p2p/server.go')
-rw-r--r--p2p/server.go84
1 files changed, 51 insertions, 33 deletions
diff --git a/p2p/server.go b/p2p/server.go
index d1d578401..922df55ba 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -139,6 +139,9 @@ type Config struct {
// If EnableMsgEvents is set then the server will emit PeerEvents
// whenever a message is sent to or received from a peer
EnableMsgEvents bool
+
+ // Logger is a custom logger to use with the p2p.Server.
+ Logger log.Logger
}
// Server manages all peer connections.
@@ -172,6 +175,7 @@ type Server struct {
delpeer chan peerDrop
loopWG sync.WaitGroup // loop, listenLoop
peerFeed event.Feed
+ log log.Logger
}
type peerOpFunc func(map[discover.NodeID]*Peer)
@@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) {
return errors.New("server already running")
}
srv.running = true
- log.Info("Starting P2P networking")
+ srv.log = srv.Config.Logger
+ if srv.log == nil {
+ srv.log = log.New()
+ }
+ srv.log.Info("Starting P2P networking")
// static fields
if srv.PrivateKey == nil {
@@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) {
}
}
if srv.NoDial && srv.ListenAddr == "" {
- log.Warn("P2P server will be useless, neither dialing nor listening")
+ srv.log.Warn("P2P server will be useless, neither dialing nor listening")
}
srv.loopWG.Add(1)
@@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) {
i := 0
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
t := ts[i]
- log.Trace("New dial task", "task", t)
+ srv.log.Trace("New dial task", "task", t)
go func() { t.Do(srv); taskdone <- t }()
runningTasks = append(runningTasks, t)
}
@@ -517,13 +525,13 @@ running:
// This channel is used by AddPeer to add to the
// ephemeral static peer list. Add it to the dialer,
// it will keep the node connected.
- log.Debug("Adding static node", "node", n)
+ srv.log.Debug("Adding static node", "node", n)
dialstate.addStatic(n)
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
- log.Debug("Removing static node", "node", n)
+ srv.log.Debug("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
@@ -536,7 +544,7 @@ running:
// A task got done. Tell dialstate about it so it
// can update its state and remove it from the active
// tasks list.
- log.Trace("Dial task done", "task", t)
+ srv.log.Trace("Dial task done", "task", t)
dialstate.taskDone(t, time.Now())
delTask(t)
case c := <-srv.posthandshake:
@@ -565,7 +573,7 @@ running:
p.events = &srv.peerFeed
}
name := truncateName(c.name)
- log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
+ srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
peers[c.id] = p
go srv.runPeer(p)
}
@@ -585,7 +593,7 @@ running:
}
}
- log.Trace("P2P networking is spinning down")
+ srv.log.Trace("P2P networking is spinning down")
// Terminate discovery. If there is a running lookup it will terminate soon.
if srv.ntab != nil {
@@ -639,7 +647,7 @@ type tempError interface {
// inbound connections.
func (srv *Server) listenLoop() {
defer srv.loopWG.Done()
- log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
+ srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
// This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake.
@@ -664,10 +672,10 @@ func (srv *Server) listenLoop() {
for {
fd, err = srv.listener.Accept()
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
- log.Debug("Temporary read error", "err", err)
+ srv.log.Debug("Temporary read error", "err", err)
continue
} else if err != nil {
- log.Debug("Read error", "err", err)
+ srv.log.Debug("Read error", "err", err)
return
}
break
@@ -676,7 +684,7 @@ func (srv *Server) listenLoop() {
// Reject connections that do not match NetRestrict.
if srv.NetRestrict != nil {
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
- log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
+ srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
fd.Close()
slots <- struct{}{}
continue
@@ -684,7 +692,7 @@ func (srv *Server) listenLoop() {
}
fd = newMeteredConn(fd, true)
- log.Trace("Accepted connection", "addr", fd.RemoteAddr())
+ srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
// Spawn the handler. It will give the slot back when the connection
// has been established.
@@ -698,55 +706,65 @@ func (srv *Server) listenLoop() {
// SetupConn runs the handshakes and attempts to add the connection
// as a peer. It returns when the connection has been added as a peer
// or the handshakes have failed.
-func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
+func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
+ self := srv.Self()
+ if self == nil {
+ return errors.New("shutdown")
+ }
+ c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
+ err := srv.setupConn(c, flags, dialDest)
+ if err != nil {
+ c.close(err)
+ srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
+ }
+ return err
+}
+
+func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
// Prevent leftover pending conns from entering the handshake.
srv.lock.Lock()
running := srv.running
srv.lock.Unlock()
- c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
if !running {
- c.close(errServerStopped)
- return
+ return errServerStopped
}
// Run the encryption handshake.
var err error
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
- log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
- c.close(err)
- return
+ srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
+ return err
}
- clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
+ clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
// For dialed connections, check that the remote public key matches.
if dialDest != nil && c.id != dialDest.ID {
- c.close(DiscUnexpectedIdentity)
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
- return
+ return DiscUnexpectedIdentity
}
- if err := srv.checkpoint(c, srv.posthandshake); err != nil {
+ err = srv.checkpoint(c, srv.posthandshake)
+ if err != nil {
clog.Trace("Rejected peer before protocol handshake", "err", err)
- c.close(err)
- return
+ return err
}
// Run the protocol handshake
phs, err := c.doProtoHandshake(srv.ourHandshake)
if err != nil {
clog.Trace("Failed proto handshake", "err", err)
- c.close(err)
- return
+ return err
}
if phs.ID != c.id {
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
- c.close(DiscUnexpectedIdentity)
- return
+ return DiscUnexpectedIdentity
}
c.caps, c.name = phs.Caps, phs.Name
- if err := srv.checkpoint(c, srv.addpeer); err != nil {
+ err = srv.checkpoint(c, srv.addpeer)
+ if err != nil {
clog.Trace("Rejected peer", "err", err)
- c.close(err)
- return
+ return err
}
// If the checks completed successfully, runPeer has now been
// launched by run.
+ clog.Trace("connection set up", "inbound", dialDest == nil)
+ return nil
}
func truncateName(s string) string {