diff options
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 23 | ||||
-rw-r--r-- | p2p/dial_test.go | 2 | ||||
-rw-r--r-- | p2p/discover/node.go | 40 | ||||
-rw-r--r-- | p2p/discover/node_test.go | 30 | ||||
-rw-r--r-- | p2p/message.go | 66 | ||||
-rw-r--r-- | p2p/peer.go | 42 | ||||
-rw-r--r-- | p2p/server.go | 56 | ||||
-rw-r--r-- | p2p/server_test.go | 2 | ||||
-rw-r--r-- | p2p/simulations/README.md | 181 | ||||
-rw-r--r-- | p2p/simulations/adapters/docker.go | 182 | ||||
-rw-r--r-- | p2p/simulations/adapters/exec.go | 504 | ||||
-rw-r--r-- | p2p/simulations/adapters/inproc.go | 314 | ||||
-rw-r--r-- | p2p/simulations/adapters/types.go | 215 | ||||
-rw-r--r-- | p2p/simulations/events.go | 108 | ||||
-rw-r--r-- | p2p/simulations/examples/README.md | 39 | ||||
-rw-r--r-- | p2p/simulations/examples/ping-pong.go | 184 | ||||
-rwxr-xr-x | p2p/simulations/examples/ping-pong.sh | 40 | ||||
-rw-r--r-- | p2p/simulations/http.go | 680 | ||||
-rw-r--r-- | p2p/simulations/http_test.go | 823 | ||||
-rw-r--r-- | p2p/simulations/network.go | 680 | ||||
-rw-r--r-- | p2p/simulations/network_test.go | 159 | ||||
-rw-r--r-- | p2p/simulations/simulation.go | 157 |
22 files changed, 4513 insertions, 14 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index b77971396..2d9e3a0ed 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -47,6 +47,24 @@ const ( maxResolveDelay = time.Hour ) +// NodeDialer is used to connect to nodes in the network, typically by using +// an underlying net.Dialer but also using net.Pipe in tests +type NodeDialer interface { + Dial(*discover.Node) (net.Conn, error) +} + +// TCPDialer implements the NodeDialer interface by using a net.Dialer to +// create TCP connections to nodes in the network +type TCPDialer struct { + *net.Dialer +} + +// Dial creates a TCP connection to the node +func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) { + addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} + return t.Dialer.Dial("tcp", addr.String()) +} + // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -318,14 +336,13 @@ func (t *dialTask) resolve(srv *Server) bool { // dial performs the actual connection attempt. func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { - addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} - fd, err := srv.Dialer.Dial("tcp", addr.String()) + fd, err := srv.Dialer.Dial(dest) if err != nil { log.Trace("Dial error", "task", t, "err", err) return false } mfd := newMeteredConn(fd, false) - srv.setupConn(mfd, t.flags, dest) + srv.SetupConn(mfd, t.flags, dest) return true } diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 08e863bae..ad18ef9ab 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -597,7 +597,7 @@ func TestDialResolve(t *testing.T) { } // Now run the task, it should resolve the ID once. - config := Config{Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}} + config := Config{Dialer: TCPDialer{&net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}} srv := &Server{ntab: table, Config: config} tasks[0].Do(srv) if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) { diff --git a/p2p/discover/node.go b/p2p/discover/node.go index d9cbd9448..fc928a91a 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -225,6 +225,11 @@ func (n *Node) UnmarshalText(text []byte) error { // The node identifier is a marshaled elliptic curve public key. type NodeID [NodeIDBits / 8]byte +// Bytes returns a byte slice representation of the NodeID +func (n NodeID) Bytes() []byte { + return n[:] +} + // NodeID prints as a long hexadecimal number. func (n NodeID) String() string { return fmt.Sprintf("%x", n[:]) @@ -240,6 +245,41 @@ func (n NodeID) TerminalString() string { return hex.EncodeToString(n[:8]) } +// MarshalText implements the encoding.TextMarshaler interface. +func (n NodeID) MarshalText() ([]byte, error) { + return []byte(hex.EncodeToString(n[:])), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (n *NodeID) UnmarshalText(text []byte) error { + id, err := HexID(string(text)) + if err != nil { + return err + } + *n = id + return nil +} + +// BytesID converts a byte slice to a NodeID +func BytesID(b []byte) (NodeID, error) { + var id NodeID + if len(b) != len(id) { + return id, fmt.Errorf("wrong length, want %d bytes", len(id)) + } + copy(id[:], b) + return id, nil +} + +// MustBytesID converts a byte slice to a NodeID. +// It panics if the byte slice is not a valid NodeID. +func MustBytesID(b []byte) NodeID { + id, err := BytesID(b) + if err != nil { + panic(err) + } + return id +} + // HexID converts a hex string to a NodeID. // The string may be prefixed with 0x. func HexID(in string) (NodeID, error) { diff --git a/p2p/discover/node_test.go b/p2p/discover/node_test.go index 3d1662d0b..ed8db4dc6 100644 --- a/p2p/discover/node_test.go +++ b/p2p/discover/node_test.go @@ -17,6 +17,7 @@ package discover import ( + "bytes" "fmt" "math/big" "math/rand" @@ -192,6 +193,35 @@ func TestHexID(t *testing.T) { } } +func TestNodeID_textEncoding(t *testing.T) { + ref := NodeID{ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, + 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, + 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x40, + 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x50, + 0x51, 0x52, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x60, + 0x61, 0x62, 0x63, 0x64, + } + hex := "01020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364" + + text, err := ref.MarshalText() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(text, []byte(hex)) { + t.Fatalf("text encoding did not match\nexpected: %s\ngot: %s", hex, text) + } + + id := new(NodeID) + if err := id.UnmarshalText(text); err != nil { + t.Fatal(err) + } + if *id != ref { + t.Fatalf("text decoding did not match\nexpected: %s\ngot: %s", ref, id) + } +} + func TestNodeID_recover(t *testing.T) { prv := newkey() hash := make([]byte, 32) diff --git a/p2p/message.go b/p2p/message.go index 1292d2121..5690494bf 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" ) @@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error { } return nil } + +// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent +// or received +type msgEventer struct { + MsgReadWriter + + feed *event.Feed + peerID discover.NodeID + Protocol string +} + +// newMsgEventer returns a msgEventer which sends message events to the given +// feed +func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer { + return &msgEventer{ + MsgReadWriter: rw, + feed: feed, + peerID: peerID, + Protocol: proto, + } +} + +// ReadMsg reads a message from the underlying MsgReadWriter and emits a +// "message received" event +func (self *msgEventer) ReadMsg() (Msg, error) { + msg, err := self.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + self.feed.Send(&PeerEvent{ + Type: PeerEventTypeMsgRecv, + Peer: self.peerID, + Protocol: self.Protocol, + MsgCode: &msg.Code, + MsgSize: &msg.Size, + }) + return msg, nil +} + +// WriteMsg writes a message to the underlying MsgReadWriter and emits a +// "message sent" event +func (self *msgEventer) WriteMsg(msg Msg) error { + err := self.MsgReadWriter.WriteMsg(msg) + if err != nil { + return err + } + self.feed.Send(&PeerEvent{ + Type: PeerEventTypeMsgSend, + Peer: self.peerID, + Protocol: self.Protocol, + MsgCode: &msg.Code, + MsgSize: &msg.Size, + }) + return nil +} + +// Close closes the underlying MsgReadWriter if it implements the io.Closer +// interface +func (self *msgEventer) Close() error { + if v, ok := self.MsgReadWriter.(io.Closer); ok { + return v.Close() + } + return nil +} diff --git a/p2p/peer.go b/p2p/peer.go index fb4b39e95..ebf7490c6 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" @@ -60,6 +61,38 @@ type protoHandshake struct { Rest []rlp.RawValue `rlp:"tail"` } +// PeerEventType is the type of peer events emitted by a p2p.Server +type PeerEventType string + +const ( + // PeerEventTypeAdd is the type of event emitted when a peer is added + // to a p2p.Server + PeerEventTypeAdd PeerEventType = "add" + + // PeerEventTypeDrop is the type of event emitted when a peer is + // dropped from a p2p.Server + PeerEventTypeDrop PeerEventType = "drop" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgSend PeerEventType = "msgsend" + + // PeerEventTypeMsgRecv is the type of event emitted when a + // message is received from a peer + PeerEventTypeMsgRecv PeerEventType = "msgrecv" +) + +// PeerEvent is an event emitted when peers are either added or dropped from +// a p2p.Server or when a message is sent or received on a peer connection +type PeerEvent struct { + Type PeerEventType `json:"type"` + Peer discover.NodeID `json:"peer"` + Error string `json:"error,omitempty"` + Protocol string `json:"protocol,omitempty"` + MsgCode *uint64 `json:"msg_code,omitempty"` + MsgSize *uint32 `json:"msg_size,omitempty"` +} + // Peer represents a connected remote node. type Peer struct { rw *conn @@ -71,6 +104,9 @@ type Peer struct { protoErr chan error closed chan struct{} disc chan DiscReason + + // events receives message send / receive events if set + events *event.Feed } // NewPeer returns a peer for testing purposes. @@ -297,9 +333,13 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr + var rw MsgReadWriter = proto + if p.events != nil { + rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) + } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) go func() { - err := proto.Run(p, proto) + err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned diff --git a/p2p/server.go b/p2p/server.go index d7909d53a..d1d578401 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -130,10 +131,14 @@ type Config struct { // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. - Dialer *net.Dialer `toml:"-"` + Dialer NodeDialer `toml:"-"` // If NoDial is true, the server will not dial any peers. NoDial bool `toml:",omitempty"` + + // If EnableMsgEvents is set then the server will emit PeerEvents + // whenever a message is sent to or received from a peer + EnableMsgEvents bool } // Server manages all peer connections. @@ -166,6 +171,7 @@ type Server struct { addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop + peerFeed event.Feed } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -191,7 +197,7 @@ type conn struct { fd net.Conn transport flags connFlag - cont chan error // The run loop uses cont to signal errors to setupConn. + cont chan error // The run loop uses cont to signal errors to SetupConn. id discover.NodeID // valid after the encryption handshake caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake @@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) { } } +// SubscribePeers subscribes the given channel to peer events +func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { + return srv.peerFeed.Subscribe(ch) +} + // Self returns the local node's endpoint information. func (srv *Server) Self() *discover.Node { srv.lock.Lock() @@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) { srv.newTransport = newRLPX } if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) @@ -536,7 +547,11 @@ running: c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. - c.cont <- srv.encHandshakeChecks(peers, c) + select { + case c.cont <- srv.encHandshakeChecks(peers, c): + case <-srv.quit: + break running + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -544,6 +559,11 @@ running: if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + // If message events are enabled, pass the peerFeed + // to the peer + if srv.EnableMsgEvents { + p.events = &srv.peerFeed + } name := truncateName(c.name) log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p @@ -552,7 +572,11 @@ running: // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or // discarded. Unblock the task last. - c.cont <- err + select { + case c.cont <- err: + case <-srv.quit: + break running + } case pd := <-srv.delpeer: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) @@ -665,16 +689,16 @@ func (srv *Server) listenLoop() { // Spawn the handler. It will give the slot back when the connection // has been established. go func() { - srv.setupConn(fd, inboundConn, nil) + srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }() } } -// setupConn runs the handshakes and attempts to add the connection +// SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running @@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) { if srv.newPeerHook != nil { srv.newPeerHook(p) } + + // broadcast peer add + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeAdd, + Peer: p.ID(), + }) + + // run the protocol remoteRequested, err := p.run() + + // broadcast peer drop + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeDrop, + Peer: p.ID(), + Error: err.Error(), + }) + // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} diff --git a/p2p/server_test.go b/p2p/server_test.go index 971faf002..11dd83e5d 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -435,7 +435,7 @@ func TestServerSetupConn(t *testing.T) { } } p1, _ := net.Pipe() - srv.setupConn(p1, test.flags, test.dialDest) + srv.SetupConn(p1, test.flags, test.dialDest) if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) { t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr) } diff --git a/p2p/simulations/README.md b/p2p/simulations/README.md new file mode 100644 index 000000000..d1f8649ea --- /dev/null +++ b/p2p/simulations/README.md @@ -0,0 +1,181 @@ +# devp2p Simulations + +The `p2p/simulations` package implements a simulation framework which supports +creating a collection of devp2p nodes, connecting them together to form a +simulation network, performing simulation actions in that network and then +extracting useful information. + +## Nodes + +Each node in a simulation network runs multiple services by wrapping a collection +of objects which implement the `node.Service` interface meaning they: + +* can be started and stopped +* run p2p protocols +* expose RPC APIs + +This means that any object which implements the `node.Service` interface can be +used to run a node in the simulation. + +## Services + +Before running a simulation, a set of service initializers must be registered +which can then be used to run nodes in the network. + +A service initializer is a function with the following signature: + +```go +func(ctx *adapters.ServiceContext) (node.Service, error) +``` + +These initializers should be registered by calling the `adapters.RegisterServices` +function in an `init()` hook: + +```go +func init() { + adapters.RegisterServices(adapters.Services{ + "service1": initService1, + "service2": initService2, + }) +} +``` + +## Node Adapters + +The simulation framework includes multiple "node adapters" which are +responsible for creating an environment in which a node runs. + +### SimAdapter + +The `SimAdapter` runs nodes in-memory, connecting them using an in-memory, +synchronous `net.Pipe` and connecting to their RPC server using an in-memory +`rpc.Client`. + +### ExecAdapter + +The `ExecAdapter` runs nodes as child processes of the running simulation. + +It does this by executing the binary which is running the simulation but +setting `argv[0]` (i.e. the program name) to `p2p-node` which is then +detected by an init hook in the child process which runs the `node.Service` +using the devp2p node stack rather than executing `main()`. + +The nodes listen for devp2p connections and WebSocket RPC clients on random +localhost ports. + +### DockerAdapter + +The `DockerAdapter` is similar to the `ExecAdapter` but executes `docker run` +to run the node in a Docker container using a Docker image containing the +simulation binary at `/bin/p2p-node`. + +The Docker image is built using `docker build` when the adapter is initialised, +meaning no prior setup is necessary other than having a working Docker client. + +Each node listens on the external IP of the container and the default p2p and +RPC ports (`30303` and `8546` respectively). + +## Network + +A simulation network is created with an ID and default service (which is used +if a node is created without an explicit service), exposes methods for +creating, starting, stopping, connecting and disconnecting nodes, and emits +events when certain actions occur. + +### Events + +A simulation network emits the following events: + +* node event - when nodes are created / started / stopped +* connection event - when nodes are connected / disconnected +* message event - when a protocol message is sent between two nodes + +The events have a "control" flag which when set indicates that the event is the +outcome of a controlled simulation action (e.g. creating a node or explicitly +connecting two nodes together). + +This is in contrast to a non-control event, otherwise called a "live" event, +which is the outcome of something happening in the network as a result of a +control event (e.g. a node actually started up or a connection was actually +established between two nodes). + +Live events are detected by the simulation network by subscribing to node peer +events via RPC when the nodes start up. + +## Testing Framework + +The `Simulation` type can be used in tests to perform actions in a simulation +network and then wait for expectations to be met. + +With a running simulation network, the `Simulation.Run` method can be called +with a `Step` which has the following fields: + +* `Action` - a function which performs some action in the network + +* `Expect` - an expectation function which returns whether or not a + given node meets the expectation + +* `Trigger` - a channel which receives node IDs which then trigger a check + of the expectation function to be performed against that node + +As a concrete example, consider a simulated network of Ethereum nodes. An +`Action` could be the sending of a transaction, `Expect` it being included in +a block, and `Trigger` a check for every block that is mined. + +On return, the `Simulation.Run` method returns a `StepResult` which can be used +to determine if all nodes met the expectation, how long it took them to meet +the expectation and what network events were emitted during the step run. + +## HTTP API + +The simulation framework includes a HTTP API which can be used to control the +simulation. + +The API is initialised with a particular node adapter and has the following +endpoints: + +``` +GET / Get network information +POST /start Start all nodes in the network +POST /stop Stop all nodes in the network +GET /events Stream network events +GET /snapshot Take a network snapshot +POST /snapshot Load a network snapshot +POST /nodes Create a node +GET /nodes Get all nodes in the network +GET /nodes/:nodeid Get node information +POST /nodes/:nodeid/start Start a node +POST /nodes/:nodeid/stop Stop a node +POST /nodes/:nodeid/conn/:peerid Connect two nodes +DELETE /nodes/:nodeid/conn/:peerid Disconnect two nodes +GET /nodes/:nodeid/rpc Make RPC requests to a node via WebSocket +``` + +For convenience, `nodeid` in the URL can be the name of a node rather than its +ID. + +## Command line client + +`p2psim` is a command line client for the HTTP API, located in +`cmd/p2psim`. + +It provides the following commands: + +``` +p2psim show +p2psim events [--current] [--filter=FILTER] +p2psim snapshot +p2psim load +p2psim node create [--name=NAME] [--services=SERVICES] [--key=KEY] +p2psim node list +p2psim node show <node> +p2psim node start <node> +p2psim node stop <node> +p2psim node connect <node> <peer> +p2psim node disconnect <node> <peer> +p2psim node rpc <node> <method> [<args>] [--subscribe] +``` + +## Example + +See [p2p/simulations/examples/README.md](examples/README.md). diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go new file mode 100644 index 000000000..022314b3d --- /dev/null +++ b/p2p/simulations/adapters/docker.go @@ -0,0 +1,182 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker +// containers. +// +// A Docker image is built which contains the current binary at /bin/p2p-node +// which when executed runs the underlying service (see the description +// of the execP2PNode function for more details) +type DockerAdapter struct { + ExecAdapter +} + +// NewDockerAdapter builds the p2p-node Docker image containing the current +// binary and returns a DockerAdapter +func NewDockerAdapter() (*DockerAdapter, error) { + // Since Docker containers run on Linux and this adapter runs the + // current binary in the container, it must be compiled for Linux. + // + // It is reasonable to require this because the caller can just + // compile the current binary in a Docker container. + if runtime.GOOS != "linux" { + return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") + } + + if err := buildDockerImage(); err != nil { + return nil, err + } + + return &DockerAdapter{ + ExecAdapter{ + nodes: make(map[discover.NodeID]*ExecNode), + }, + }, nil +} + +// Name returns the name of the adapter for logging purposes +func (d *DockerAdapter) Name() string { + return "docker-adapter" +} + +// NewNode returns a new DockerNode using the given config +func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = "/data" + conf.Stack.WSHost = "0.0.0.0" + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + node := &DockerNode{ + ExecNode: ExecNode{ + ID: config.ID, + Config: conf, + adapter: &d.ExecAdapter, + }, + } + node.newCmd = node.dockerCommand + d.ExecAdapter.nodes[node.ID] = &node.ExecNode + return node, nil +} + +// DockerNode wraps an ExecNode but exec's the current binary in a docker +// container rather than locally +type DockerNode struct { + ExecNode +} + +// dockerCommand returns a command which exec's the binary in a Docker +// container. +// +// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment +// variable to the container using the --env flag. +func (n *DockerNode) dockerCommand() *exec.Cmd { + return exec.Command( + "sh", "-c", + fmt.Sprintf( + `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`, + dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(), + ), + ) +} + +// dockerImage is the name of the Docker image which gets built to run the +// simulation node +const dockerImage = "p2p-node" + +// buildDockerImage builds the Docker image which is used to run the simulation +// node in a Docker container. +// +// It adds the current binary as "p2p-node" so that it runs execP2PNode +// when executed. +func buildDockerImage() error { + // create a directory to use as the build context + dir, err := ioutil.TempDir("", "p2p-docker") + if err != nil { + return err + } + defer os.RemoveAll(dir) + + // copy the current binary into the build context + bin, err := os.Open(reexec.Self()) + if err != nil { + return err + } + defer bin.Close() + dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755) + if err != nil { + return err + } + defer dst.Close() + if _, err := io.Copy(dst, bin); err != nil { + return err + } + + // create the Dockerfile + dockerfile := []byte(` +FROM ubuntu:16.04 +RUN mkdir /data +ADD self.bin /bin/p2p-node + `) + if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil { + return err + } + + // run 'docker build' + cmd := exec.Command("docker", "build", "-t", dockerImage, dir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("error building docker image: %s", err) + } + + return nil +} diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go new file mode 100644 index 000000000..bdb92cc1d --- /dev/null +++ b/p2p/simulations/adapters/exec.go @@ -0,0 +1,504 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "bufio" + "context" + "crypto/ecdsa" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strings" + "sync" + "syscall" + "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/websocket" +) + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the +// current binary as a child process. +// +// An init hook is used so that the child process executes the node services +// (rather than whataver the main() function would normally do), see the +// execP2PNode function for more information. +type ExecAdapter struct { + // BaseDir is the directory under which the data directories for each + // simulation node are created. + BaseDir string + + nodes map[discover.NodeID]*ExecNode +} + +// NewExecAdapter returns an ExecAdapter which stores node data in +// subdirectories of the given base directory +func NewExecAdapter(baseDir string) *ExecAdapter { + return &ExecAdapter{ + BaseDir: baseDir, + nodes: make(map[discover.NodeID]*ExecNode), + } +} + +// Name returns the name of the adapter for logging purposes +func (e *ExecAdapter) Name() string { + return "exec-adapter" +} + +// NewNode returns a new ExecNode using the given config +func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // create the node directory using the first 12 characters of the ID + // as Unix socket paths cannot be longer than 256 characters + dir := filepath.Join(e.BaseDir, config.ID.String()[:12]) + if err := os.Mkdir(dir, 0755); err != nil { + return nil, fmt.Errorf("error creating node directory: %s", err) + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = filepath.Join(dir, "data") + conf.Stack.WSHost = "127.0.0.1" + conf.Stack.WSPort = 0 + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + // listen on a random localhost port (we'll get the actual port after + // starting the node through the RPC admin.nodeInfo method) + conf.Stack.P2P.ListenAddr = "127.0.0.1:0" + + node := &ExecNode{ + ID: config.ID, + Dir: dir, + Config: conf, + adapter: e, + } + node.newCmd = node.execCommand + e.nodes[node.ID] = node + return node, nil +} + +// ExecNode starts a simulation node by exec'ing the current binary and +// running the configured services +type ExecNode struct { + ID discover.NodeID + Dir string + Config *execNodeConfig + Cmd *exec.Cmd + Info *p2p.NodeInfo + + adapter *ExecAdapter + client *rpc.Client + wsAddr string + newCmd func() *exec.Cmd + key *ecdsa.PrivateKey +} + +// Addr returns the node's enode URL +func (n *ExecNode) Addr() []byte { + if n.Info == nil { + return nil + } + return []byte(n.Info.Enode) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (n *ExecNode) Client() (*rpc.Client, error) { + return n.client, nil +} + +// wsAddrPattern is a regex used to read the WebSocket address from the node's +// log +var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`) + +// Start exec's the node passing the ID and service as command line arguments +// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment +// variable +func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { + if n.Cmd != nil { + return errors.New("already started") + } + defer func() { + if err != nil { + log.Error("node failed to start", "err", err) + n.Stop() + } + }() + + // encode a copy of the config containing the snapshot + confCopy := *n.Config + confCopy.Snapshots = snapshots + confCopy.PeerAddrs = make(map[string]string) + for id, node := range n.adapter.nodes { + confCopy.PeerAddrs[id.String()] = node.wsAddr + } + confData, err := json.Marshal(confCopy) + if err != nil { + return fmt.Errorf("error generating node config: %s", err) + } + + // use a pipe for stderr so we can both copy the node's stderr to + // os.Stderr and read the WebSocket address from the logs + stderrR, stderrW := io.Pipe() + stderr := io.MultiWriter(os.Stderr, stderrW) + + // start the node + cmd := n.newCmd() + cmd.Stdout = os.Stdout + cmd.Stderr = stderr + cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting node: %s", err) + } + n.Cmd = cmd + + // read the WebSocket address from the stderr logs + var wsAddr string + wsAddrC := make(chan string) + go func() { + s := bufio.NewScanner(stderrR) + for s.Scan() { + if strings.Contains(s.Text(), "WebSocket endpoint opened:") { + wsAddrC <- wsAddrPattern.FindString(s.Text()) + } + } + }() + select { + case wsAddr = <-wsAddrC: + if wsAddr == "" { + return errors.New("failed to read WebSocket address from stderr") + } + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for WebSocket address on stderr") + } + + // create the RPC client and load the node info + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + client, err := rpc.DialWebsocket(ctx, wsAddr, "") + if err != nil { + return fmt.Errorf("error dialing rpc websocket: %s", err) + } + var info p2p.NodeInfo + if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { + return fmt.Errorf("error getting node info: %s", err) + } + n.client = client + n.wsAddr = wsAddr + n.Info = &info + + return nil +} + +// execCommand returns a command which runs the node locally by exec'ing +// the current binary but setting argv[0] to "p2p-node" so that the child +// runs execP2PNode +func (n *ExecNode) execCommand() *exec.Cmd { + return &exec.Cmd{ + Path: reexec.Self(), + Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()}, + } +} + +// Stop stops the node by first sending SIGTERM and then SIGKILL if the node +// doesn't stop within 5s +func (n *ExecNode) Stop() error { + if n.Cmd == nil { + return nil + } + defer func() { + n.Cmd = nil + }() + + if n.client != nil { + n.client.Close() + n.client = nil + n.wsAddr = "" + n.Info = nil + } + + if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil { + return n.Cmd.Process.Kill() + } + waitErr := make(chan error) + go func() { + waitErr <- n.Cmd.Wait() + }() + select { + case err := <-waitErr: + return err + case <-time.After(5 * time.Second): + return n.Cmd.Process.Kill() + } +} + +// NodeInfo returns information about the node +func (n *ExecNode) NodeInfo() *p2p.NodeInfo { + info := &p2p.NodeInfo{ + ID: n.ID.String(), + } + if n.client != nil { + n.client.Call(&info, "admin_nodeInfo") + } + return info +} + +// ServeRPC serves RPC requests over the given connection by dialling the +// node's WebSocket address and joining the two connections +func (n *ExecNode) ServeRPC(clientConn net.Conn) error { + conn, err := websocket.Dial(n.wsAddr, "", "http://localhost") + if err != nil { + return err + } + var wg sync.WaitGroup + wg.Add(2) + join := func(src, dst net.Conn) { + defer wg.Done() + io.Copy(dst, src) + // close the write end of the destination connection + if cw, ok := dst.(interface { + CloseWrite() error + }); ok { + cw.CloseWrite() + } else { + dst.Close() + } + } + go join(conn, clientConn) + go join(clientConn, conn) + wg.Wait() + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (n *ExecNode) Snapshots() (map[string][]byte, error) { + if n.client == nil { + return nil, errors.New("RPC not started") + } + var snapshots map[string][]byte + return snapshots, n.client.Call(&snapshots, "simulation_snapshot") +} + +func init() { + // register a reexec function to start a devp2p node when the current + // binary is executed as "p2p-node" + reexec.Register("p2p-node", execP2PNode) +} + +// execNodeConfig is used to serialize the node configuration so it can be +// passed to the child process as a JSON encoded environment variable +type execNodeConfig struct { + Stack node.Config `json:"stack"` + Node *NodeConfig `json:"node"` + Snapshots map[string][]byte `json:"snapshots,omitempty"` + PeerAddrs map[string]string `json:"peer_addrs,omitempty"` +} + +// execP2PNode starts a devp2p node when the current binary is executed with +// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] +// and the node config from the _P2P_NODE_CONFIG environment variable +func execP2PNode() { + glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) + glogger.Verbosity(log.LvlInfo) + log.Root().SetHandler(glogger) + + // read the services from argv + serviceNames := strings.Split(os.Args[1], ",") + + // decode the config + confEnv := os.Getenv("_P2P_NODE_CONFIG") + if confEnv == "" { + log.Crit("missing _P2P_NODE_CONFIG") + } + var conf execNodeConfig + if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { + log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + } + conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + + // use explicit IP address in ListenAddr so that Enode URL is usable + externalIP := func() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Crit("error getting IP address", "err", err) + } + for _, addr := range addrs { + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { + return ip.IP.String() + } + } + log.Crit("unable to determine explicit IP address") + return "" + } + if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { + conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr + } + if conf.Stack.WSHost == "0.0.0.0" { + conf.Stack.WSHost = externalIP() + } + + // initialize the devp2p stack + stack, err := node.New(&conf.Stack) + if err != nil { + log.Crit("error creating node stack", "err", err) + } + + // register the services, collecting them into a map so we can wrap + // them in a snapshot service + services := make(map[string]node.Service, len(serviceNames)) + for _, name := range serviceNames { + serviceFunc, exists := serviceFuncs[name] + if !exists { + log.Crit("unknown node service", "name", name) + } + constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs}, + NodeContext: nodeCtx, + Config: conf.Node, + } + if conf.Snapshots != nil { + ctx.Snapshot = conf.Snapshots[name] + } + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + services[name] = service + return service, nil + } + if err := stack.Register(constructor); err != nil { + log.Crit("error starting service", "name", name, "err", err) + } + } + + // register the snapshot service + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return &snapshotService{services}, nil + }); err != nil { + log.Crit("error starting snapshot service", "err", err) + } + + // start the stack + if err := stack.Start(); err != nil { + log.Crit("error stating node stack", "err", err) + } + + // stop the stack if we get a SIGTERM signal + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + + // wait for the stack to exit + stack.Wait() +} + +// snapshotService is a node.Service which wraps a list of services and +// exposes an API to generate a snapshot of those services +type snapshotService struct { + services map[string]node.Service +} + +func (s *snapshotService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "simulation", + Version: "1.0", + Service: SnapshotAPI{s.services}, + }} +} + +func (s *snapshotService) Protocols() []p2p.Protocol { + return nil +} + +func (s *snapshotService) Start(*p2p.Server) error { + return nil +} + +func (s *snapshotService) Stop() error { + return nil +} + +// SnapshotAPI provides an RPC method to create snapshots of services +type SnapshotAPI struct { + services map[string]node.Service +} + +func (api SnapshotAPI) Snapshot() (map[string][]byte, error) { + snapshots := make(map[string][]byte) + for name, service := range api.services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +type wsRPCDialer struct { + addrs map[string]string +} + +// DialRPC implements the RPCDialer interface by creating a WebSocket RPC +// client of the given node +func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) { + addr, ok := w.addrs[id.String()] + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + return rpc.DialWebsocket(context.Background(), addr, "http://localhost") +} diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go new file mode 100644 index 000000000..c97188def --- /dev/null +++ b/p2p/simulations/adapters/inproc.go @@ -0,0 +1,314 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "math" + "net" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and +// connects them using in-memory net.Pipe connections +type SimAdapter struct { + mtx sync.RWMutex + nodes map[discover.NodeID]*SimNode + services map[string]ServiceFunc +} + +// NewSimAdapter creates a SimAdapter which is capable of running in-memory +// simulation nodes running any of the given services (the services to run on a +// particular node are passed to the NewNode function in the NodeConfig) +func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter { + return &SimAdapter{ + nodes: make(map[discover.NodeID]*SimNode), + services: services, + } +} + +// Name returns the name of the adapter for logging purposes +func (s *SimAdapter) Name() string { + return "sim-adapter" +} + +// NewNode returns a new SimNode using the given config +func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // check a node with the ID doesn't already exist + id := config.ID + if _, exists := s.nodes[id]; exists { + return nil, fmt.Errorf("node already exists: %s", id) + } + + // check the services are valid + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := s.services[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + n, err := node.New(&node.Config{ + P2P: p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: math.MaxInt32, + NoDiscovery: true, + Dialer: s, + EnableMsgEvents: true, + }, + NoUSB: true, + }) + if err != nil { + return nil, err + } + + simNode := &SimNode{ + ID: id, + config: config, + node: n, + adapter: s, + running: make(map[string]node.Service), + } + s.nodes[id] = simNode + return simNode, nil +} + +// Dial implements the p2p.NodeDialer interface by connecting to the node using +// an in-memory net.Pipe connection +func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { + node, ok := s.GetNode(dest.ID) + if !ok { + return nil, fmt.Errorf("unknown node: %s", dest.ID) + } + srv := node.Server() + if srv == nil { + return nil, fmt.Errorf("node not running: %s", dest.ID) + } + pipe1, pipe2 := net.Pipe() + go srv.SetupConn(pipe1, 0, nil) + return pipe2, nil +} + +// DialRPC implements the RPCDialer interface by creating an in-memory RPC +// client of the given node +func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) { + node, ok := s.GetNode(id) + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + handler, err := node.node.RPCHandler() + if err != nil { + return nil, err + } + return rpc.DialInProc(handler), nil +} + +// GetNode returns the node with the given ID if it exists +func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { + s.mtx.RLock() + defer s.mtx.RUnlock() + node, ok := s.nodes[id] + return node, ok +} + +// SimNode is an in-memory simulation node which connects to other nodes using +// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p +// protocols directly over that pipe +type SimNode struct { + lock sync.RWMutex + ID discover.NodeID + config *NodeConfig + adapter *SimAdapter + node *node.Node + running map[string]node.Service + client *rpc.Client + registerOnce sync.Once +} + +// Addr returns the node's discovery address +func (self *SimNode) Addr() []byte { + return []byte(self.Node().String()) +} + +// Node returns a discover.Node representing the SimNode +func (self *SimNode) Node() *discover.Node { + return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (self *SimNode) Client() (*rpc.Client, error) { + self.lock.RLock() + defer self.lock.RUnlock() + if self.client == nil { + return nil, errors.New("node not started") + } + return self.client, nil +} + +// ServeRPC serves RPC requests over the given connection by creating an +// in-memory client to the node's RPC server +func (self *SimNode) ServeRPC(conn net.Conn) error { + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (self *SimNode) Snapshots() (map[string][]byte, error) { + self.lock.RLock() + services := make(map[string]node.Service, len(self.running)) + for name, service := range self.running { + services[name] = service + } + self.lock.RUnlock() + if len(services) == 0 { + return nil, errors.New("no running services") + } + snapshots := make(map[string][]byte) + for name, service := range services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +// Start registers the services and starts the underlying devp2p node +func (self *SimNode) Start(snapshots map[string][]byte) error { + newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) { + return func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: self.adapter, + NodeContext: nodeCtx, + Config: self.config, + } + if snapshots != nil { + ctx.Snapshot = snapshots[name] + } + serviceFunc := self.adapter.services[name] + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + self.running[name] = service + return service, nil + } + } + + // ensure we only register the services once in the case of the node + // being stopped and then started again + var regErr error + self.registerOnce.Do(func() { + for _, name := range self.config.Services { + if err := self.node.Register(newService(name)); err != nil { + regErr = err + return + } + } + }) + if regErr != nil { + return regErr + } + + if err := self.node.Start(); err != nil { + return err + } + + // create an in-process RPC client + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + + self.lock.Lock() + self.client = rpc.DialInProc(handler) + self.lock.Unlock() + + return nil +} + +// Stop closes the RPC client and stops the underlying devp2p node +func (self *SimNode) Stop() error { + self.lock.Lock() + if self.client != nil { + self.client.Close() + self.client = nil + } + self.lock.Unlock() + return self.node.Stop() +} + +// Services returns a copy of the underlying services +func (self *SimNode) Services() []node.Service { + self.lock.RLock() + defer self.lock.RUnlock() + services := make([]node.Service, 0, len(self.running)) + for _, service := range self.running { + services = append(services, service) + } + return services +} + +// Server returns the underlying p2p.Server +func (self *SimNode) Server() *p2p.Server { + return self.node.Server() +} + +// SubscribeEvents subscribes the given channel to peer events from the +// underlying p2p.Server +func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { + srv := self.Server() + if srv == nil { + panic("node not running") + } + return srv.SubscribeEvents(ch) +} + +// NodeInfo returns information about the node +func (self *SimNode) NodeInfo() *p2p.NodeInfo { + server := self.Server() + if server == nil { + return &p2p.NodeInfo{ + ID: self.ID.String(), + Enode: self.Node().String(), + } + } + return server.NodeInfo() +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go new file mode 100644 index 000000000..ed6cfc504 --- /dev/null +++ b/p2p/simulations/adapters/types.go @@ -0,0 +1,215 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "net" + "os" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// Node represents a node in a simulation network which is created by a +// NodeAdapter, for example: +// +// * SimNode - An in-memory node +// * ExecNode - A child process node +// * DockerNode - A Docker container node +// +type Node interface { + // Addr returns the node's address (e.g. an Enode URL) + Addr() []byte + + // Client returns the RPC client which is created once the node is + // up and running + Client() (*rpc.Client, error) + + // ServeRPC serves RPC requests over the given connection + ServeRPC(net.Conn) error + + // Start starts the node with the given snapshots + Start(snapshots map[string][]byte) error + + // Stop stops the node + Stop() error + + // NodeInfo returns information about the node + NodeInfo() *p2p.NodeInfo + + // Snapshots creates snapshots of the running services + Snapshots() (map[string][]byte, error) +} + +// NodeAdapter is used to create Nodes in a simulation network +type NodeAdapter interface { + // Name returns the name of the adapter for logging purposes + Name() string + + // NewNode creates a new node with the given configuration + NewNode(config *NodeConfig) (Node, error) +} + +// NodeConfig is the configuration used to start a node in a simulation +// network +type NodeConfig struct { + // ID is the node's ID which is used to identify the node in the + // simulation network + ID discover.NodeID + + // PrivateKey is the node's private key which is used by the devp2p + // stack to encrypt communications + PrivateKey *ecdsa.PrivateKey + + // Name is a human friendly name for the node like "node01" + Name string + + // Services are the names of the services which should be run when + // starting the node (for SimNodes it should be the names of services + // contained in SimAdapter.services, for other nodes it should be + // services registered by calling the RegisterService function) + Services []string +} + +// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding +// all fields as strings +type nodeConfigJSON struct { + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Services []string `json:"services"` +} + +// MarshalJSON implements the json.Marshaler interface by encoding the config +// fields as strings +func (n *NodeConfig) MarshalJSON() ([]byte, error) { + confJSON := nodeConfigJSON{ + ID: n.ID.String(), + Name: n.Name, + Services: n.Services, + } + if n.PrivateKey != nil { + confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) + } + return json.Marshal(confJSON) +} + +// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json +// string values into the config fields +func (n *NodeConfig) UnmarshalJSON(data []byte) error { + var confJSON nodeConfigJSON + if err := json.Unmarshal(data, &confJSON); err != nil { + return err + } + + if confJSON.ID != "" { + nodeID, err := discover.HexID(confJSON.ID) + if err != nil { + return err + } + n.ID = nodeID + } + + if confJSON.PrivateKey != "" { + key, err := hex.DecodeString(confJSON.PrivateKey) + if err != nil { + return err + } + privKey, err := crypto.ToECDSA(key) + if err != nil { + return err + } + n.PrivateKey = privKey + } + + n.Name = confJSON.Name + n.Services = confJSON.Services + + return nil +} + +// RandomNodeConfig returns node configuration with a randomly generated ID and +// PrivateKey +func RandomNodeConfig() *NodeConfig { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + var id discover.NodeID + pubkey := crypto.FromECDSAPub(&key.PublicKey) + copy(id[:], pubkey[1:]) + return &NodeConfig{ + ID: id, + PrivateKey: key, + } +} + +// ServiceContext is a collection of options and methods which can be utilised +// when starting services +type ServiceContext struct { + RPCDialer + + NodeContext *node.ServiceContext + Config *NodeConfig + Snapshot []byte +} + +// RPCDialer is used when initialising services which need to connect to +// other nodes in the network (for example a simulated Swarm node which needs +// to connect to a Geth node to resolve ENS names) +type RPCDialer interface { + DialRPC(id discover.NodeID) (*rpc.Client, error) +} + +// Services is a collection of services which can be run in a simulation +type Services map[string]ServiceFunc + +// ServiceFunc returns a node.Service which can be used to boot a devp2p node +type ServiceFunc func(ctx *ServiceContext) (node.Service, error) + +// serviceFuncs is a map of registered services which are used to boot devp2p +// nodes +var serviceFuncs = make(Services) + +// RegisterServices registers the given Services which can then be used to +// start devp2p nodes using either the Exec or Docker adapters. +// +// It should be called in an init function so that it has the opportunity to +// execute the services before main() is called. +func RegisterServices(services Services) { + for name, f := range services { + if _, exists := serviceFuncs[name]; exists { + panic(fmt.Sprintf("node service already exists: %q", name)) + } + serviceFuncs[name] = f + } + + // now we have registered the services, run reexec.Init() which will + // potentially start one of the services if the current binary has + // been exec'd with argv[0] set to "p2p-node" + if reexec.Init() { + os.Exit(0) + } +} diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go new file mode 100644 index 000000000..f17958c68 --- /dev/null +++ b/p2p/simulations/events.go @@ -0,0 +1,108 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "fmt" + "time" +) + +// EventType is the type of event emitted by a simulation network +type EventType string + +const ( + // EventTypeNode is the type of event emitted when a node is either + // created, started or stopped + EventTypeNode EventType = "node" + + // EventTypeConn is the type of event emitted when a connection is + // is either established or dropped between two nodes + EventTypeConn EventType = "conn" + + // EventTypeMsg is the type of event emitted when a p2p message it + // sent between two nodes + EventTypeMsg EventType = "msg" +) + +// Event is an event emitted by a simulation network +type Event struct { + // Type is the type of the event + Type EventType `json:"type"` + + // Time is the time the event happened + Time time.Time `json:"time"` + + // Control indicates whether the event is the result of a controlled + // action in the network + Control bool `json:"control"` + + // Node is set if the type is EventTypeNode + Node *Node `json:"node,omitempty"` + + // Conn is set if the type is EventTypeConn + Conn *Conn `json:"conn,omitempty"` + + // Msg is set if the type is EventTypeMsg + Msg *Msg `json:"msg,omitempty"` +} + +// NewEvent creates a new event for the given object which should be either a +// Node, Conn or Msg. +// +// The object is copied so that the event represents the state of the object +// when NewEvent is called. +func NewEvent(v interface{}) *Event { + event := &Event{Time: time.Now()} + switch v := v.(type) { + case *Node: + event.Type = EventTypeNode + node := *v + event.Node = &node + case *Conn: + event.Type = EventTypeConn + conn := *v + event.Conn = &conn + case *Msg: + event.Type = EventTypeMsg + msg := *v + event.Msg = &msg + default: + panic(fmt.Sprintf("invalid event type: %T", v)) + } + return event +} + +// ControlEvent creates a new control event +func ControlEvent(v interface{}) *Event { + event := NewEvent(v) + event.Control = true + return event +} + +// String returns the string representation of the event +func (e *Event) String() string { + switch e.Type { + case EventTypeNode: + return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up) + case EventTypeConn: + return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up) + case EventTypeMsg: + return fmt.Sprintf("<msg-event> nodes: %s->%s proto: %s, code: %d, received: %t", e.Msg.One.TerminalString(), e.Msg.Other.TerminalString(), e.Msg.Protocol, e.Msg.Code, e.Msg.Received) + default: + return "" + } +} diff --git a/p2p/simulations/examples/README.md b/p2p/simulations/examples/README.md new file mode 100644 index 000000000..822a48dcb --- /dev/null +++ b/p2p/simulations/examples/README.md @@ -0,0 +1,39 @@ +# devp2p simulation examples + +## ping-pong + +`ping-pong.go` implements a simulation network which contains nodes running a +simple "ping-pong" protocol where nodes send a ping message to all their +connected peers every 10s and receive pong messages in return. + +To run the simulation, run `go run ping-pong.go` in one terminal to start the +simulation API and `./ping-pong.sh` in another to start and connect the nodes: + +``` +$ go run ping-pong.go +INFO [08-15|13:53:49] using sim adapter +INFO [08-15|13:53:49] starting simulation server on 0.0.0.0:8888... +``` + +``` +$ ./ping-pong.sh +---> 13:58:12 creating 10 nodes +Created node01 +Started node01 +... +Created node10 +Started node10 +---> 13:58:13 connecting node01 to all other nodes +Connected node01 to node02 +... +Connected node01 to node10 +---> 13:58:14 done +``` + +Use the `--adapter` flag to choose the adapter type: + +``` +$ go run ping-pong.go --adapter exec +INFO [08-15|14:01:14] using exec adapter tmpdir=/var/folders/k6/wpsgfg4n23ddbc6f5cnw5qg00000gn/T/p2p-example992833779 +INFO [08-15|14:01:14] starting simulation server on 0.0.0.0:8888... +``` diff --git a/p2p/simulations/examples/ping-pong.go b/p2p/simulations/examples/ping-pong.go new file mode 100644 index 000000000..6a0ead53a --- /dev/null +++ b/p2p/simulations/examples/ping-pong.go @@ -0,0 +1,184 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +var adapterType = flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`) + +// main() starts a simulation network which contains nodes running a simple +// ping-pong protocol +func main() { + flag.Parse() + + // set the log level to Trace + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) + + // register a single ping-pong service + services := map[string]adapters.ServiceFunc{ + "ping-pong": func(ctx *adapters.ServiceContext) (node.Service, error) { + return newPingPongService(ctx.Config.ID), nil + }, + } + adapters.RegisterServices(services) + + // create the NodeAdapter + var adapter adapters.NodeAdapter + + switch *adapterType { + + case "sim": + log.Info("using sim adapter") + adapter = adapters.NewSimAdapter(services) + + case "exec": + tmpdir, err := ioutil.TempDir("", "p2p-example") + if err != nil { + log.Crit("error creating temp dir", "err", err) + } + defer os.RemoveAll(tmpdir) + log.Info("using exec adapter", "tmpdir", tmpdir) + adapter = adapters.NewExecAdapter(tmpdir) + + case "docker": + log.Info("using docker adapter") + var err error + adapter, err = adapters.NewDockerAdapter() + if err != nil { + log.Crit("error creating docker adapter", "err", err) + } + + default: + log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType)) + } + + // start the HTTP API + log.Info("starting simulation server on 0.0.0.0:8888...") + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "ping-pong", + }) + if err := http.ListenAndServe(":8888", simulations.NewServer(network)); err != nil { + log.Crit("error starting simulation server", "err", err) + } +} + +// pingPongService runs a ping-pong protocol between nodes where each node +// sends a ping to all its connected peers every 10s and receives a pong in +// return +type pingPongService struct { + id discover.NodeID + log log.Logger + received int64 +} + +func newPingPongService(id discover.NodeID) *pingPongService { + return &pingPongService{ + id: id, + log: log.New("node.id", id), + } +} + +func (p *pingPongService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: "ping-pong", + Version: 1, + Length: 2, + Run: p.Run, + NodeInfo: p.Info, + }} +} + +func (p *pingPongService) APIs() []rpc.API { + return nil +} + +func (p *pingPongService) Start(server *p2p.Server) error { + p.log.Info("ping-pong service starting") + return nil +} + +func (p *pingPongService) Stop() error { + p.log.Info("ping-pong service stopping") + return nil +} + +func (p *pingPongService) Info() interface{} { + return struct { + Received int64 `json:"received"` + }{ + atomic.LoadInt64(&p.received), + } +} + +const ( + pingMsgCode = iota + pongMsgCode +) + +// Run implements the ping-pong protocol which sends ping messages to the peer +// at 10s intervals, and responds to pings with pong messages. +func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + log := p.log.New("peer.id", peer.ID()) + + errC := make(chan error) + go func() { + for range time.Tick(10 * time.Second) { + log.Info("sending ping") + if err := p2p.Send(rw, pingMsgCode, "PING"); err != nil { + errC <- err + return + } + } + }() + go func() { + for { + msg, err := rw.ReadMsg() + if err != nil { + errC <- err + return + } + payload, err := ioutil.ReadAll(msg.Payload) + if err != nil { + errC <- err + return + } + log.Info("received message", "msg.code", msg.Code, "msg.payload", string(payload)) + atomic.AddInt64(&p.received, 1) + if msg.Code == pingMsgCode { + log.Info("sending pong") + go p2p.Send(rw, pongMsgCode, "PONG") + } + } + }() + return <-errC +} diff --git a/p2p/simulations/examples/ping-pong.sh b/p2p/simulations/examples/ping-pong.sh new file mode 100755 index 000000000..47936bd9a --- /dev/null +++ b/p2p/simulations/examples/ping-pong.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Boot a ping-pong network simulation using the HTTP API started by ping-pong.go + +set -e + +main() { + if ! which p2psim &>/dev/null; then + fail "missing p2psim binary (you need to build cmd/p2psim and put it in \$PATH)" + fi + + info "creating 10 nodes" + for i in $(seq 1 10); do + p2psim node create --name "$(node_name $i)" + p2psim node start "$(node_name $i)" + done + + info "connecting node01 to all other nodes" + for i in $(seq 2 10); do + p2psim node connect "node01" "$(node_name $i)" + done + + info "done" +} + +node_name() { + local num=$1 + echo "node$(printf '%02d' $num)" +} + +info() { + echo -e "\033[1;32m---> $(date +%H:%M:%S) ${@}\033[0m" +} + +fail() { + echo -e "\033[1;31mERROR: ${@}\033[0m" >&2 + exit 1 +} + +main "$@" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go new file mode 100644 index 000000000..3fa8b9292 --- /dev/null +++ b/p2p/simulations/http.go @@ -0,0 +1,680 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" + "github.com/julienschmidt/httprouter" + "golang.org/x/net/websocket" +) + +// DefaultClient is the default simulation API client which expects the API +// to be running at http://localhost:8888 +var DefaultClient = NewClient("http://localhost:8888") + +// Client is a client for the simulation HTTP API which supports creating +// and managing simulation networks +type Client struct { + URL string + + client *http.Client +} + +// NewClient returns a new simulation API client +func NewClient(url string) *Client { + return &Client{ + URL: url, + client: http.DefaultClient, + } +} + +// GetNetwork returns details of the network +func (c *Client) GetNetwork() (*Network, error) { + network := &Network{} + return network, c.Get("/", network) +} + +// StartNetwork starts all existing nodes in the simulation network +func (c *Client) StartNetwork() error { + return c.Post("/start", nil, nil) +} + +// StopNetwork stops all existing nodes in a simulation network +func (c *Client) StopNetwork() error { + return c.Post("/stop", nil, nil) +} + +// CreateSnapshot creates a network snapshot +func (c *Client) CreateSnapshot() (*Snapshot, error) { + snap := &Snapshot{} + return snap, c.Get("/snapshot", snap) +} + +// LoadSnapshot loads a snapshot into the network +func (c *Client) LoadSnapshot(snap *Snapshot) error { + return c.Post("/snapshot", snap, nil) +} + +// SubscribeOpts is a collection of options to use when subscribing to network +// events +type SubscribeOpts struct { + // Current instructs the server to send events for existing nodes and + // connections first + Current bool + + // Filter instructs the server to only send a subset of message events + Filter string +} + +// SubscribeNetwork subscribes to network events which are sent from the server +// as a server-sent-events stream, optionally receiving events for existing +// nodes and connections and filtering message events +func (c *Client) SubscribeNetwork(events chan *Event, opts SubscribeOpts) (event.Subscription, error) { + url := fmt.Sprintf("%s/events?current=%t&filter=%s", c.URL, opts.Current, opts.Filter) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + response, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + + // define a producer function to pass to event.Subscription + // which reads server-sent events from res.Body and sends + // them to the events channel + producer := func(stop <-chan struct{}) error { + defer res.Body.Close() + + // read lines from res.Body in a goroutine so that we are + // always reading from the stop channel + lines := make(chan string) + errC := make(chan error, 1) + go func() { + s := bufio.NewScanner(res.Body) + for s.Scan() { + select { + case lines <- s.Text(): + case <-stop: + return + } + } + errC <- s.Err() + }() + + // detect any lines which start with "data:", decode the data + // into an event and send it to the events channel + for { + select { + case line := <-lines: + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + event := &Event{} + if err := json.Unmarshal([]byte(data), event); err != nil { + return fmt.Errorf("error decoding SSE event: %s", err) + } + select { + case events <- event: + case <-stop: + return nil + } + case err := <-errC: + return err + case <-stop: + return nil + } + } + } + + return event.NewSubscription(producer), nil +} + +// GetNodes returns all nodes which exist in the network +func (c *Client) GetNodes() ([]*p2p.NodeInfo, error) { + var nodes []*p2p.NodeInfo + return nodes, c.Get("/nodes", &nodes) +} + +// CreateNode creates a node in the network using the given configuration +func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Post("/nodes", config, node) +} + +// GetNode returns details of a node +func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Get(fmt.Sprintf("/nodes/%s", nodeID), node) +} + +// StartNode starts a node +func (c *Client) StartNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/start", nodeID), nil, nil) +} + +// StopNode stops a node +func (c *Client) StopNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/stop", nodeID), nil, nil) +} + +// ConnectNode connects a node to a peer node +func (c *Client) ConnectNode(nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID), nil, nil) +} + +// DisconnectNode disconnects a node from a peer node +func (c *Client) DisconnectNode(nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID)) +} + +// RPCClient returns an RPC client connected to a node +func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, error) { + baseURL := strings.Replace(c.URL, "http", "ws", 1) + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") +} + +// Get performs a HTTP GET request decoding the resulting JSON response +// into "out" +func (c *Client) Get(path string, out interface{}) error { + return c.Send("GET", path, nil, out) +} + +// Post performs a HTTP POST request sending "in" as the JSON body and +// decoding the resulting JSON response into "out" +func (c *Client) Post(path string, in, out interface{}) error { + return c.Send("POST", path, in, out) +} + +// Delete performs a HTTP DELETE request +func (c *Client) Delete(path string) error { + return c.Send("DELETE", path, nil, nil) +} + +// Send performs a HTTP request, sending "in" as the JSON request body and +// decoding the JSON response into "out" +func (c *Client) Send(method, path string, in, out interface{}) error { + var body []byte + if in != nil { + var err error + body, err = json.Marshal(in) + if err != nil { + return err + } + } + req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { + response, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + if out != nil { + if err := json.NewDecoder(res.Body).Decode(out); err != nil { + return err + } + } + return nil +} + +// Server is an HTTP server providing an API to manage a simulation network +type Server struct { + router *httprouter.Router + network *Network +} + +// NewServer returns a new simulation API server +func NewServer(network *Network) *Server { + s := &Server{ + router: httprouter.New(), + network: network, + } + + s.OPTIONS("/", s.Options) + s.GET("/", s.GetNetwork) + s.POST("/start", s.StartNetwork) + s.POST("/stop", s.StopNetwork) + s.GET("/events", s.StreamNetworkEvents) + s.GET("/snapshot", s.CreateSnapshot) + s.POST("/snapshot", s.LoadSnapshot) + s.POST("/nodes", s.CreateNode) + s.GET("/nodes", s.GetNodes) + s.GET("/nodes/:nodeid", s.GetNode) + s.POST("/nodes/:nodeid/start", s.StartNode) + s.POST("/nodes/:nodeid/stop", s.StopNode) + s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) + s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + + return s +} + +// GetNetwork returns details of the network +func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { + s.JSON(w, http.StatusOK, s.network) +} + +// StartNetwork starts all nodes in the network +func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StartAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StopNetwork stops all nodes in the network +func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StopAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StreamNetworkEvents streams network events as a server-sent-events stream +func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { + events := make(chan *Event) + sub := s.network.events.Subscribe(events) + defer sub.Unsubscribe() + + // stop the stream if the client goes away + var clientGone <-chan bool + if cn, ok := w.(http.CloseNotifier); ok { + clientGone = cn.CloseNotify() + } + + // write writes the given event and data to the stream like: + // + // event: <event> + // data: <data> + // + write := func(event, data string) { + fmt.Fprintf(w, "event: %s\n", event) + fmt.Fprintf(w, "data: %s\n\n", data) + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + } + writeEvent := func(event *Event) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + write("network", string(data)) + return nil + } + writeErr := func(err error) { + write("error", err.Error()) + } + + // check if filtering has been requested + var filters MsgFilters + if filterParam := req.URL.Query().Get("filter"); filterParam != "" { + var err error + filters, err = NewMsgFilters(filterParam) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "\n\n") + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + + // optionally send the existing nodes and connections + if req.URL.Query().Get("current") == "true" { + snap, err := s.network.Snapshot() + if err != nil { + writeErr(err) + return + } + for _, node := range snap.Nodes { + event := NewEvent(&node.Node) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + for _, conn := range snap.Conns { + event := NewEvent(&conn) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + } + + for { + select { + case event := <-events: + // only send message events which match the filters + if event.Msg != nil && !filters.Match(event.Msg) { + continue + } + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + case <-clientGone: + return + } + } +} + +// NewMsgFilters constructs a collection of message filters from a URL query +// parameter. +// +// The parameter is expected to be a dash-separated list of individual filters, +// each having the format '<proto>:<codes>', where <proto> is the name of a +// protocol and <codes> is a comma-separated list of message codes. +// +// A message code of '*' or '-1' is considered a wildcard and matches any code. +func NewMsgFilters(filterParam string) (MsgFilters, error) { + filters := make(MsgFilters) + for _, filter := range strings.Split(filterParam, "-") { + protoCodes := strings.SplitN(filter, ":", 2) + if len(protoCodes) != 2 || protoCodes[0] == "" || protoCodes[1] == "" { + return nil, fmt.Errorf("invalid message filter: %s", filter) + } + proto := protoCodes[0] + for _, code := range strings.Split(protoCodes[1], ",") { + if code == "*" || code == "-1" { + filters[MsgFilter{Proto: proto, Code: -1}] = struct{}{} + continue + } + n, err := strconv.ParseUint(code, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid message code: %s", code) + } + filters[MsgFilter{Proto: proto, Code: int64(n)}] = struct{}{} + } + } + return filters, nil +} + +// MsgFilters is a collection of filters which are used to filter message +// events +type MsgFilters map[MsgFilter]struct{} + +// Match checks if the given message matches any of the filters +func (m MsgFilters) Match(msg *Msg) bool { + // check if there is a wildcard filter for the message's protocol + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: -1}]; ok { + return true + } + + // check if there is a filter for the message's protocol and code + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: int64(msg.Code)}]; ok { + return true + } + + return false +} + +// MsgFilter is used to filter message events based on protocol and message +// code +type MsgFilter struct { + // Proto is matched against a message's protocol + Proto string + + // Code is matched against a message's code, with -1 matching all codes + Code int64 +} + +// CreateSnapshot creates a network snapshot +func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) { + snap, err := s.network.Snapshot() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, snap) +} + +// LoadSnapshot loads a snapshot into the network +func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { + snap := &Snapshot{} + if err := json.NewDecoder(req.Body).Decode(snap); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := s.network.Load(snap); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, s.network) +} + +// CreateNode creates a node in the network using the given configuration +func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { + config := adapters.RandomNodeConfig() + err := json.NewDecoder(req.Body).Decode(config) + if err != nil && err != io.EOF { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + node, err := s.network.NewNodeWithConfig(config) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusCreated, node.NodeInfo()) +} + +// GetNodes returns all nodes which exist in the network +func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { + nodes := s.network.GetNodes() + + infos := make([]*p2p.NodeInfo, len(nodes)) + for i, node := range nodes { + infos[i] = node.NodeInfo() + } + + s.JSON(w, http.StatusOK, infos) +} + +// GetNode returns details of a node +func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StartNode starts a node +func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Start(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StopNode stops a node +func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Stop(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// ConnectNode connects a node to a peer node +func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Connect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// DisconnectNode disconnects a node from a peer node +func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Disconnect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// Options responds to the OPTIONS HTTP method by returning a 200 OK response +// with the "Access-Control-Allow-Headers" header set to "Content-Type" +func (s *Server) Options(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(http.StatusOK) +} + +// NodeRPC forwards RPC requests to a node in the network via a WebSocket +// connection +func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + handler := func(conn *websocket.Conn) { + node.ServeRPC(conn) + } + + websocket.Server{Handler: handler}.ServeHTTP(w, req) +} + +// ServeHTTP implements the http.Handler interface by delegating to the +// underlying httprouter.Router +func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) +} + +// GET registers a handler for GET requests to a particular path +func (s *Server) GET(path string, handle http.HandlerFunc) { + s.router.GET(path, s.wrapHandler(handle)) +} + +// POST registers a handler for POST requests to a particular path +func (s *Server) POST(path string, handle http.HandlerFunc) { + s.router.POST(path, s.wrapHandler(handle)) +} + +// DELETE registers a handler for DELETE requests to a particular path +func (s *Server) DELETE(path string, handle http.HandlerFunc) { + s.router.DELETE(path, s.wrapHandler(handle)) +} + +// OPTIONS registers a handler for OPTIONS requests to a particular path +func (s *Server) OPTIONS(path string, handle http.HandlerFunc) { + s.router.OPTIONS("/*path", s.wrapHandler(handle)) +} + +// JSON sends "data" as a JSON HTTP response +func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) +} + +// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by +// populating request.Context with any objects from the URL params +func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + + ctx := context.Background() + + if id := params.ByName("nodeid"); id != "" { + var node *Node + if nodeID, err := discover.HexID(id); err == nil { + node = s.network.GetNode(nodeID) + } else { + node = s.network.GetNodeByName(id) + } + if node == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "node", node) + } + + if id := params.ByName("peerid"); id != "" { + var peer *Node + if peerID, err := discover.HexID(id); err == nil { + peer = s.network.GetNode(peerID) + } else { + peer = s.network.GetNodeByName(id) + } + if peer == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "peer", peer) + } + + handler(w, req.WithContext(ctx)) + } +} diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go new file mode 100644 index 000000000..677a8fb14 --- /dev/null +++ b/p2p/simulations/http_test.go @@ -0,0 +1,823 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "fmt" + "math/rand" + "net/http/httptest" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +// testService implements the node.Service interface and provides protocols +// and APIs which are useful for testing nodes in a simulation network +type testService struct { + id discover.NodeID + + // peerCount is incremented once a peer handshake has been performed + peerCount int64 + + peers map[discover.NodeID]*testPeer + peersMtx sync.Mutex + + // state stores []byte which is used to test creating and loading + // snapshots + state atomic.Value +} + +func newTestService(ctx *adapters.ServiceContext) (node.Service, error) { + svc := &testService{ + id: ctx.Config.ID, + peers: make(map[discover.NodeID]*testPeer), + } + svc.state.Store(ctx.Snapshot) + return svc, nil +} + +type testPeer struct { + testReady chan struct{} + dumReady chan struct{} +} + +func (t *testService) peer(id discover.NodeID) *testPeer { + t.peersMtx.Lock() + defer t.peersMtx.Unlock() + if peer, ok := t.peers[id]; ok { + return peer + } + peer := &testPeer{ + testReady: make(chan struct{}), + dumReady: make(chan struct{}), + } + t.peers[id] = peer + return peer +} + +func (t *testService) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: "test", + Version: 1, + Length: 3, + Run: t.RunTest, + }, + { + Name: "dum", + Version: 1, + Length: 1, + Run: t.RunDum, + }, + { + Name: "prb", + Version: 1, + Length: 1, + Run: t.RunPrb, + }, + } +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{ + state: &t.state, + peerCount: &t.peerCount, + }, + }} +} + +func (t *testService) Start(server *p2p.Server) error { + return nil +} + +func (t *testService) Stop() error { + return nil +} + +// handshake performs a peer handshake by sending and expecting an empty +// message with the given code +func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error { + errc := make(chan error, 2) + go func() { errc <- p2p.Send(rw, code, struct{}{}) }() + go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }() + for i := 0; i < 2; i++ { + if err := <-errc; err != nil { + return err + } + } + return nil +} + +func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // perform three handshakes with three different message codes, + // used to test message sending and filtering + if err := t.handshake(rw, 2); err != nil { + return err + } + if err := t.handshake(rw, 1); err != nil { + return err + } + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the testReady channel so that other protocols can run + close(peer.testReady) + + // track the peer + atomic.AddInt64(&t.peerCount, 1) + defer atomic.AddInt64(&t.peerCount, -1) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the test protocol to perform its handshake + <-peer.testReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the dumReady channel so that other protocols can run + close(peer.dumReady) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} +func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the dum protocol to perform its handshake + <-peer.dumReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) Snapshot() ([]byte, error) { + return t.state.Load().([]byte), nil +} + +// TestAPI provides a test API to: +// * get the peer count +// * get and set an arbitrary state byte slice +// * get and increment a counter +// * subscribe to counter increment events +type TestAPI struct { + state *atomic.Value + peerCount *int64 + counter int64 + feed event.Feed +} + +func (t *TestAPI) PeerCount() int64 { + return atomic.LoadInt64(t.peerCount) +} + +func (t *TestAPI) Get() int64 { + return atomic.LoadInt64(&t.counter) +} + +func (t *TestAPI) Add(delta int64) { + atomic.AddInt64(&t.counter, delta) + t.feed.Send(delta) +} + +func (t *TestAPI) GetState() []byte { + return t.state.Load().([]byte) +} + +func (t *TestAPI) SetState(state []byte) { + t.state.Store(state) +} + +func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan int64) + sub := t.feed.Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + +var testServices = adapters.Services{ + "test": newTestService, +} + +func testHTTPServer(t *testing.T) (*Network, *httptest.Server) { + adapter := adapters.NewSimAdapter(testServices) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + return network, httptest.NewServer(NewServer(network)) +} + +// TestHTTPNetwork tests interacting with a simulation network using the HTTP +// API +func TestHTTPNetwork(t *testing.T) { + // start the server + network, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events so we can check them later + client := NewClient(s.URL) + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // check we can retrieve details about the network + gotNetwork, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if gotNetwork.ID != network.ID { + t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID) + } + + // start a simulation network + nodeIDs := startTestNetwork(t, client) + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], false), + x.nodeEvent(nodeIDs[1], false), + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], false), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) + + // reconnect the stream and check we get the current nodes and conns + events = make(chan *Event, 100) + opts.Current = true + sub, err = client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + x = &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) +} + +func startTestNetwork(t *testing.T, client *Client) []string { + // create two nodes + nodeCount := 2 + nodeIDs := make([]string, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + nodeIDs[i] = node.ID + } + + // check both nodes exist + nodes, err := client.GetNodes() + if err != nil { + t.Fatalf("error getting nodes: %s", err) + } + if len(nodes) != nodeCount { + t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes)) + } + for i, nodeID := range nodeIDs { + if nodes[i].ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) + } + node, err := client.GetNode(nodeID) + if err != nil { + t.Fatalf("error getting node %d: %s", i, err) + } + if node.ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID) + } + } + + // start both nodes + for _, nodeID := range nodeIDs { + if err := client.StartNode(nodeID); err != nil { + t.Fatalf("error starting node %q: %s", nodeID, err) + } + } + + // connect the nodes + for i := 0; i < nodeCount-1; i++ { + peerId := i + 1 + if i == nodeCount-1 { + peerId = 0 + } + if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + } + + return nodeIDs +} + +type expectEvents struct { + *testing.T + + events chan *Event + sub event.Subscription +} + +func (t *expectEvents) nodeEvent(id string, up bool) *Event { + return &Event{ + Type: EventTypeNode, + Node: &Node{ + Config: &adapters.NodeConfig{ + ID: discover.MustHexID(id), + }, + Up: up, + }, + } +} + +func (t *expectEvents) connEvent(one, other string, up bool) *Event { + return &Event{ + Type: EventTypeConn, + Conn: &Conn{ + One: discover.MustHexID(one), + Other: discover.MustHexID(other), + Up: up, + }, + } +} + +func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) { + actual := make(map[MsgFilter]int) + timeout := time.After(10 * time.Second) +loop: + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + if event.Type != EventTypeMsg || event.Msg.Received { + continue loop + } + if event.Msg == nil { + t.Fatal("expected event.Msg to be set") + } + filter := MsgFilter{ + Proto: event.Msg.Protocol, + Code: int64(event.Msg.Code), + } + actual[filter]++ + if actual[filter] > expected[filter] { + t.Fatalf("received too many msgs for filter: %v", filter) + } + if reflect.DeepEqual(actual, expected) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +func (t *expectEvents) expect(events ...*Event) { + timeout := time.After(10 * time.Second) + i := 0 + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + expected := events[i] + if event.Type != expected.Type { + t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type) + } + + switch expected.Type { + + case EventTypeNode: + if event.Node == nil { + t.Fatal("expected event.Node to be set") + } + if event.Node.ID() != expected.Node.ID() { + t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString()) + } + if event.Node.Up != expected.Node.Up { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + } + + case EventTypeConn: + if event.Conn == nil { + t.Fatal("expected event.Conn to be set") + } + if event.Conn.One != expected.Conn.One { + t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString()) + } + if event.Conn.Other != expected.Conn.Other { + t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString()) + } + if event.Conn.Up != expected.Conn.Up { + t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up) + } + + } + + i++ + if i == len(events) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API +func TestHTTPNodeRPC(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // start a node in the network + client := NewClient(s.URL) + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + + // create two RPC clients + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rpcClient1, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + rpcClient2, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + + // subscribe to events using client 1 + events := make(chan int64, 1) + sub, err := rpcClient1.Subscribe(ctx, "test", events, "events") + if err != nil { + t.Fatalf("error subscribing to events: %s", err) + } + defer sub.Unsubscribe() + + // call some RPC methods using client 2 + if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + var result int64 + if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + if result != 10 { + t.Fatalf("expected result to be 10, got %d", result) + } + + // check we got an event from client 1 + select { + case event := <-events: + if event != 10 { + t.Fatalf("expected event to be 10, got %d", event) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +// TestHTTPSnapshot tests creating and loading network snapshots +func TestHTTPSnapshot(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // create a two-node network + client := NewClient(s.URL) + nodeCount := 2 + nodes := make([]*p2p.NodeInfo, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + nodes[i] = node + } + if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + + // store some state in the test services + states := make([]string, nodeCount) + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + state := fmt.Sprintf("%x", rand.Int()) + if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil { + t.Fatalf("error setting service state: %s", err) + } + states[i] = state + } + + // create a snapshot + snap, err := client.CreateSnapshot() + if err != nil { + t.Fatalf("error creating snapshot: %s", err) + } + for i, state := range states { + gotState := snap.Nodes[i].Snapshots["test"] + if string(gotState) != state { + t.Fatalf("expected snapshot state %q, got %q", state, gotState) + } + } + + // create another network + _, s = testHTTPServer(t) + defer s.Close() + client = NewClient(s.URL) + + // subscribe to events so we can check them later + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // load the snapshot + if err := client.LoadSnapshot(snap); err != nil { + t.Fatalf("error loading snapshot: %s", err) + } + + // check the nodes and connection exists + net, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if len(net.Nodes) != nodeCount { + t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes)) + } + for i, node := range nodes { + id := net.Nodes[i].ID().String() + if id != node.ID { + t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id) + } + } + if len(net.Conns) != 1 { + t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns)) + } + conn := net.Conns[0] + if conn.One.String() != nodes[0].ID { + t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One) + } + if conn.Other.String() != nodes[1].ID { + t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other) + } + + // check the node states were restored + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + var state []byte + if err := rpc.Call(&state, "test_getState"); err != nil { + t.Fatalf("error getting service state: %s", err) + } + if string(state) != states[i] { + t.Fatalf("expected snapshot state %q, got %q", states[i], state) + } + } + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodes[0].ID, false), + x.nodeEvent(nodes[0].ID, true), + x.nodeEvent(nodes[1].ID, false), + x.nodeEvent(nodes[1].ID, true), + x.connEvent(nodes[0].ID, nodes[1].ID, false), + x.connEvent(nodes[0].ID, nodes[1].ID, true), + ) +} + +// TestMsgFilterPassMultiple tests streaming message events using a filter +// with multiple protocols +func TestMsgFilterPassMultiple(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0-test:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassWildcard tests streaming message events using a filter +// with a code wildcard +func TestMsgFilterPassWildcard(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0,2-test:*", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 2}: 2, + {"test", 1}: 2, + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using a filter +// with a single protocol and code +func TestMsgFilterPassSingle(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "dum:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"dum", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using an invalid +// filter +func TestMsgFilterFailBadParams(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "foo:", + } + _, err := client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "bzz:aa" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "invalid" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go new file mode 100644 index 000000000..06890ffcf --- /dev/null +++ b/p2p/simulations/network.go @@ -0,0 +1,680 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" +) + +// NetworkConfig defines configuration options for starting a Network +type NetworkConfig struct { + ID string `json:"id"` + DefaultService string `json:"default_service,omitempty"` +} + +// Network models a p2p simulation network which consists of a collection of +// simulated nodes and the connections which exist between them. +// +// The Network has a single NodeAdapter which is responsible for actually +// starting nodes and connecting them together. +// +// The Network emits events when nodes are started and stopped, when they are +// connected and disconnected, and also when messages are sent between nodes. +type Network struct { + NetworkConfig + + Nodes []*Node `json:"nodes"` + nodeMap map[discover.NodeID]int + + Conns []*Conn `json:"conns"` + connMap map[string]int + + nodeAdapter adapters.NodeAdapter + events event.Feed + lock sync.RWMutex + quitc chan struct{} +} + +// NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig +func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network { + return &Network{ + NetworkConfig: *conf, + nodeAdapter: nodeAdapter, + nodeMap: make(map[discover.NodeID]int), + connMap: make(map[string]int), + quitc: make(chan struct{}), + } +} + +// Events returns the output event feed of the Network. +func (self *Network) Events() *event.Feed { + return &self.events +} + +// NewNode adds a new node to the network with a random ID +func (self *Network) NewNode() (*Node, error) { + conf := adapters.RandomNodeConfig() + conf.Services = []string{self.DefaultService} + return self.NewNodeWithConfig(conf) +} + +// NewNodeWithConfig adds a new node to the network with the given config, +// returning an error if a node with the same ID or name already exists +func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { + self.lock.Lock() + defer self.lock.Unlock() + + // create a random ID and PrivateKey if not set + if conf.ID == (discover.NodeID{}) { + c := adapters.RandomNodeConfig() + conf.ID = c.ID + conf.PrivateKey = c.PrivateKey + } + id := conf.ID + + // assign a name to the node if not set + if conf.Name == "" { + conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1) + } + + // check the node doesn't already exist + if node := self.getNode(id); node != nil { + return nil, fmt.Errorf("node with ID %q already exists", id) + } + if node := self.getNodeByName(conf.Name); node != nil { + return nil, fmt.Errorf("node with name %q already exists", conf.Name) + } + + // if no services are configured, use the default service + if len(conf.Services) == 0 { + conf.Services = []string{self.DefaultService} + } + + // use the NodeAdapter to create the node + adapterNode, err := self.nodeAdapter.NewNode(conf) + if err != nil { + return nil, err + } + node := &Node{ + Node: adapterNode, + Config: conf, + } + log.Trace(fmt.Sprintf("node %v created", id)) + self.nodeMap[id] = len(self.Nodes) + self.Nodes = append(self.Nodes, node) + + // emit a "control" event + self.events.Send(ControlEvent(node)) + + return node, nil +} + +// Config returns the network configuration +func (self *Network) Config() *NetworkConfig { + return &self.NetworkConfig +} + +// StartAll starts all nodes in the network +func (self *Network) StartAll() error { + for _, node := range self.Nodes { + if node.Up { + continue + } + if err := self.Start(node.ID()); err != nil { + return err + } + } + return nil +} + +// StopAll stops all nodes in the network +func (self *Network) StopAll() error { + for _, node := range self.Nodes { + if !node.Up { + continue + } + if err := self.Stop(node.ID()); err != nil { + return err + } + } + return nil +} + +// Start starts the node with the given ID +func (self *Network) Start(id discover.NodeID) error { + return self.startWithSnapshots(id, nil) +} + +// startWithSnapshots starts the node with the given ID using the give +// snapshots +func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { + node := self.GetNode(id) + if node == nil { + return fmt.Errorf("node %v does not exist", id) + } + if node.Up { + return fmt.Errorf("node %v already up", id) + } + log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, self.nodeAdapter.Name())) + if err := node.Start(snapshots); err != nil { + log.Warn(fmt.Sprintf("start up failed: %v", err)) + return err + } + node.Up = true + log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) + + self.events.Send(NewEvent(node)) + + // subscribe to peer events + client, err := node.Client() + if err != nil { + return fmt.Errorf("error getting rpc client for node %v: %s", id, err) + } + events := make(chan *p2p.PeerEvent) + sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") + if err != nil { + return fmt.Errorf("error getting peer events for node %v: %s", id, err) + } + go self.watchPeerEvents(id, events, sub) + return nil +} + +// watchPeerEvents reads peer events from the given channel and emits +// corresponding network events +func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) { + defer func() { + sub.Unsubscribe() + + // assume the node is now down + self.lock.Lock() + node := self.getNode(id) + node.Up = false + self.lock.Unlock() + self.events.Send(NewEvent(node)) + }() + for { + select { + case event, ok := <-events: + if !ok { + return + } + peer := event.Peer + switch event.Type { + + case p2p.PeerEventTypeAdd: + self.DidConnect(id, peer) + + case p2p.PeerEventTypeDrop: + self.DidDisconnect(id, peer) + + case p2p.PeerEventTypeMsgSend: + self.DidSend(id, peer, event.Protocol, *event.MsgCode) + + case p2p.PeerEventTypeMsgRecv: + self.DidReceive(peer, id, event.Protocol, *event.MsgCode) + + } + + case err := <-sub.Err(): + if err != nil { + log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err) + } + return + } + } +} + +// Stop stops the node with the given ID +func (self *Network) Stop(id discover.NodeID) error { + node := self.GetNode(id) + if node == nil { + return fmt.Errorf("node %v does not exist", id) + } + if !node.Up { + return fmt.Errorf("node %v already down", id) + } + if err := node.Stop(); err != nil { + return err + } + node.Up = false + log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) + + self.events.Send(ControlEvent(node)) + return nil +} + +// Connect connects two nodes together by calling the "admin_addPeer" RPC +// method on the "one" node so that it connects to the "other" node +func (self *Network) Connect(oneID, otherID discover.NodeID) error { + log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) + conn, err := self.GetOrCreateConn(oneID, otherID) + if err != nil { + return err + } + if conn.Up { + return fmt.Errorf("%v and %v already connected", oneID, otherID) + } + if err := conn.nodesUp(); err != nil { + return err + } + client, err := conn.one.Client() + if err != nil { + return err + } + self.events.Send(ControlEvent(conn)) + return client.Call(nil, "admin_addPeer", string(conn.other.Addr())) +} + +// Disconnect disconnects two nodes by calling the "admin_removePeer" RPC +// method on the "one" node so that it disconnects from the "other" node +func (self *Network) Disconnect(oneID, otherID discover.NodeID) error { + conn := self.GetConn(oneID, otherID) + if conn == nil { + return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID) + } + if !conn.Up { + return fmt.Errorf("%v and %v already disconnected", oneID, otherID) + } + client, err := conn.one.Client() + if err != nil { + return err + } + self.events.Send(ControlEvent(conn)) + return client.Call(nil, "admin_removePeer", string(conn.other.Addr())) +} + +// DidConnect tracks the fact that the "one" node connected to the "other" node +func (self *Network) DidConnect(one, other discover.NodeID) error { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { + return fmt.Errorf("connection between %v and %v does not exist", one, other) + } + if conn.Up { + return fmt.Errorf("%v and %v already connected", one, other) + } + conn.Up = true + self.events.Send(NewEvent(conn)) + return nil +} + +// DidDisconnect tracks the fact that the "one" node disconnected from the +// "other" node +func (self *Network) DidDisconnect(one, other discover.NodeID) error { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { + return fmt.Errorf("connection between %v and %v does not exist", one, other) + } + if !conn.Up { + return fmt.Errorf("%v and %v already disconnected", one, other) + } + conn.Up = false + self.events.Send(NewEvent(conn)) + return nil +} + +// DidSend tracks the fact that "sender" sent a message to "receiver" +func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error { + msg := &Msg{ + One: sender, + Other: receiver, + Protocol: proto, + Code: code, + Received: false, + } + self.events.Send(NewEvent(msg)) + return nil +} + +// DidReceive tracks the fact that "receiver" received a message from "sender" +func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error { + msg := &Msg{ + One: sender, + Other: receiver, + Protocol: proto, + Code: code, + Received: true, + } + self.events.Send(NewEvent(msg)) + return nil +} + +// GetNode gets the node with the given ID, returning nil if the node does not +// exist +func (self *Network) GetNode(id discover.NodeID) *Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.getNode(id) +} + +// GetNode gets the node with the given name, returning nil if the node does +// not exist +func (self *Network) GetNodeByName(name string) *Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.getNodeByName(name) +} + +func (self *Network) getNode(id discover.NodeID) *Node { + i, found := self.nodeMap[id] + if !found { + return nil + } + return self.Nodes[i] +} + +func (self *Network) getNodeByName(name string) *Node { + for _, node := range self.Nodes { + if node.Config.Name == name { + return node + } + } + return nil +} + +// GetNodes returns the existing nodes +func (self *Network) GetNodes() []*Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.Nodes +} + +// GetConn returns the connection which exists between "one" and "other" +// regardless of which node initiated the connection +func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { + self.lock.Lock() + defer self.lock.Unlock() + return self.getConn(oneID, otherID) +} + +// GetOrCreateConn is like GetConn but creates the connection if it doesn't +// already exist +func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { + self.lock.Lock() + defer self.lock.Unlock() + if conn := self.getConn(oneID, otherID); conn != nil { + return conn, nil + } + + one := self.getNode(oneID) + if one == nil { + return nil, fmt.Errorf("node %v does not exist", oneID) + } + other := self.getNode(otherID) + if other == nil { + return nil, fmt.Errorf("node %v does not exist", otherID) + } + conn := &Conn{ + One: oneID, + Other: otherID, + one: one, + other: other, + } + label := ConnLabel(oneID, otherID) + self.connMap[label] = len(self.Conns) + self.Conns = append(self.Conns, conn) + return conn, nil +} + +func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { + label := ConnLabel(oneID, otherID) + i, found := self.connMap[label] + if !found { + return nil + } + return self.Conns[i] +} + +// Shutdown stops all nodes in the network and closes the quit channel +func (self *Network) Shutdown() { + for _, node := range self.Nodes { + log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString())) + if err := node.Stop(); err != nil { + log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err) + } + } + close(self.quitc) +} + +// Node is a wrapper around adapters.Node which is used to track the status +// of a node in the network +type Node struct { + adapters.Node `json:"-"` + + // Config if the config used to created the node + Config *adapters.NodeConfig `json:"config"` + + // Up tracks whether or not the node is running + Up bool `json:"up"` +} + +// ID returns the ID of the node +func (self *Node) ID() discover.NodeID { + return self.Config.ID +} + +// String returns a log-friendly string +func (self *Node) String() string { + return fmt.Sprintf("Node %v", self.ID().TerminalString()) +} + +// NodeInfo returns information about the node +func (self *Node) NodeInfo() *p2p.NodeInfo { + // avoid a panic if the node is not started yet + if self.Node == nil { + return nil + } + info := self.Node.NodeInfo() + info.Name = self.Config.Name + return info +} + +// MarshalJSON implements the json.Marshaler interface so that the encoded +// JSON includes the NodeInfo +func (self *Node) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Info *p2p.NodeInfo `json:"info,omitempty"` + Config *adapters.NodeConfig `json:"config,omitempty"` + Up bool `json:"up"` + }{ + Info: self.NodeInfo(), + Config: self.Config, + Up: self.Up, + }) +} + +// Conn represents a connection between two nodes in the network +type Conn struct { + // One is the node which initiated the connection + One discover.NodeID `json:"one"` + + // Other is the node which the connection was made to + Other discover.NodeID `json:"other"` + + // Up tracks whether or not the connection is active + Up bool `json:"up"` + + one *Node + other *Node +} + +// nodesUp returns whether both nodes are currently up +func (self *Conn) nodesUp() error { + if !self.one.Up { + return fmt.Errorf("one %v is not up", self.One) + } + if !self.other.Up { + return fmt.Errorf("other %v is not up", self.Other) + } + return nil +} + +// String returns a log-friendly string +func (self *Conn) String() string { + return fmt.Sprintf("Conn %v->%v", self.One.TerminalString(), self.Other.TerminalString()) +} + +// Msg represents a p2p message sent between two nodes in the network +type Msg struct { + One discover.NodeID `json:"one"` + Other discover.NodeID `json:"other"` + Protocol string `json:"protocol"` + Code uint64 `json:"code"` + Received bool `json:"received"` +} + +// String returns a log-friendly string +func (self *Msg) String() string { + return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.TerminalString(), self.Other.TerminalString()) +} + +// ConnLabel generates a deterministic string which represents a connection +// between two nodes, used to compare if two connections are between the same +// nodes +func ConnLabel(source, target discover.NodeID) string { + var first, second discover.NodeID + if bytes.Compare(source.Bytes(), target.Bytes()) > 0 { + first = target + second = source + } else { + first = source + second = target + } + return fmt.Sprintf("%v-%v", first, second) +} + +// Snapshot represents the state of a network at a single point in time and can +// be used to restore the state of a network +type Snapshot struct { + Nodes []NodeSnapshot `json:"nodes,omitempty"` + Conns []Conn `json:"conns,omitempty"` +} + +// NodeSnapshot represents the state of a node in the network +type NodeSnapshot struct { + Node Node `json:"node,omitempty"` + + // Snapshots is arbitrary data gathered from calling node.Snapshots() + Snapshots map[string][]byte `json:"snapshots,omitempty"` +} + +// Snapshot creates a network snapshot +func (self *Network) Snapshot() (*Snapshot, error) { + self.lock.Lock() + defer self.lock.Unlock() + snap := &Snapshot{ + Nodes: make([]NodeSnapshot, len(self.Nodes)), + Conns: make([]Conn, len(self.Conns)), + } + for i, node := range self.Nodes { + snap.Nodes[i] = NodeSnapshot{Node: *node} + if !node.Up { + continue + } + snapshots, err := node.Snapshots() + if err != nil { + return nil, err + } + snap.Nodes[i].Snapshots = snapshots + } + for i, conn := range self.Conns { + snap.Conns[i] = *conn + } + return snap, nil +} + +// Load loads a network snapshot +func (self *Network) Load(snap *Snapshot) error { + for _, n := range snap.Nodes { + if _, err := self.NewNodeWithConfig(n.Node.Config); err != nil { + return err + } + if !n.Node.Up { + continue + } + if err := self.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { + return err + } + } + for _, conn := range snap.Conns { + if err := self.Connect(conn.One, conn.Other); err != nil { + return err + } + } + return nil +} + +// Subscribe reads control events from a channel and executes them +func (self *Network) Subscribe(events chan *Event) { + for { + select { + case event, ok := <-events: + if !ok { + return + } + if event.Control { + self.executeControlEvent(event) + } + case <-self.quitc: + return + } + } +} + +func (self *Network) executeControlEvent(event *Event) { + log.Trace("execute control event", "type", event.Type, "event", event) + switch event.Type { + case EventTypeNode: + if err := self.executeNodeEvent(event); err != nil { + log.Error("error executing node event", "event", event, "err", err) + } + case EventTypeConn: + if err := self.executeConnEvent(event); err != nil { + log.Error("error executing conn event", "event", event, "err", err) + } + case EventTypeMsg: + log.Warn("ignoring control msg event") + } +} + +func (self *Network) executeNodeEvent(e *Event) error { + if !e.Node.Up { + return self.Stop(e.Node.ID()) + } + + if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil { + return err + } + return self.Start(e.Node.ID()) +} + +func (self *Network) executeConnEvent(e *Event) error { + if e.Conn.Up { + return self.Connect(e.Conn.One, e.Conn.Other) + } else { + return self.Disconnect(e.Conn.One, e.Conn.Other) + } +} diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go new file mode 100644 index 000000000..2a062121b --- /dev/null +++ b/p2p/simulations/network_test.go @@ -0,0 +1,159 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" +) + +// TestNetworkSimulation creates a multi-node simulation network with each node +// connected in a ring topology, checks that all nodes successfully handshake +// with each other and that a snapshot fully represents the desired topology +func TestNetworkSimulation(t *testing.T) { + // create simulation network with 20 testService nodes + adapter := adapters.NewSimAdapter(adapters.Services{ + "test": newTestService, + }) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + defer network.Shutdown() + nodeCount := 20 + ids := make([]discover.NodeID, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := network.NewNode() + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := network.Start(node.ID()); err != nil { + t.Fatalf("error starting node: %s", err) + } + ids[i] = node.ID() + } + + // perform a check which connects the nodes in a ring (so each node is + // connected to exactly two peers) and then checks that all nodes + // performed two handshakes by checking their peerCount + action := func(_ context.Context) error { + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := network.Connect(id, peerID); err != nil { + return err + } + } + return nil + } + check := func(ctx context.Context, id discover.NodeID) (bool, error) { + // check we haven't run out of time + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + // get the node + node := network.GetNode(id) + if node == nil { + return false, fmt.Errorf("unknown node: %s", id) + } + + // check it has exactly two peers + client, err := node.Client() + if err != nil { + return false, err + } + var peerCount int64 + if err := client.CallContext(ctx, &peerCount, "test_peerCount"); err != nil { + return false, err + } + switch { + case peerCount < 2: + return false, nil + case peerCount == 2: + return true, nil + default: + return false, fmt.Errorf("unexpected peerCount: %d", peerCount) + } + } + + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // trigger a check every 100ms + trigger := make(chan discover.NodeID) + go triggerChecks(ctx, ids, trigger, 100*time.Millisecond) + + result := NewSimulation(network).Run(ctx, &Step{ + Action: action, + Trigger: trigger, + Expect: &Expectation{ + Nodes: ids, + Check: check, + }, + }) + if result.Error != nil { + t.Fatalf("simulation failed: %s", result.Error) + } + + // take a network snapshot and check it contains the correct topology + snap, err := network.Snapshot() + if err != nil { + t.Fatal(err) + } + if len(snap.Nodes) != nodeCount { + t.Fatalf("expected snapshot to contain %d nodes, got %d", nodeCount, len(snap.Nodes)) + } + if len(snap.Conns) != nodeCount { + t.Fatalf("expected snapshot to contain %d connections, got %d", nodeCount, len(snap.Conns)) + } + for i, id := range ids { + conn := snap.Conns[i] + if conn.One != id { + t.Fatalf("expected conn[%d].One to be %s, got %s", i, id, conn.One) + } + peerID := ids[(i+1)%len(ids)] + if conn.Other != peerID { + t.Fatalf("expected conn[%d].Other to be %s, got %s", i, peerID, conn.Other) + } + } +} + +func triggerChecks(ctx context.Context, ids []discover.NodeID, trigger chan discover.NodeID, interval time.Duration) { + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-tick.C: + for _, id := range ids { + select { + case trigger <- id: + case <-ctx.Done(): + return + } + } + case <-ctx.Done(): + return + } + } +} diff --git a/p2p/simulations/simulation.go b/p2p/simulations/simulation.go new file mode 100644 index 000000000..28886e924 --- /dev/null +++ b/p2p/simulations/simulation.go @@ -0,0 +1,157 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// Simulation provides a framework for running actions in a simulated network +// and then waiting for expectations to be met +type Simulation struct { + network *Network +} + +// NewSimulation returns a new simulation which runs in the given network +func NewSimulation(network *Network) *Simulation { + return &Simulation{ + network: network, + } +} + +// Run performs a step of the simulation by performing the step's action and +// then waiting for the step's expectation to be met +func (s *Simulation) Run(ctx context.Context, step *Step) (result *StepResult) { + result = newStepResult() + + result.StartedAt = time.Now() + defer func() { result.FinishedAt = time.Now() }() + + // watch network events for the duration of the step + stop := s.watchNetwork(result) + defer stop() + + // perform the action + if err := step.Action(ctx); err != nil { + result.Error = err + return + } + + // wait for all node expectations to either pass, error or timeout + nodes := make(map[discover.NodeID]struct{}, len(step.Expect.Nodes)) + for _, id := range step.Expect.Nodes { + nodes[id] = struct{}{} + } + for len(result.Passes) < len(nodes) { + select { + case id := <-step.Trigger: + // skip if we aren't checking the node + if _, ok := nodes[id]; !ok { + continue + } + + // skip if the node has already passed + if _, ok := result.Passes[id]; ok { + continue + } + + // run the node expectation check + pass, err := step.Expect.Check(ctx, id) + if err != nil { + result.Error = err + return + } + if pass { + result.Passes[id] = time.Now() + } + case <-ctx.Done(): + result.Error = ctx.Err() + return + } + } + + return +} + +func (s *Simulation) watchNetwork(result *StepResult) func() { + stop := make(chan struct{}) + done := make(chan struct{}) + events := make(chan *Event) + sub := s.network.Events().Subscribe(events) + go func() { + defer close(done) + defer sub.Unsubscribe() + for { + select { + case event := <-events: + result.NetworkEvents = append(result.NetworkEvents, event) + case <-stop: + return + } + } + }() + return func() { + close(stop) + <-done + } +} + +type Step struct { + // Action is the action to perform for this step + Action func(context.Context) error + + // Trigger is a channel which receives node ids and triggers an + // expectation check for that node + Trigger chan discover.NodeID + + // Expect is the expectation to wait for when performing this step + Expect *Expectation +} + +type Expectation struct { + // Nodes is a list of nodes to check + Nodes []discover.NodeID + + // Check checks whether a given node meets the expectation + Check func(context.Context, discover.NodeID) (bool, error) +} + +func newStepResult() *StepResult { + return &StepResult{ + Passes: make(map[discover.NodeID]time.Time), + } +} + +type StepResult struct { + // Error is the error encountered whilst running the step + Error error + + // StartedAt is the time the step started + StartedAt time.Time + + // FinishedAt is the time the step finished + FinishedAt time.Time + + // Passes are the timestamps of the successful node expectations + Passes map[discover.NodeID]time.Time + + // NetworkEvents are the network events which occurred during the step + NetworkEvents []*Event +} |