aboutsummaryrefslogtreecommitdiffstats
path: root/p2p/peer.go
diff options
context:
space:
mode:
Diffstat (limited to 'p2p/peer.go')
-rw-r--r--p2p/peer.go71
1 files changed, 31 insertions, 40 deletions
diff --git a/p2p/peer.go b/p2p/peer.go
index c7ec08887..cbe5ccc84 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -18,7 +18,7 @@ import (
const (
baseProtocolVersion = 4
baseProtocolLength = uint64(16)
- baseProtocolMaxMsgSize = 10 * 1024 * 1024
+ baseProtocolMaxMsgSize = 2 * 1024
pingInterval = 15 * time.Second
)
@@ -33,9 +33,17 @@ const (
peersMsg = 0x05
)
+// protoHandshake is the RLP structure of the protocol handshake.
+type protoHandshake struct {
+ Version uint64
+ Name string
+ Caps []Cap
+ ListenPort uint64
+ ID discover.NodeID
+}
+
// Peer represents a connected remote node.
type Peer struct {
- conn net.Conn
rw *conn
running map[string]*protoRW
@@ -48,37 +56,36 @@ type Peer struct {
// NewPeer returns a peer for testing purposes.
func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
pipe, _ := net.Pipe()
- msgpipe, _ := MsgPipe()
- conn := &conn{msgpipe, &protoHandshake{ID: id, Name: name, Caps: caps}}
- peer := newPeer(pipe, conn, nil)
+ conn := &conn{fd: pipe, transport: nil, id: id, caps: caps, name: name}
+ peer := newPeer(conn, nil)
close(peer.closed) // ensures Disconnect doesn't block
return peer
}
// ID returns the node's public key.
func (p *Peer) ID() discover.NodeID {
- return p.rw.ID
+ return p.rw.id
}
// Name returns the node name that the remote node advertised.
func (p *Peer) Name() string {
- return p.rw.Name
+ return p.rw.name
}
// Caps returns the capabilities (supported subprotocols) of the remote peer.
func (p *Peer) Caps() []Cap {
// TODO: maybe return copy
- return p.rw.Caps
+ return p.rw.caps
}
// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
- return p.conn.RemoteAddr()
+ return p.rw.fd.RemoteAddr()
}
// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
- return p.conn.LocalAddr()
+ return p.rw.fd.LocalAddr()
}
// Disconnect terminates the peer connection with the given reason.
@@ -92,13 +99,12 @@ func (p *Peer) Disconnect(reason DiscReason) {
// String implements fmt.Stringer.
func (p *Peer) String() string {
- return fmt.Sprintf("Peer %.8x %v", p.rw.ID[:], p.RemoteAddr())
+ return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
}
-func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer {
- protomap := matchProtocols(protocols, conn.Caps, conn)
+func newPeer(conn *conn, protocols []Protocol) *Peer {
+ protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
- conn: fd,
rw: conn,
running: protomap,
disc: make(chan DiscReason),
@@ -117,7 +123,10 @@ func (p *Peer) run() DiscReason {
p.startProtocols()
// Wait for an error or disconnect.
- var reason DiscReason
+ var (
+ reason DiscReason
+ requested bool
+ )
select {
case err := <-readErr:
if r, ok := err.(DiscReason); ok {
@@ -131,21 +140,17 @@ func (p *Peer) run() DiscReason {
case err := <-p.protoErr:
reason = discReasonForError(err)
case reason = <-p.disc:
- p.politeDisconnect(reason)
- reason = DiscRequested
+ requested = true
}
-
close(p.closed)
+ p.rw.close(reason)
p.wg.Wait()
- glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason)
- return reason
-}
-func (p *Peer) politeDisconnect(reason DiscReason) {
- if reason != DiscNetworkError {
- SendItems(p.rw, discMsg, uint(reason))
+ if requested {
+ reason = DiscRequested
}
- p.conn.Close()
+ glog.V(logger.Debug).Infof("%v: Disconnected: %v\n", p, reason)
+ return reason
}
func (p *Peer) pingLoop() {
@@ -254,7 +259,7 @@ func (p *Peer) startProtocols() {
glog.V(logger.Detail).Infof("%v: Protocol %s/%d returned\n", p, proto.Name, proto.Version)
err = errors.New("protocol returned")
} else if err != io.EOF {
- glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: \n", p, proto.Name, proto.Version, err)
+ glog.V(logger.Detail).Infof("%v: Protocol %s/%d error: %v\n", p, proto.Name, proto.Version, err)
}
p.protoErr <- err
p.wg.Done()
@@ -273,20 +278,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) {
return nil, newPeerError(errInvalidMsgCode, "%d", code)
}
-// writeProtoMsg sends the given message on behalf of the given named protocol.
-// this exists because of Server.Broadcast.
-func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
- proto, ok := p.running[protoName]
- if !ok {
- return fmt.Errorf("protocol %s not handled by peer", protoName)
- }
- if msg.Code >= proto.Length {
- return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
- }
- msg.Code += proto.offset
- return p.rw.WriteMsg(msg)
-}
-
type protoRW struct {
Protocol
in chan Msg