diff options
author | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-08-20 03:55:40 +0800 |
---|---|---|
committer | Jeffrey Wilcke <jeffrey@ethereum.org> | 2015-08-20 03:55:40 +0800 |
commit | 61a6911eebfac5c70959227a8fe0a4e9c2ef5c61 (patch) | |
tree | d64cc534c627b2250e395df588982e61190a7573 | |
parent | 382d35bf403ab5dd9b0d2fe3a87c3960902d6e57 (diff) | |
parent | dd54fef89888372ab5961c1b5a6ac917fc47d49c (diff) | |
download | go-tangerine-61a6911eebfac5c70959227a8fe0a4e9c2ef5c61.tar.gz go-tangerine-61a6911eebfac5c70959227a8fe0a4e9c2ef5c61.tar.zst go-tangerine-61a6911eebfac5c70959227a8fe0a4e9c2ef5c61.zip |
Merge pull request #1689 from fjl/discover-ignore-temp-errors
p2p, p2p/discover: small fixes
-rw-r--r-- | p2p/discover/node.go | 4 | ||||
-rw-r--r-- | p2p/discover/table.go | 15 | ||||
-rw-r--r-- | p2p/discover/udp.go | 12 | ||||
-rw-r--r-- | p2p/server.go | 31 |
4 files changed, 51 insertions, 11 deletions
diff --git a/p2p/discover/node.go b/p2p/discover/node.go index b6956e197..a14f29424 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -48,6 +48,10 @@ type Node struct { // In those tests, the content of sha will not actually correspond // with ID. sha common.Hash + + // whether this node is currently being pinged in order to replace + // it in a bucket + contested bool } func newNode(id NodeID, ip net.IP, udpPort, tcpPort uint16) *Node { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b077f010c..972bc1077 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -455,24 +455,31 @@ func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { func (tab *Table) add(new *Node) { b := tab.buckets[logdist(tab.self.sha, new.sha)] tab.mutex.Lock() + defer tab.mutex.Unlock() if b.bump(new) { - tab.mutex.Unlock() return } var oldest *Node if len(b.entries) == bucketSize { oldest = b.entries[bucketSize-1] + if oldest.contested { + // The node is already being replaced, don't attempt + // to replace it. + return + } + oldest.contested = true // Let go of the mutex so other goroutines can access // the table while we ping the least recently active node. tab.mutex.Unlock() - if err := tab.ping(oldest.ID, oldest.addr()); err == nil { + err := tab.ping(oldest.ID, oldest.addr()) + tab.mutex.Lock() + oldest.contested = false + if err == nil { // The node responded, don't replace it. return } - tab.mutex.Lock() } added := b.replace(new, oldest) - tab.mutex.Unlock() if added && tab.nodeAddedHook != nil { tab.nodeAddedHook(new) } diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 008e63937..6aefb68f7 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -458,6 +458,10 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte, return packet, nil } +type tempError interface { + Temporary() bool +} + // readLoop runs in its own goroutine. it handles incoming UDP packets. func (t *udp) readLoop() { defer t.conn.Close() @@ -467,7 +471,13 @@ func (t *udp) readLoop() { buf := make([]byte, 1280) for { nbytes, from, err := t.conn.ReadFromUDP(buf) - if err != nil { + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + // Ignore temporary read errors. + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + // Shut down the loop for permament errors. + glog.V(logger.Debug).Infof("Read error: %v", err) return } t.handlePacket(from, buf[:nbytes]) diff --git a/p2p/server.go b/p2p/server.go index 7351a2654..d8be85323 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -542,6 +542,10 @@ func (srv *Server) encHandshakeChecks(peers map[discover.NodeID]*Peer, c *conn) } } +type tempError interface { + Temporary() bool +} + // listenLoop runs in its own goroutine and accepts // inbound connections. func (srv *Server) listenLoop() { @@ -561,16 +565,31 @@ func (srv *Server) listenLoop() { } for { + // Wait for a handshake slot before accepting. <-slots - fd, err := srv.listener.Accept() - if err != nil { - return + + var ( + fd net.Conn + err error + ) + for { + fd, err = srv.listener.Accept() + if tempErr, ok := err.(tempError); ok && tempErr.Temporary() { + glog.V(logger.Debug).Infof("Temporary read error: %v", err) + continue + } else if err != nil { + glog.V(logger.Debug).Infof("Read error: %v", err) + return + } + break } - mfd := newMeteredConn(fd, true) + fd = newMeteredConn(fd, true) + glog.V(logger.Debug).Infof("Accepted conn %v\n", fd.RemoteAddr()) - glog.V(logger.Debug).Infof("Accepted conn %v\n", mfd.RemoteAddr()) + // Spawn the handler. It will give the slot back when the connection + // has been established. go func() { - srv.setupConn(mfd, inboundConn, nil) + srv.setupConn(fd, inboundConn, nil) slots <- struct{}{} }() } |