diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/metrics.go | 10 | ||||
-rw-r--r-- | p2p/peer.go | 7 | ||||
-rw-r--r-- | p2p/protocols/protocol.go | 7 | ||||
-rw-r--r-- | p2p/simulations/adapters/state.go | 36 | ||||
-rw-r--r-- | p2p/simulations/network.go | 67 | ||||
-rw-r--r-- | p2p/testing/protocolsession.go | 2 |
6 files changed, 53 insertions, 76 deletions
diff --git a/p2p/metrics.go b/p2p/metrics.go index 4cbff90ac..2d52fd1fd 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -31,10 +31,10 @@ var ( egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil) ) -// meteredConn is a wrapper around a network TCP connection that meters both the +// meteredConn is a wrapper around a net.Conn that meters both the // inbound and outbound network traffic. type meteredConn struct { - *net.TCPConn // Network connection to wrap with metering + net.Conn // Network connection to wrap with metering } // newMeteredConn creates a new metered connection, also bumping the ingress or @@ -51,13 +51,13 @@ func newMeteredConn(conn net.Conn, ingress bool) net.Conn { } else { egressConnectMeter.Mark(1) } - return &meteredConn{conn.(*net.TCPConn)} + return &meteredConn{Conn: conn} } // Read delegates a network read to the underlying connection, bumping the ingress // traffic meter along the way. func (c *meteredConn) Read(b []byte) (n int, err error) { - n, err = c.TCPConn.Read(b) + n, err = c.Conn.Read(b) ingressTrafficMeter.Mark(int64(n)) return } @@ -65,7 +65,7 @@ func (c *meteredConn) Read(b []byte) (n int, err error) { // Write delegates a network write to the underlying connection, bumping the // egress traffic meter along the way. func (c *meteredConn) Write(b []byte) (n int, err error) { - n, err = c.TCPConn.Write(b) + n, err = c.Conn.Write(b) egressTrafficMeter.Mark(int64(n)) return } diff --git a/p2p/peer.go b/p2p/peer.go index c3907349f..eb2d34441 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,6 +17,7 @@ package p2p import ( + "errors" "fmt" "io" "net" @@ -31,6 +32,10 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) +var ( + ErrShuttingDown = errors.New("shutting down") +) + const ( baseProtocolVersion = 5 baseProtocolLength = uint64(16) @@ -393,7 +398,7 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { // as well but we don't want to rely on that. rw.werr <- err case <-rw.closed: - err = fmt.Errorf("shutting down") + err = ErrShuttingDown } return err } diff --git a/p2p/protocols/protocol.go b/p2p/protocols/protocol.go index 849a7ef39..d5c0375ac 100644 --- a/p2p/protocols/protocol.go +++ b/p2p/protocols/protocol.go @@ -31,10 +31,12 @@ package protocols import ( "context" "fmt" + "io" "reflect" "sync" "time" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" ) @@ -202,6 +204,11 @@ func NewPeer(p *p2p.Peer, rw p2p.MsgReadWriter, spec *Spec) *Peer { func (p *Peer) Run(handler func(msg interface{}) error) error { for { if err := p.handleIncoming(handler); err != nil { + if err != io.EOF { + metrics.GetOrRegisterCounter("peer.handleincoming.error", nil).Inc(1) + log.Error("peer.handleIncoming", "err", err) + } + return err } } diff --git a/p2p/simulations/adapters/state.go b/p2p/simulations/adapters/state.go deleted file mode 100644 index 78dfb11f9..000000000 --- a/p2p/simulations/adapters/state.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package adapters - -type SimStateStore struct { - m map[string][]byte -} - -func (st *SimStateStore) Load(s string) ([]byte, error) { - return st.m[s], nil -} - -func (st *SimStateStore) Save(s string, data []byte) error { - st.m[s] = data - return nil -} - -func NewSimStateStore() *SimStateStore { - return &SimStateStore{ - make(map[string][]byte), - } -} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index a8a46cd87..0fb7485ad 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -31,7 +31,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) -var dialBanTimeout = 200 * time.Millisecond +var DialBanTimeout = 200 * time.Millisecond // NetworkConfig defines configuration options for starting a Network type NetworkConfig struct { @@ -78,41 +78,25 @@ func (net *Network) Events() *event.Feed { return &net.events } -// NewNode adds a new node to the network with a random ID -func (net *Network) NewNode() (*Node, error) { - conf := adapters.RandomNodeConfig() - conf.Services = []string{net.DefaultService} - return net.NewNodeWithConfig(conf) -} - // NewNodeWithConfig adds a new node to the network with the given config, // returning an error if a node with the same ID or name already exists func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { net.lock.Lock() defer net.lock.Unlock() - // create a random ID and PrivateKey if not set - if conf.ID == (discover.NodeID{}) { - c := adapters.RandomNodeConfig() - conf.ID = c.ID - conf.PrivateKey = c.PrivateKey - } - id := conf.ID if conf.Reachable == nil { conf.Reachable = func(otherID discover.NodeID) bool { _, err := net.InitConn(conf.ID, otherID) - return err == nil + if err != nil && bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) < 0 { + return false + } + return true } } - // assign a name to the node if not set - if conf.Name == "" { - conf.Name = fmt.Sprintf("node%02d", len(net.Nodes)+1) - } - // check the node doesn't already exist - if node := net.getNode(id); node != nil { - return nil, fmt.Errorf("node with ID %q already exists", id) + if node := net.getNode(conf.ID); node != nil { + return nil, fmt.Errorf("node with ID %q already exists", conf.ID) } if node := net.getNodeByName(conf.Name); node != nil { return nil, fmt.Errorf("node with name %q already exists", conf.Name) @@ -132,8 +116,8 @@ func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) Node: adapterNode, Config: conf, } - log.Trace(fmt.Sprintf("node %v created", id)) - net.nodeMap[id] = len(net.Nodes) + log.Trace(fmt.Sprintf("node %v created", conf.ID)) + net.nodeMap[conf.ID] = len(net.Nodes) net.Nodes = append(net.Nodes, node) // emit a "control" event @@ -181,7 +165,9 @@ func (net *Network) Start(id discover.NodeID) error { // startWithSnapshots starts the node with the given ID using the give // snapshots func (net *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { - node := net.GetNode(id) + net.lock.Lock() + defer net.lock.Unlock() + node := net.getNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } @@ -220,9 +206,13 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve // assume the node is now down net.lock.Lock() + defer net.lock.Unlock() node := net.getNode(id) + if node == nil { + log.Error("Can not find node for id", "id", id) + return + } node.Up = false - net.lock.Unlock() net.events.Send(NewEvent(node)) }() for { @@ -259,7 +249,9 @@ func (net *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEve // Stop stops the node with the given ID func (net *Network) Stop(id discover.NodeID) error { - node := net.GetNode(id) + net.lock.Lock() + defer net.lock.Unlock() + node := net.getNode(id) if node == nil { return fmt.Errorf("node %v does not exist", id) } @@ -312,7 +304,9 @@ func (net *Network) Disconnect(oneID, otherID discover.NodeID) error { // DidConnect tracks the fact that the "one" node connected to the "other" node func (net *Network) DidConnect(one, other discover.NodeID) error { - conn, err := net.GetOrCreateConn(one, other) + net.lock.Lock() + defer net.lock.Unlock() + conn, err := net.getOrCreateConn(one, other) if err != nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -327,7 +321,9 @@ func (net *Network) DidConnect(one, other discover.NodeID) error { // DidDisconnect tracks the fact that the "one" node disconnected from the // "other" node func (net *Network) DidDisconnect(one, other discover.NodeID) error { - conn := net.GetConn(one, other) + net.lock.Lock() + defer net.lock.Unlock() + conn := net.getConn(one, other) if conn == nil { return fmt.Errorf("connection between %v and %v does not exist", one, other) } @@ -335,7 +331,7 @@ func (net *Network) DidDisconnect(one, other discover.NodeID) error { return fmt.Errorf("%v and %v already disconnected", one, other) } conn.Up = false - conn.initiated = time.Now().Add(-dialBanTimeout) + conn.initiated = time.Now().Add(-DialBanTimeout) net.events.Send(NewEvent(conn)) return nil } @@ -476,16 +472,19 @@ func (net *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) { if err != nil { return nil, err } - if time.Since(conn.initiated) < dialBanTimeout { - return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) - } if conn.Up { return nil, fmt.Errorf("%v and %v already connected", oneID, otherID) } + if time.Since(conn.initiated) < DialBanTimeout { + return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID) + } + err = conn.nodesUp() if err != nil { + log.Trace(fmt.Sprintf("nodes not up: %v", err)) return nil, fmt.Errorf("nodes not up: %v", err) } + log.Debug("InitConn - connection initiated") conn.initiated = time.Now() return conn, nil } diff --git a/p2p/testing/protocolsession.go b/p2p/testing/protocolsession.go index 8f73bfa03..e3ec41ad6 100644 --- a/p2p/testing/protocolsession.go +++ b/p2p/testing/protocolsession.go @@ -91,7 +91,9 @@ func (s *ProtocolSession) trigger(trig Trigger) error { errc := make(chan error) go func() { + log.Trace(fmt.Sprintf("trigger %v (%v)....", trig.Msg, trig.Code)) errc <- mockNode.Trigger(&trig) + log.Trace(fmt.Sprintf("triggered %v (%v)", trig.Msg, trig.Code)) }() t := trig.Timeout |