aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--p2p/handshake.go8
-rw-r--r--p2p/message.go40
-rw-r--r--p2p/peer.go3
-rw-r--r--p2p/rlpx.go5
-rw-r--r--p2p/server.go18
5 files changed, 37 insertions, 37 deletions
diff --git a/p2p/handshake.go b/p2p/handshake.go
index 3ad25bae4..7fc497517 100644
--- a/p2p/handshake.go
+++ b/p2p/handshake.go
@@ -37,7 +37,7 @@ const (
//
// The MsgReadWriter is usually layered as follows:
//
-// lockedRW (thread-safety for ReadMsg, WriteMsg)
+// netWrapper (I/O timeouts, thread-safe ReadMsg, WriteMsg)
// rlpxFrameRW (message encoding, encryption, authentication)
// bufio.ReadWriter (buffering)
// net.Conn (network I/O)
@@ -83,7 +83,6 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (
}
// Run the protocol handshake using authenticated messages.
- // TODO: move buffering setup here (out of newFrameRW)
rw := newRlpxFrameRW(fd, secrets)
rhs, err := readProtocolHandshake(rw, our)
if err != nil {
@@ -96,7 +95,7 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (
if err := writeProtocolHandshake(rw, our); err != nil {
return nil, fmt.Errorf("protocol write error: %v", err)
}
- return &conn{&lockedRW{wrapped: rw}, rhs}, nil
+ return &conn{rw, rhs}, nil
}
func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) {
@@ -106,7 +105,6 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
}
// Run the protocol handshake using authenticated messages.
- // TODO: move buffering setup here (out of newFrameRW)
rw := newRlpxFrameRW(fd, secrets)
if err := writeProtocolHandshake(rw, our); err != nil {
return nil, fmt.Errorf("protocol write error: %v", err)
@@ -118,7 +116,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
if rhs.ID != dial.ID {
return nil, errors.New("dialed node id mismatch")
}
- return &conn{&lockedRW{wrapped: rw}, rhs}, nil
+ return &conn{rw, rhs}, nil
}
// encHandshake contains the state of the encryption handshake.
diff --git a/p2p/message.go b/p2p/message.go
index 04b9e71f3..f88c31d1d 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
+ "net"
"sync"
"sync/atomic"
"time"
@@ -14,28 +15,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
-// parameters for frameRW
-const (
- // maximum time allowed for reading a message header.
- // this is effectively the amount of time a connection can be idle.
- frameReadTimeout = 1 * time.Minute
-
- // maximum time allowed for reading the payload data of a message.
- // this is shorter than (and distinct from) frameReadTimeout because
- // the connection is not considered idle while a message is transferred.
- // this also limits the payload size of messages to how much the connection
- // can transfer within the timeout.
- payloadReadTimeout = 5 * time.Second
-
- // maximum amount of time allowed for writing a complete message.
- msgWriteTimeout = 5 * time.Second
-
- // messages smaller than this many bytes will be read at
- // once before passing them to a protocol. this increases
- // concurrency in the processing.
- wholePayloadSize = 64 * 1024
-)
-
// Msg defines the structure of a p2p message.
//
// Note that a Msg can only be sent once since the Payload reader is
@@ -103,22 +82,27 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error {
return w.WriteMsg(NewMsg(code, data...))
}
-// lockedRW wraps a MsgReadWriter with locks around
-// ReadMsg and WriteMsg.
-type lockedRW struct {
+// netWrapper wrapsa MsgReadWriter with locks around
+// ReadMsg/WriteMsg and applies read/write deadlines.
+type netWrapper struct {
rmu, wmu sync.Mutex
- wrapped MsgReadWriter
+
+ rtimeout, wtimeout time.Duration
+ conn net.Conn
+ wrapped MsgReadWriter
}
-func (rw *lockedRW) ReadMsg() (Msg, error) {
+func (rw *netWrapper) ReadMsg() (Msg, error) {
rw.rmu.Lock()
defer rw.rmu.Unlock()
+ rw.conn.SetReadDeadline(time.Now().Add(rw.rtimeout))
return rw.wrapped.ReadMsg()
}
-func (rw *lockedRW) WriteMsg(msg Msg) error {
+func (rw *netWrapper) WriteMsg(msg Msg) error {
rw.wmu.Lock()
defer rw.wmu.Unlock()
+ rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout))
return rw.wrapped.WriteMsg(msg)
}
diff --git a/p2p/peer.go b/p2p/peer.go
index 025be4ba9..c2c83abfc 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -20,8 +20,8 @@ const (
baseProtocolLength = uint64(16)
baseProtocolMaxMsgSize = 10 * 1024 * 1024
- disconnectGracePeriod = 2 * time.Second
pingInterval = 15 * time.Second
+ disconnectGracePeriod = 2 * time.Second
)
const (
@@ -176,6 +176,7 @@ func (p *Peer) politeDisconnect(reason DiscReason) {
func (p *Peer) readLoop() error {
for {
+ p.conn.SetDeadline(time.Now().Add(frameReadTimeout))
msg, err := p.rw.ReadMsg()
if err != nil {
return err
diff --git a/p2p/rlpx.go b/p2p/rlpx.go
index a041bb314..166bbb5e6 100644
--- a/p2p/rlpx.go
+++ b/p2p/rlpx.go
@@ -21,6 +21,11 @@ var (
zero16 = make([]byte, 16)
)
+// rlpxFrameRW implements a simplified version of RLPx framing.
+// chunked messages are not supported and all headers are equal to
+// zeroHeader.
+//
+// rlpxFrameRW is not safe for concurrent use from multiple goroutines.
type rlpxFrameRW struct {
conn io.ReadWriter
enc cipher.Stream
diff --git a/p2p/server.go b/p2p/server.go
index 67d5514b4..8f99bc33d 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -17,9 +17,17 @@ import (
)
const (
- handshakeTimeout = 5 * time.Second
defaultDialTimeout = 10 * time.Second
refreshPeersInterval = 30 * time.Second
+
+ // total timeout for encryption handshake and protocol
+ // handshake in both directions.
+ handshakeTimeout = 5 * time.Second
+ // maximum time allowed for reading a complete message.
+ // this is effectively the amount of time a connection can be idle.
+ frameReadTimeout = 1 * time.Minute
+ // maximum amount of time allowed for writing a complete message.
+ frameWriteTimeout = 5 * time.Second
)
var srvlog = logger.NewLogger("P2P Server")
@@ -359,14 +367,18 @@ func (srv *Server) findPeers() {
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
// TODO: handle/store session token
- // TODO: reenable deadlines
- // fd.SetDeadline(time.Now().Add(handshakeTimeout))
+ fd.SetDeadline(time.Now().Add(handshakeTimeout))
conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
if err != nil {
fd.Close()
srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
return
}
+
+ conn.MsgReadWriter = &netWrapper{
+ wrapped: conn.MsgReadWriter,
+ conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
+ }
p := newPeer(fd, conn, srv.Protocols)
if ok, reason := srv.addPeer(conn.ID, p); !ok {
srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)