diff options
Diffstat (limited to 'p2p/peer.go')
-rw-r--r-- | p2p/peer.go | 71 |
1 files changed, 31 insertions, 40 deletions
diff --git a/p2p/peer.go b/p2p/peer.go index c7ec08887..cbe5ccc84 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,7 +18,7 @@ import ( const ( baseProtocolVersion = 4 baseProtocolLength = uint64(16) - baseProtocolMaxMsgSize = 10 * 1024 * 1024 + baseProtocolMaxMsgSize = 2 * 1024 pingInterval = 15 * time.Second ) @@ -33,9 +33,17 @@ const ( peersMsg = 0x05 ) +// protoHandshake is the RLP structure of the protocol handshake. +type protoHandshake struct { + Version uint64 + Name string + Caps []Cap + ListenPort uint64 + ID discover.NodeID +} + // Peer represents a connected remote node. type Peer struct { - conn net.Conn rw *conn running map[string]*protoRW @@ -48,37 +56,36 @@ type Peer struct { // NewPeer returns a peer for testing purposes. func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer { pipe, _ := net.Pipe() - msgpipe, _ := MsgPipe() - conn := &conn{msgpipe, &protoHandshake{ID: id, Name: name, Caps: caps}} - peer := newPeer(pipe, conn, nil) + conn := &conn{fd: pipe, transport: nil, id: id, caps: caps, name: name} + peer := newPeer(conn, nil) close(peer.closed) // ensures Disconnect doesn't block return peer } // ID returns the node's public key. func (p *Peer) ID() discover.NodeID { - return p.rw.ID + return p.rw.id } // Name returns the node name that the remote node advertised. func (p *Peer) Name() string { - return p.rw.Name + return p.rw.name } // Caps returns the capabilities (supported subprotocols) of the remote peer. func (p *Peer) Caps() []Cap { // TODO: maybe return copy - return p.rw.Caps + return p.rw.caps } // RemoteAddr returns the remote address of the network connection. func (p *Peer) RemoteAddr() net.Addr { - return p.conn.RemoteAddr() + return p.rw.fd.RemoteAddr() } // LocalAddr returns the local address of the network connection. func (p *Peer) LocalAddr() net.Addr { - return p.conn.LocalAddr() + return p.rw.fd.LocalAddr() } // Disconnect terminates the peer connection with the given reason. @@ -92,13 +99,12 @@ func (p *Peer) Disconnect(reason DiscReason) { // String implements fmt.Stringer. func (p *Peer) String() string { - return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr()) + return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr()) } -func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer { - protomap := matchProtocols(protocols, conn.Caps, conn) +func newPeer(conn *conn, protocols []Protocol) *Peer { + protomap := matchProtocols(protocols, conn.caps, conn) p := &Peer{ - conn: fd, rw: conn, running: protomap, disc: make(chan DiscReason), @@ -117,7 +123,10 @@ func (p *Peer) run() DiscReason { p.startProtocols() // Wait for an error or disconnect. - var reason DiscReason + var ( + reason DiscReason + requested bool + ) select { case err := <-readErr: if r, ok := err.(DiscReason); ok { @@ -131,21 +140,17 @@ func (p *Peer) run() DiscReason { case err := <-p.protoErr: reason = discReasonForError(err) case reason = <-p.disc: - p.politeDisconnect(reason) - reason = DiscRequested + requested = true } - close(p.closed) + p.rw.close(reason) p.wg.Wait() - glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) - return reason -} -func (p *Peer) politeDisconnect(reason DiscReason) { - if reason != DiscNetworkError { - SendItems(p.rw, discMsg, uint(reason)) + if requested { + reason = DiscRequested } - p.conn.Close() + glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason) + return reason } func (p *Peer) pingLoop() { @@ -254,7 +259,7 @@ func (p *Peer) startProtocols() { glog.V(logger.Detail).Infof("%v: Protocol %s/%d returned\n", p, proto.Name, proto.Version) err = errors.New("protocol returned") } else if err != io.EOF { - glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: \n", p, proto.Name, proto.Version, err) + glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: %v\n", p, proto.Name, proto.Version, err) } p.protoErr <- err p.wg.Done() @@ -273,20 +278,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { return nil, newPeerError(errInvalidMsgCode, "%d", code) } -// writeProtoMsg sends the given message on behalf of the given named protocol. -// this exists because of Server.Broadcast. -func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { - proto, ok := p.running[protoName] - if !ok { - return fmt.Errorf("protocol %s not handled by peer", protoName) - } - if msg.Code >= proto.Length { - return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName) - } - msg.Code += proto.offset - return p.rw.WriteMsg(msg) -} - type protoRW struct { Protocol in chan Msg |