aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/peer.go')
-rw-r--r--p2p/peer.go518
1 files changed, 192 insertions, 326 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index e82bca222..1fa8264a3 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -1,10 +1,6 @@
package p2p
import (
- "bufio"
- "bytes"
- "crypto/ecdsa"
- "crypto/rand"
"fmt"
"io"
"io/ioutil"
@@ -13,179 +9,118 @@ import (
"sync"
"time"
- "github.com/ethereum/go-ethereum/crypto"
-
- "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/rlp"
)
-// peerAddr is the structure of a peer list element.
-// It is also a valid net.Addr.
-type peerAddr struct {
- IP net.IP
- Port uint64
- Pubkey []byte // optional
-}
+const (
+ // maximum amount of time allowed for reading a message
+ msgReadTimeout = 5 * time.Second
+ // maximum amount of time allowed for writing a message
+ msgWriteTimeout = 5 * time.Second
+ // messages smaller than this many bytes will be read at
+ // once before passing them to a protocol.
+ wholePayloadSize = 64 * 1024
-func newPeerAddr(addr net.Addr, pubkey []byte) *peerAddr {
- n := addr.Network()
- if n != "tcp" && n != "tcp4" && n != "tcp6" {
- // for testing with non-TCP
- return &peerAddr{net.ParseIP("127.0.0.1"), 30303, pubkey}
- }
- ta := addr.(*net.TCPAddr)
- return &peerAddr{ta.IP, uint64(ta.Port), pubkey}
-}
+ disconnectGracePeriod = 2 * time.Second
+)
-func (d peerAddr) Network() string {
- if d.IP.To4() != nil {
- return "tcp4"
- } else {
- return "tcp6"
- }
-}
+const (
+ baseProtocolVersion = 2
+ baseProtocolLength = uint64(16)
+ baseProtocolMaxMsgSize = 10 * 1024 * 1024
+)
-func (d peerAddr) String() string {
- return fmt.Sprintf("%v:%d", d.IP, d.Port)
-}
+const (
+ // devp2p message codes
+ handshakeMsg = 0x00
+ discMsg = 0x01
+ pingMsg = 0x02
+ pongMsg = 0x03
+ getPeersMsg = 0x04
+ peersMsg = 0x05
+)
-func (d *peerAddr) RlpData() interface{} {
- return []interface{}{string(d.IP), d.Port, d.Pubkey}
+// handshake is the RLP structure of the protocol handshake.
+type handshake struct {
+ Version uint64
+ Name string
+ Caps []Cap
+ ListenPort uint64
+ NodeID discover.NodeID
}
-// Peer represents a remote peer.
+// Peer represents a connected remote node.
type Peer struct {
// Peers have all the log methods.
// Use them to display messages related to the peer.
*logger.Logger
- infolock sync.Mutex
- identity ClientIdentity
- caps []Cap
- listenAddr *peerAddr // what remote peer is listening on
- dialAddr *peerAddr // non-nil if dialing
+ infoMu sync.Mutex
+ name string
+ caps []Cap
- // The mutex protects the connection
- // so only one protocol can write at a time.
- writeMu sync.Mutex
- conn net.Conn
- bufconn *bufio.ReadWriter
+ ourID, remoteID *discover.NodeID
+ ourName string
+
+ rw *frameRW
// These fields maintain the running protocols.
- protocols []Protocol
- runBaseProtocol bool // for testing
- cryptoHandshake bool // for testing
- cryptoReady chan struct{}
- privateKey []byte
+ protocols []Protocol
+ runlock sync.RWMutex // protects running
+ running map[string]*proto
- runlock sync.RWMutex // protects running
- running map[string]*proto
+ protocolHandshakeEnabled bool
protoWG sync.WaitGroup
protoErr chan error
closed chan struct{}
disc chan DiscReason
-
- activity event.TypeMux // for activity events
-
- slot int // index into Server peer list
-
- // These fields are kept so base protocol can access them.
- // TODO: this should be one or more interfaces
- ourID ClientIdentity // client id of the Server
- ourListenAddr *peerAddr // listen addr of Server, nil if not listening
- newPeerAddr chan<- *peerAddr // tell server about received peers
- otherPeers func() []*Peer // should return the list of all peers
- pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey
}
// NewPeer returns a peer for testing purposes.
-func NewPeer(id ClientIdentity, caps []Cap) *Peer {
+func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
conn, _ := net.Pipe()
- peer := newPeer(conn, nil, nil)
- peer.setHandshakeInfo(id, nil, caps)
- close(peer.closed)
+ peer := newPeer(conn, nil, "", nil, &id)
+ peer.setHandshakeInfo(name, caps)
+ close(peer.closed) // ensures Disconnect doesn't block
return peer
}
-func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer {
- p := newPeer(conn, server.Protocols, dialAddr)
- p.ourID = server.Identity
- p.newPeerAddr = server.peerConnect
- p.otherPeers = server.Peers
- p.pubkeyHook = server.verifyPeer
- p.runBaseProtocol = true
-
- // laddr can be updated concurrently by NAT traversal.
- // newServerPeer must be called with the server lock held.
- if server.laddr != nil {
- p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey())
- }
- return p
-}
-
-func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer {
- p := &Peer{
- Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()),
- conn: conn,
- dialAddr: dialAddr,
- bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
- protocols: protocols,
- running: make(map[string]*proto),
- disc: make(chan DiscReason),
- protoErr: make(chan error),
- closed: make(chan struct{}),
- cryptoReady: make(chan struct{}),
- }
- return p
+// ID returns the node's public key.
+func (p *Peer) ID() discover.NodeID {
+ return *p.remoteID
}
-// Identity returns the client identity of the remote peer. The
-// identity can be nil if the peer has not yet completed the
-// handshake.
-func (p *Peer) Identity() ClientIdentity {
- p.infolock.Lock()
- defer p.infolock.Unlock()
- return p.identity
-}
-
-func (self *Peer) Pubkey() (pubkey []byte) {
- self.infolock.Lock()
- defer self.infolock.Unlock()
- switch {
- case self.identity != nil:
- pubkey = self.identity.Pubkey()[1:]
- case self.dialAddr != nil:
- pubkey = self.dialAddr.Pubkey
- case self.listenAddr != nil:
- pubkey = self.listenAddr.Pubkey
- }
- return
+// Name returns the node name that the remote node advertised.
+func (p *Peer) Name() string {
+ // this needs a lock because the information is part of the
+ // protocol handshake.
+ p.infoMu.Lock()
+ name := p.name
+ p.infoMu.Unlock()
+ return name
}
// Caps returns the capabilities (supported subprotocols) of the remote peer.
func (p *Peer) Caps() []Cap {
- p.infolock.Lock()
- defer p.infolock.Unlock()
- return p.caps
-}
-
-func (p *Peer) setHandshakeInfo(id ClientIdentity, laddr *peerAddr, caps []Cap) {
- p.infolock.Lock()
- p.identity = id
- p.listenAddr = laddr
- p.caps = caps
- p.infolock.Unlock()
+ // this needs a lock because the information is part of the
+ // protocol handshake.
+ p.infoMu.Lock()
+ caps := p.caps
+ p.infoMu.Unlock()
+ return caps
}
// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
- return p.conn.RemoteAddr()
+ return p.rw.RemoteAddr()
}
// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
- return p.conn.LocalAddr()
+ return p.rw.LocalAddr()
}
// Disconnect terminates the peer connection with the given reason.
@@ -199,201 +134,167 @@ func (p *Peer) Disconnect(reason DiscReason) {
// String implements fmt.Stringer.
func (p *Peer) String() string {
- kind := "inbound"
- p.infolock.Lock()
- if p.dialAddr != nil {
- kind = "outbound"
+ return fmt.Sprintf("Peer %.8x %v", p.remoteID, p.RemoteAddr())
+}
+
+func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer {
+ logtag := fmt.Sprintf("Peer %.8x %v", remoteID, conn.RemoteAddr())
+ return &Peer{
+ Logger: logger.NewLogger(logtag),
+ rw: newFrameRW(conn, msgWriteTimeout),
+ ourID: ourID,
+ ourName: ourName,
+ remoteID: remoteID,
+ protocols: protocols,
+ running: make(map[string]*proto),
+ disc: make(chan DiscReason),
+ protoErr: make(chan error),
+ closed: make(chan struct{}),
}
- p.infolock.Unlock()
- return fmt.Sprintf("Peer(%p %v %s)", p, p.conn.RemoteAddr(), kind)
}
-const (
- // maximum amount of time allowed for reading a message
- msgReadTimeout = 5 * time.Second
- // maximum amount of time allowed for writing a message
- msgWriteTimeout = 5 * time.Second
- // messages smaller than this many bytes will be read at
- // once before passing them to a protocol.
- wholePayloadSize = 64 * 1024
-)
-
-var (
- inactivityTimeout = 2 * time.Second
- disconnectGracePeriod = 2 * time.Second
-)
+func (p *Peer) setHandshakeInfo(name string, caps []Cap) {
+ p.infoMu.Lock()
+ p.name = name
+ p.caps = caps
+ p.infoMu.Unlock()
+}
-func (p *Peer) loop() (reason DiscReason, err error) {
- defer p.activity.Stop()
+func (p *Peer) run() DiscReason {
+ var readErr = make(chan error, 1)
defer p.closeProtocols()
defer close(p.closed)
- defer p.conn.Close()
+ defer p.rw.Close()
+
+ // start the read loop
+ go func() { readErr <- p.readLoop() }()
- var readLoop func(chan<- Msg, chan<- error, <-chan bool)
- if p.cryptoHandshake {
- if readLoop, err = p.handleCryptoHandshake(); err != nil {
- // from here on everything can be encrypted, authenticated
- return DiscProtocolError, err // no graceful disconnect
+ if p.protocolHandshakeEnabled {
+ if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil {
+ p.DebugDetailf("Protocol handshake error: %v\n", err)
+ return DiscProtocolError
}
- } else {
- readLoop = p.readLoop
}
- // read loop
- readMsg := make(chan Msg)
- readErr := make(chan error)
- readNext := make(chan bool, 1)
- protoDone := make(chan struct{}, 1)
- go readLoop(readMsg, readErr, readNext)
- readNext <- true
-
- close(p.cryptoReady)
- if p.runBaseProtocol {
- p.startBaseProtocol()
+ // wait for an error or disconnect
+ var reason DiscReason
+ select {
+ case err := <-readErr:
+ // We rely on protocols to abort if there is a write error. It
+ // might be more robust to handle them here as well.
+ p.DebugDetailf("Read error: %v\n", err)
+ reason = DiscNetworkError
+ case err := <-p.protoErr:
+ reason = discReasonForError(err)
+ case reason = <-p.disc:
}
-
-loop:
- for {
- select {
- case msg := <-readMsg:
- // a new message has arrived.
- var wait bool
- if wait, err = p.dispatch(msg, protoDone); err != nil {
- p.Errorf("msg dispatch error: %v\n", err)
- reason = discReasonForError(err)
- break loop
- }
- if !wait {
- // Msg has already been read completely, continue with next message.
- readNext <- true
- }
- p.activity.Post(time.Now())
- case <-protoDone:
- // protocol has consumed the message payload,
- // we can continue reading from the socket.
- readNext <- true
-
- case err := <-readErr:
- // read failed. there is no need to run the
- // polite disconnect sequence because the connection
- // is probably dead anyway.
- // TODO: handle write errors as well
- return DiscNetworkError, err
- case err = <-p.protoErr:
- reason = discReasonForError(err)
- break loop
- case reason = <-p.disc:
- break loop
- }
+ if reason != DiscNetworkError {
+ p.politeDisconnect(reason)
}
+ p.Debugf("Disconnected: %v\n", reason)
+ return reason
+}
- // wait for read loop to return.
- close(readNext)
- <-readErr
- // tell the remote end to disconnect
+func (p *Peer) politeDisconnect(reason DiscReason) {
done := make(chan struct{})
go func() {
- p.conn.SetDeadline(time.Now().Add(disconnectGracePeriod))
- p.writeMsg(NewMsg(discMsg, reason), disconnectGracePeriod)
- io.Copy(ioutil.Discard, p.conn)
+ // send reason
+ EncodeMsg(p.rw, discMsg, uint(reason))
+ // discard any data that might arrive
+ io.Copy(ioutil.Discard, p.rw)
close(done)
}()
select {
case <-done:
case <-time.After(disconnectGracePeriod):
}
- return reason, err
}
-func (p *Peer) readLoop(msgc chan<- Msg, errc chan<- error, unblock <-chan bool) {
- for _ = range unblock {
- p.conn.SetReadDeadline(time.Now().Add(msgReadTimeout))
- if msg, err := readMsg(p.bufconn); err != nil {
- errc <- err
- } else {
- msgc <- msg
+func (p *Peer) readLoop() error {
+ if p.protocolHandshakeEnabled {
+ if err := readProtocolHandshake(p, p.rw); err != nil {
+ return err
}
}
- close(errc)
+ for {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if err = p.handle(msg); err != nil {
+ return err
+ }
+ }
+ return nil
}
-func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) {
- proto, err := p.getProto(msg.Code)
- if err != nil {
- return false, err
- }
- if msg.Size <= wholePayloadSize {
- // optimization: msg is small enough, read all
- // of it and move on to the next message
- buf, err := ioutil.ReadAll(msg.Payload)
+func (p *Peer) handle(msg Msg) error {
+ switch {
+ case msg.Code == pingMsg:
+ msg.Discard()
+ go EncodeMsg(p.rw, pongMsg)
+ case msg.Code == discMsg:
+ var reason DiscReason
+ // no need to discard or for error checking, we'll close the
+ // connection after this.
+ rlp.Decode(msg.Payload, &reason)
+ p.Disconnect(DiscRequested)
+ return discRequestedError(reason)
+ case msg.Code < baseProtocolLength:
+ // ignore other base protocol messages
+ return msg.Discard()
+ default:
+ // it's a subprotocol message
+ proto, err := p.getProto(msg.Code)
if err != nil {
- return false, err
+ return fmt.Errorf("msg code out of range: %v", msg.Code)
}
- msg.Payload = bytes.NewReader(buf)
- proto.in <- msg
- } else {
- wait = true
- pr := &eofSignal{msg.Payload, int64(msg.Size), protoDone}
- msg.Payload = pr
proto.in <- msg
}
- return wait, nil
+ return nil
}
-type readLoop func(chan<- Msg, chan<- error, <-chan bool)
-
-func (p *Peer) PrivateKey() (prv *ecdsa.PrivateKey, err error) {
- if prv = crypto.ToECDSA(p.privateKey); prv == nil {
- err = fmt.Errorf("invalid private key")
+func readProtocolHandshake(p *Peer, rw MsgReadWriter) error {
+ // read and handle remote handshake
+ msg, err := rw.ReadMsg()
+ if err != nil {
+ return err
}
- return
-}
-
-func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) {
- // cryptoId is just created for the lifecycle of the handshake
- // it is survived by an encrypted readwriter
- var initiator bool
- var sessionToken []byte
- sessionToken = make([]byte, shaLen)
- if _, err = rand.Read(sessionToken); err != nil {
- return
+ if msg.Code != handshakeMsg {
+ return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code)
}
- if p.dialAddr != nil { // this should have its own method Outgoing() bool
- initiator = true
+ if msg.Size > baseProtocolMaxMsgSize {
+ return newPeerError(errMisc, "message too big")
}
-
- // run on peer
- // this bit handles the handshake and creates a secure communications channel with
- // var rw *secretRW
- var prvKey *ecdsa.PrivateKey
- if prvKey, err = p.PrivateKey(); err != nil {
- err = fmt.Errorf("unable to access private key for client: %v", err)
- return
+ var hs handshake
+ if err := msg.Decode(&hs); err != nil {
+ return err
}
- // initialise a new secure session
- if sessionToken, _, err = NewSecureSession(p.conn, prvKey, p.Pubkey(), sessionToken, initiator); err != nil {
- p.Debugf("unable to setup secure session: %v", err)
- return
+ // validate handshake info
+ if hs.Version != baseProtocolVersion {
+ return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n",
+ baseProtocolVersion, hs.Version)
}
- loop = func(msg chan<- Msg, err chan<- error, next <-chan bool) {
- // this is the readloop :)
+ if hs.NodeID == *p.remoteID {
+ return newPeerError(errPubkeyForbidden, "node ID mismatch")
}
- return
+ // TODO: remove Caps with empty name
+ p.setHandshakeInfo(hs.Name, hs.Caps)
+ p.startSubprotocols(hs.Caps)
+ return nil
}
-func (p *Peer) startBaseProtocol() {
- p.runlock.Lock()
- defer p.runlock.Unlock()
- p.running[""] = p.startProto(0, Protocol{
- Length: baseProtocolLength,
- Run: runBaseProtocol,
- })
+func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error {
+ var caps []interface{}
+ for _, proto := range ps {
+ caps = append(caps, proto.cap())
+ }
+ return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id)
}
// startProtocols starts matching named subprotocols.
func (p *Peer) startSubprotocols(caps []Cap) {
sort.Sort(capsByName(caps))
-
p.runlock.Lock()
defer p.runlock.Unlock()
offset := baseProtocolLength
@@ -412,20 +313,22 @@ outer:
}
func (p *Peer) startProto(offset uint64, impl Protocol) *proto {
+ p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version)
rw := &proto{
+ name: impl.Name,
in: make(chan Msg),
offset: offset,
maxcode: impl.Length,
- peer: p,
+ w: p.rw,
}
p.protoWG.Add(1)
go func() {
err := impl.Run(p, rw)
if err == nil {
- p.Infof("protocol %q returned", impl.Name)
+ p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version)
err = newPeerError(errMisc, "protocol returned")
} else {
- p.Errorf("protocol %q error: %v\n", impl.Name, err)
+ p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err)
}
select {
case p.protoErr <- err:
@@ -459,6 +362,7 @@ func (p *Peer) closeProtocols() {
}
// writeProtoMsg sends the given message on behalf of the given named protocol.
+// this exists because of Server.Broadcast.
func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
p.runlock.RLock()
proto, ok := p.running[protoName]
@@ -470,25 +374,14 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
}
msg.Code += proto.offset
- return p.writeMsg(msg, msgWriteTimeout)
-}
-
-// writeMsg writes a message to the connection.
-func (p *Peer) writeMsg(msg Msg, timeout time.Duration) error {
- p.writeMu.Lock()
- defer p.writeMu.Unlock()
- p.conn.SetWriteDeadline(time.Now().Add(timeout))
- if err := writeMsg(p.bufconn, msg); err != nil {
- return newPeerError(errWrite, "%v", err)
- }
- return p.bufconn.Flush()
+ return p.rw.WriteMsg(msg)
}
type proto struct {
name string
in chan Msg
maxcode, offset uint64
- peer *Peer
+ w MsgWriter
}
func (rw *proto) WriteMsg(msg Msg) error {
@@ -496,11 +389,7 @@ func (rw *proto) WriteMsg(msg Msg) error {
return newPeerError(errInvalidMsgCode, "not handled")
}
msg.Code += rw.offset
- return rw.peer.writeMsg(msg, msgWriteTimeout)
-}
-
-func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error {
- return rw.WriteMsg(NewMsg(code, data...))
+ return rw.w.WriteMsg(msg)
}
func (rw *proto) ReadMsg() (Msg, error) {
@@ -511,26 +400,3 @@ func (rw *proto) ReadMsg() (Msg, error) {
msg.Code -= rw.offset
return msg, nil
}
-
-// eofSignal wraps a reader with eof signaling. the eof channel is
-// closed when the wrapped reader returns an error or when count bytes
-// have been read.
-//
-type eofSignal struct {
- wrapped io.Reader
- count int64
- eof chan<- struct{}
-}
-
-// note: when using eofSignal to detect whether a message payload
-// has been read, Read might not be called for zero sized messages.
-
-func (r *eofSignal) Read(buf []byte) (int, error) {
- n, err := r.wrapped.Read(buf)
- r.count -= int64(n)
- if (err != nil || r.count <= 0) && r.eof != nil {
- r.eof <- struct{}{} // tell Peer that msg has been consumed
- r.eof = nil
- }
- return n, err
-}