diff options
author | Felix Lange <fjl@twurst.com> | 2015-02-05 10:07:58 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-02-06 07:00:36 +0800 |
commit | 5bdc1159433138d92ed6fefb253e3c6ed3a43995 (patch) | |
tree | 3b2441a86bed5b1aabac67d61111147cffc074ed /p2p/peer.go | |
parent | 739066ec56393e63b93531787746fb8ba5f1df15 (diff) | |
download | go-tangerine-5bdc1159433138d92ed6fefb253e3c6ed3a43995.tar.gz go-tangerine-5bdc1159433138d92ed6fefb253e3c6ed3a43995.tar.zst go-tangerine-5bdc1159433138d92ed6fefb253e3c6ed3a43995.zip |
p2p: integrate p2p/discover
Overview of changes:
- ClientIdentity has been removed, use discover.NodeID
- Server now requires a private key to be set (instead of public key)
- Server performs the encryption handshake before launching Peer
- Dial logic takes peers from discover table
- Encryption handshake code has been cleaned up a bit
- baseProtocol is gone because we don't exchange peers anymore
- Some parts of baseProtocol have moved into Peer instead
Diffstat (limited to 'p2p/peer.go')
-rw-r--r-- | p2p/peer.go | 518 |
1 files changed, 192 insertions, 326 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index e82bca222..1fa8264a3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -1,10 +1,6 @@ package p2p import ( - "bufio" - "bytes" - "crypto/ecdsa" - "crypto/rand" "fmt" "io" "io/ioutil" @@ -13,179 +9,118 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/crypto" - - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rlp" ) -// peerAddr is the structure of a peer list element. -// It is also a valid net.Addr. -type peerAddr struct { - IP net.IP - Port uint64 - Pubkey []byte // optional -} +const ( + // maximum amount of time allowed for reading a message + msgReadTimeout = 5 * time.Second + // maximum amount of time allowed for writing a message + msgWriteTimeout = 5 * time.Second + // messages smaller than this many bytes will be read at + // once before passing them to a protocol. + wholePayloadSize = 64 * 1024 -func newPeerAddr(addr net.Addr, pubkey []byte) *peerAddr { - n := addr.Network() - if n != "tcp" && n != "tcp4" && n != "tcp6" { - // for testing with non-TCP - return &peerAddr{net.ParseIP("127.0.0.1"), 30303, pubkey} - } - ta := addr.(*net.TCPAddr) - return &peerAddr{ta.IP, uint64(ta.Port), pubkey} -} + disconnectGracePeriod = 2 * time.Second +) -func (d peerAddr) Network() string { - if d.IP.To4() != nil { - return "tcp4" - } else { - return "tcp6" - } -} +const ( + baseProtocolVersion = 2 + baseProtocolLength = uint64(16) + baseProtocolMaxMsgSize = 10 * 1024 * 1024 +) -func (d peerAddr) String() string { - return fmt.Sprintf("%v:%d", d.IP, d.Port) -} +const ( + // devp2p message codes + handshakeMsg = 0x00 + discMsg = 0x01 + pingMsg = 0x02 + pongMsg = 0x03 + getPeersMsg = 0x04 + peersMsg = 0x05 +) -func (d *peerAddr) RlpData() interface{} { - return []interface{}{string(d.IP), d.Port, d.Pubkey} +// handshake is the RLP structure of the protocol handshake. +type handshake struct { + Version uint64 + Name string + Caps []Cap + ListenPort uint64 + NodeID discover.NodeID } -// Peer represents a remote peer. +// Peer represents a connected remote node. type Peer struct { // Peers have all the log methods. // Use them to display messages related to the peer. *logger.Logger - infolock sync.Mutex - identity ClientIdentity - caps []Cap - listenAddr *peerAddr // what remote peer is listening on - dialAddr *peerAddr // non-nil if dialing + infoMu sync.Mutex + name string + caps []Cap - // The mutex protects the connection - // so only one protocol can write at a time. - writeMu sync.Mutex - conn net.Conn - bufconn *bufio.ReadWriter + ourID, remoteID *discover.NodeID + ourName string + + rw *frameRW // These fields maintain the running protocols. - protocols []Protocol - runBaseProtocol bool // for testing - cryptoHandshake bool // for testing - cryptoReady chan struct{} - privateKey []byte + protocols []Protocol + runlock sync.RWMutex // protects running + running map[string]*proto - runlock sync.RWMutex // protects running - running map[string]*proto + protocolHandshakeEnabled bool protoWG sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason - - activity event.TypeMux // for activity events - - slot int // index into Server peer list - - // These fields are kept so base protocol can access them. - // TODO: this should be one or more interfaces - ourID ClientIdentity // client id of the Server - ourListenAddr *peerAddr // listen addr of Server, nil if not listening - newPeerAddr chan<- *peerAddr // tell server about received peers - otherPeers func() []*Peer // should return the list of all peers - pubkeyHook func(*peerAddr) error // called at end of handshake to validate pubkey } // NewPeer returns a peer for testing purposes. -func NewPeer(id ClientIdentity, caps []Cap) *Peer { +func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { conn, _ := net.Pipe() - peer := newPeer(conn, nil, nil) - peer.setHandshakeInfo(id, nil, caps) - close(peer.closed) + peer := newPeer(conn, nil, "", nil, &id) + peer.setHandshakeInfo(name, caps) + close(peer.closed) // ensures Disconnect doesn't block return peer } -func newServerPeer(server *Server, conn net.Conn, dialAddr *peerAddr) *Peer { - p := newPeer(conn, server.Protocols, dialAddr) - p.ourID = server.Identity - p.newPeerAddr = server.peerConnect - p.otherPeers = server.Peers - p.pubkeyHook = server.verifyPeer - p.runBaseProtocol = true - - // laddr can be updated concurrently by NAT traversal. - // newServerPeer must be called with the server lock held. - if server.laddr != nil { - p.ourListenAddr = newPeerAddr(server.laddr, server.Identity.Pubkey()) - } - return p -} - -func newPeer(conn net.Conn, protocols []Protocol, dialAddr *peerAddr) *Peer { - p := &Peer{ - Logger: logger.NewLogger("P2P " + conn.RemoteAddr().String()), - conn: conn, - dialAddr: dialAddr, - bufconn: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), - protocols: protocols, - running: make(map[string]*proto), - disc: make(chan DiscReason), - protoErr: make(chan error), - closed: make(chan struct{}), - cryptoReady: make(chan struct{}), - } - return p +// ID returns the node's public key. +func (p *Peer) ID() discover.NodeID { + return *p.remoteID } -// Identity returns the client identity of the remote peer. The -// identity can be nil if the peer has not yet completed the -// handshake. -func (p *Peer) Identity() ClientIdentity { - p.infolock.Lock() - defer p.infolock.Unlock() - return p.identity -} - -func (self *Peer) Pubkey() (pubkey []byte) { - self.infolock.Lock() - defer self.infolock.Unlock() - switch { - case self.identity != nil: - pubkey = self.identity.Pubkey()[1:] - case self.dialAddr != nil: - pubkey = self.dialAddr.Pubkey - case self.listenAddr != nil: - pubkey = self.listenAddr.Pubkey - } - return +// Name returns the node name that the remote node advertised. +func (p *Peer) Name() string { + // this needs a lock because the information is part of the + // protocol handshake. + p.infoMu.Lock() + name := p.name + p.infoMu.Unlock() + return name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { - p.infolock.Lock() - defer p.infolock.Unlock() - return p.caps -} - -func (p *Peer) setHandshakeInfo(id ClientIdentity, laddr *peerAddr, caps []Cap) { - p.infolock.Lock() - p.identity = id - p.listenAddr = laddr - p.caps = caps - p.infolock.Unlock() + // this needs a lock because the information is part of the + // protocol handshake. + p.infoMu.Lock() + caps := p.caps + p.infoMu.Unlock() + return caps } // RemoteAddr returns the remote address of the network connection. func (p *Peer) RemoteAddr() net.Addr { - return p.conn.RemoteAddr() + return p.rw.RemoteAddr() } // LocalAddr returns the local address of the network connection. func (p *Peer) LocalAddr() net.Addr { - return p.conn.LocalAddr() + return p.rw.LocalAddr() } // Disconnect terminates the peer connection with the given reason. @@ -199,201 +134,167 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - kind := "inbound" - p.infolock.Lock() - if p.dialAddr != nil { - kind = "outbound" + return fmt.Sprintf("Peer %.8x %v", p.remoteID, p.RemoteAddr()) +} + +func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer { + logtag := fmt.Sprintf("Peer %.8x %v", remoteID, conn.RemoteAddr()) + return &Peer{ + Logger: logger.NewLogger(logtag), + rw: newFrameRW(conn, msgWriteTimeout), + ourID: ourID, + ourName: ourName, + remoteID: remoteID, + protocols: protocols, + running: make(map[string]*proto), + disc: make(chan DiscReason), + protoErr: make(chan error), + closed: make(chan struct{}), } - p.infolock.Unlock() - return fmt.Sprintf("Peer(%p %v %s)", p, p.conn.RemoteAddr(), kind) } -const ( - // maximum amount of time allowed for reading a message - msgReadTimeout = 5 * time.Second - // maximum amount of time allowed for writing a message - msgWriteTimeout = 5 * time.Second - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. - wholePayloadSize = 64 * 1024 -) - -var ( - inactivityTimeout = 2 * time.Second - disconnectGracePeriod = 2 * time.Second -) +func (p *Peer) setHandshakeInfo(name string, caps []Cap) { + p.infoMu.Lock() + p.name = name + p.caps = caps + p.infoMu.Unlock() +} -func (p *Peer) loop() (reason DiscReason, err error) { - defer p.activity.Stop() +func (p *Peer) run() DiscReason { + var readErr = make(chan error, 1) defer p.closeProtocols() defer close(p.closed) - defer p.conn.Close() + defer p.rw.Close() + + // start the read loop + go func() { readErr <- p.readLoop() }() - var readLoop func(chan<- Msg, chan<- error, <-chan bool) - if p.cryptoHandshake { - if readLoop, err = p.handleCryptoHandshake(); err != nil { - // from here on everything can be encrypted, authenticated - return DiscProtocolError, err // no graceful disconnect + if p.protocolHandshakeEnabled { + if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil { + p.DebugDetailf("Protocol handshake error: %v\n", err) + return DiscProtocolError } - } else { - readLoop = p.readLoop } - // read loop - readMsg := make(chan Msg) - readErr := make(chan error) - readNext := make(chan bool, 1) - protoDone := make(chan struct{}, 1) - go readLoop(readMsg, readErr, readNext) - readNext <- true - - close(p.cryptoReady) - if p.runBaseProtocol { - p.startBaseProtocol() + // wait for an error or disconnect + var reason DiscReason + select { + case err := <-readErr: + // We rely on protocols to abort if there is a write error. It + // might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + reason = DiscNetworkError + case err := <-p.protoErr: + reason = discReasonForError(err) + case reason = <-p.disc: } - -loop: - for { - select { - case msg := <-readMsg: - // a new message has arrived. - var wait bool - if wait, err = p.dispatch(msg, protoDone); err != nil { - p.Errorf("msg dispatch error: %v\n", err) - reason = discReasonForError(err) - break loop - } - if !wait { - // Msg has already been read completely, continue with next message. - readNext <- true - } - p.activity.Post(time.Now()) - case <-protoDone: - // protocol has consumed the message payload, - // we can continue reading from the socket. - readNext <- true - - case err := <-readErr: - // read failed. there is no need to run the - // polite disconnect sequence because the connection - // is probably dead anyway. - // TODO: handle write errors as well - return DiscNetworkError, err - case err = <-p.protoErr: - reason = discReasonForError(err) - break loop - case reason = <-p.disc: - break loop - } + if reason != DiscNetworkError { + p.politeDisconnect(reason) } + p.Debugf("Disconnected: %v\n", reason) + return reason +} - // wait for read loop to return. - close(readNext) - <-readErr - // tell the remote end to disconnect +func (p *Peer) politeDisconnect(reason DiscReason) { done := make(chan struct{}) go func() { - p.conn.SetDeadline(time.Now().Add(disconnectGracePeriod)) - p.writeMsg(NewMsg(discMsg, reason), disconnectGracePeriod) - io.Copy(ioutil.Discard, p.conn) + // send reason + EncodeMsg(p.rw, discMsg, uint(reason)) + // discard any data that might arrive + io.Copy(ioutil.Discard, p.rw) close(done) }() select { case <-done: case <-time.After(disconnectGracePeriod): } - return reason, err } -func (p *Peer) readLoop(msgc chan<- Msg, errc chan<- error, unblock <-chan bool) { - for _ = range unblock { - p.conn.SetReadDeadline(time.Now().Add(msgReadTimeout)) - if msg, err := readMsg(p.bufconn); err != nil { - errc <- err - } else { - msgc <- msg +func (p *Peer) readLoop() error { + if p.protocolHandshakeEnabled { + if err := readProtocolHandshake(p, p.rw); err != nil { + return err } } - close(errc) + for { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if err = p.handle(msg); err != nil { + return err + } + } + return nil } -func (p *Peer) dispatch(msg Msg, protoDone chan struct{}) (wait bool, err error) { - proto, err := p.getProto(msg.Code) - if err != nil { - return false, err - } - if msg.Size <= wholePayloadSize { - // optimization: msg is small enough, read all - // of it and move on to the next message - buf, err := ioutil.ReadAll(msg.Payload) +func (p *Peer) handle(msg Msg) error { + switch { + case msg.Code == pingMsg: + msg.Discard() + go EncodeMsg(p.rw, pongMsg) + case msg.Code == discMsg: + var reason DiscReason + // no need to discard or for error checking, we'll close the + // connection after this. + rlp.Decode(msg.Payload, &reason) + p.Disconnect(DiscRequested) + return discRequestedError(reason) + case msg.Code < baseProtocolLength: + // ignore other base protocol messages + return msg.Discard() + default: + // it's a subprotocol message + proto, err := p.getProto(msg.Code) if err != nil { - return false, err + return fmt.Errorf("msg code out of range: %v", msg.Code) } - msg.Payload = bytes.NewReader(buf) - proto.in <- msg - } else { - wait = true - pr := &eofSignal{msg.Payload, int64(msg.Size), protoDone} - msg.Payload = pr proto.in <- msg } - return wait, nil + return nil } -type readLoop func(chan<- Msg, chan<- error, <-chan bool) - -func (p *Peer) PrivateKey() (prv *ecdsa.PrivateKey, err error) { - if prv = crypto.ToECDSA(p.privateKey); prv == nil { - err = fmt.Errorf("invalid private key") +func readProtocolHandshake(p *Peer, rw MsgReadWriter) error { + // read and handle remote handshake + msg, err := rw.ReadMsg() + if err != nil { + return err } - return -} - -func (p *Peer) handleCryptoHandshake() (loop readLoop, err error) { - // cryptoId is just created for the lifecycle of the handshake - // it is survived by an encrypted readwriter - var initiator bool - var sessionToken []byte - sessionToken = make([]byte, shaLen) - if _, err = rand.Read(sessionToken); err != nil { - return + if msg.Code != handshakeMsg { + return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code) } - if p.dialAddr != nil { // this should have its own method Outgoing() bool - initiator = true + if msg.Size > baseProtocolMaxMsgSize { + return newPeerError(errMisc, "message too big") } - - // run on peer - // this bit handles the handshake and creates a secure communications channel with - // var rw *secretRW - var prvKey *ecdsa.PrivateKey - if prvKey, err = p.PrivateKey(); err != nil { - err = fmt.Errorf("unable to access private key for client: %v", err) - return + var hs handshake + if err := msg.Decode(&hs); err != nil { + return err } - // initialise a new secure session - if sessionToken, _, err = NewSecureSession(p.conn, prvKey, p.Pubkey(), sessionToken, initiator); err != nil { - p.Debugf("unable to setup secure session: %v", err) - return + // validate handshake info + if hs.Version != baseProtocolVersion { + return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", + baseProtocolVersion, hs.Version) } - loop = func(msg chan<- Msg, err chan<- error, next <-chan bool) { - // this is the readloop :) + if hs.NodeID == *p.remoteID { + return newPeerError(errPubkeyForbidden, "node ID mismatch") } - return + // TODO: remove Caps with empty name + p.setHandshakeInfo(hs.Name, hs.Caps) + p.startSubprotocols(hs.Caps) + return nil } -func (p *Peer) startBaseProtocol() { - p.runlock.Lock() - defer p.runlock.Unlock() - p.running[""] = p.startProto(0, Protocol{ - Length: baseProtocolLength, - Run: runBaseProtocol, - }) +func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error { + var caps []interface{} + for _, proto := range ps { + caps = append(caps, proto.cap()) + } + return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id) } // startProtocols starts matching named subprotocols. func (p *Peer) startSubprotocols(caps []Cap) { sort.Sort(capsByName(caps)) - p.runlock.Lock() defer p.runlock.Unlock() offset := baseProtocolLength @@ -412,20 +313,22 @@ outer: } func (p *Peer) startProto(offset uint64, impl Protocol) *proto { + p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version) rw := &proto{ + name: impl.Name, in: make(chan Msg), offset: offset, maxcode: impl.Length, - peer: p, + w: p.rw, } p.protoWG.Add(1) go func() { err := impl.Run(p, rw) if err == nil { - p.Infof("protocol %q returned", impl.Name) + p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version) err = newPeerError(errMisc, "protocol returned") } else { - p.Errorf("protocol %q error: %v\n", impl.Name, err) + p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err) } select { case p.protoErr <- err: @@ -459,6 +362,7 @@ func (p *Peer) closeProtocols() { } // writeProtoMsg sends the given message on behalf of the given named protocol. +// this exists because of Server.Broadcast. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { p.runlock.RLock() proto, ok := p.running[protoName] @@ -470,25 +374,14 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) } msg.Code += proto.offset - return p.writeMsg(msg, msgWriteTimeout) -} - -// writeMsg writes a message to the connection. -func (p *Peer) writeMsg(msg Msg, timeout time.Duration) error { - p.writeMu.Lock() - defer p.writeMu.Unlock() - p.conn.SetWriteDeadline(time.Now().Add(timeout)) - if err := writeMsg(p.bufconn, msg); err != nil { - return newPeerError(errWrite, "%v", err) - } - return p.bufconn.Flush() + return p.rw.WriteMsg(msg) } type proto struct { name string in chan Msg maxcode, offset uint64 - peer *Peer + w MsgWriter } func (rw *proto) WriteMsg(msg Msg) error { @@ -496,11 +389,7 @@ func (rw *proto) WriteMsg(msg Msg) error { return newPeerError(errInvalidMsgCode, "not handled") } msg.Code += rw.offset - return rw.peer.writeMsg(msg, msgWriteTimeout) -} - -func (rw *proto) EncodeMsg(code uint64, data ...interface{}) error { - return rw.WriteMsg(NewMsg(code, data...)) + return rw.w.WriteMsg(msg) } func (rw *proto) ReadMsg() (Msg, error) { @@ -511,26 +400,3 @@ func (rw *proto) ReadMsg() (Msg, error) { msg.Code -= rw.offset return msg, nil } - -// eofSignal wraps a reader with eof signaling. the eof channel is -// closed when the wrapped reader returns an error or when count bytes -// have been read. -// -type eofSignal struct { - wrapped io.Reader - count int64 - eof chan<- struct{} -} - -// note: when using eofSignal to detect whether a message payload -// has been read, Read might not be called for zero sized messages. - -func (r *eofSignal) Read(buf []byte) (int, error) { - n, err := r.wrapped.Read(buf) - r.count -= int64(n) - if (err != nil || r.count <= 0) && r.eof != nil { - r.eof <- struct{}{} // tell Peer that msg has been consumed - r.eof = nil - } - return n, err -} |