diff options
author | Lewis Marshall <lewis@lmars.net> | 2017-09-25 16:08:07 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-09-25 16:08:07 +0800 |
commit | 9feec51e2dd754819e5c730ac5985d28d57adb48 (patch) | |
tree | 32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /p2p | |
parent | 673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff) | |
download | go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.zip |
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which
can be used to run simulated networks of devp2p nodes. The
intention is to use this for testing protocols, performing
benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'p2p')
-rw-r--r-- | p2p/dial.go | 23 | ||||
-rw-r--r-- | p2p/dial_test.go | 2 | ||||
-rw-r--r-- | p2p/discover/node.go | 40 | ||||
-rw-r--r-- | p2p/discover/node_test.go | 30 | ||||
-rw-r--r-- | p2p/message.go | 66 | ||||
-rw-r--r-- | p2p/peer.go | 42 | ||||
-rw-r--r-- | p2p/server.go | 56 | ||||
-rw-r--r-- | p2p/server_test.go | 2 | ||||
-rw-r--r-- | p2p/simulations/README.md | 181 | ||||
-rw-r--r-- | p2p/simulations/adapters/docker.go | 182 | ||||
-rw-r--r-- | p2p/simulations/adapters/exec.go | 504 | ||||
-rw-r--r-- | p2p/simulations/adapters/inproc.go | 314 | ||||
-rw-r--r-- | p2p/simulations/adapters/types.go | 215 | ||||
-rw-r--r-- | p2p/simulations/events.go | 108 | ||||
-rw-r--r-- | p2p/simulations/examples/README.md | 39 | ||||
-rw-r--r-- | p2p/simulations/examples/ping-pong.go | 184 | ||||
-rwxr-xr-x | p2p/simulations/examples/ping-pong.sh | 40 | ||||
-rw-r--r-- | p2p/simulations/http.go | 680 | ||||
-rw-r--r-- | p2p/simulations/http_test.go | 823 | ||||
-rw-r--r-- | p2p/simulations/network.go | 680 | ||||
-rw-r--r-- | p2p/simulations/network_test.go | 159 | ||||
-rw-r--r-- | p2p/simulations/simulation.go | 157 |
22 files changed, 4513 insertions, 14 deletions
diff --git a/p2p/dial.go b/p2p/dial.go index b77971396..2d9e3a0ed 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -47,6 +47,24 @@ const ( maxResolveDelay = time.Hour ) +// NodeDialer is used to connect to nodes in the network, typically by using +// an underlying net.Dialer but also using net.Pipe in tests +type NodeDialer interface { + Dial(*discover.Node) (net.Conn, error) +} + +// TCPDialer implements the NodeDialer interface by using a net.Dialer to +// create TCP connections to nodes in the network +type TCPDialer struct { + *net.Dialer +} + +// Dial creates a TCP connection to the node +func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) { + addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} + return t.Dialer.Dial("tcp", addr.String()) +} + // dialstate schedules dials and discovery lookups. // it get's a chance to compute new tasks on every iteration // of the main loop in Server.run. @@ -318,14 +336,13 @@ func (t *dialTask) resolve(srv *Server) bool { // dial performs the actual connection attempt. func (t *dialTask) dial(srv *Server, dest *discover.Node) bool { - addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)} - fd, err := srv.Dialer.Dial("tcp", addr.String()) + fd, err := srv.Dialer.Dial(dest) if err != nil { log.Trace("Dial error", "task", t, "err", err) return false } mfd := newMeteredConn(fd, false) - srv.setupConn(mfd, t.flags, dest) + srv.SetupConn(mfd, t.flags, dest) return true } diff --git a/p2p/dial_test.go b/p2p/dial_test.go index 08e863bae..ad18ef9ab 100644 --- a/p2p/dial_test.go +++ b/p2p/dial_test.go @@ -597,7 +597,7 @@ func TestDialResolve(t *testing.T) { } // Now run the task, it should resolve the ID once. - config := Config{Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}} + config := Config{Dialer: TCPDialer{&net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}} srv := &Server{ntab: table, Config: config} tasks[0].Do(srv) if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) { diff --git a/p2p/discover/node.go b/p2p/discover/node.go index d9cbd9448..fc928a91a 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -225,6 +225,11 @@ func (n *Node) UnmarshalText(text []byte) error { // The node identifier is a marshaled elliptic curve public key. type NodeID [NodeIDBits / 8]byte +// Bytes returns a byte slice representation of the NodeID +func (n NodeID) Bytes() []byte { + return n[:] +} + // NodeID prints as a long hexadecimal number. func (n NodeID) String() string { return fmt.Sprintf("%x", n[:]) @@ -240,6 +245,41 @@ func (n NodeID) TerminalString() string { return hex.EncodeToString(n[:8]) } +// MarshalText implements the encoding.TextMarshaler interface. +func (n NodeID) MarshalText() ([]byte, error) { + return []byte(hex.EncodeToString(n[:])), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +func (n *NodeID) UnmarshalText(text []byte) error { + id, err := HexID(string(text)) + if err != nil { + return err + } + *n = id + return nil +} + +// BytesID converts a byte slice to a NodeID +func BytesID(b []byte) (NodeID, error) { + var id NodeID + if len(b) != len(id) { + return id, fmt.Errorf("wrong length, want %d bytes", len(id)) + } + copy(id[:], b) + return id, nil +} + +// MustBytesID converts a byte slice to a NodeID. +// It panics if the byte slice is not a valid NodeID. +func MustBytesID(b []byte) NodeID { + id, err := BytesID(b) + if err != nil { + panic(err) + } + return id +} + // HexID converts a hex string to a NodeID. // The string may be prefixed with 0x. func HexID(in string) (NodeID, error) { diff --git a/p2p/discover/node_test.go b/p2p/discover/node_test.go index 3d1662d0b..ed8db4dc6 100644 --- a/p2p/discover/node_test.go +++ b/p2p/discover/node_test.go @@ -17,6 +17,7 @@ package discover import ( + "bytes" "fmt" "math/big" "math/rand" @@ -192,6 +193,35 @@ func TestHexID(t *testing.T) { } } +func TestNodeID_textEncoding(t *testing.T) { + ref := NodeID{ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10, + 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20, + 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30, + 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x40, + 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x50, + 0x51, 0x52, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x60, + 0x61, 0x62, 0x63, 0x64, + } + hex := "01020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364" + + text, err := ref.MarshalText() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(text, []byte(hex)) { + t.Fatalf("text encoding did not match\nexpected: %s\ngot: %s", hex, text) + } + + id := new(NodeID) + if err := id.UnmarshalText(text); err != nil { + t.Fatal(err) + } + if *id != ref { + t.Fatalf("text decoding did not match\nexpected: %s\ngot: %s", ref, id) + } +} + func TestNodeID_recover(t *testing.T) { prv := newkey() hash := make([]byte, 32) diff --git a/p2p/message.go b/p2p/message.go index 1292d2121..5690494bf 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -27,6 +27,8 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" ) @@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error { } return nil } + +// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent +// or received +type msgEventer struct { + MsgReadWriter + + feed *event.Feed + peerID discover.NodeID + Protocol string +} + +// newMsgEventer returns a msgEventer which sends message events to the given +// feed +func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer { + return &msgEventer{ + MsgReadWriter: rw, + feed: feed, + peerID: peerID, + Protocol: proto, + } +} + +// ReadMsg reads a message from the underlying MsgReadWriter and emits a +// "message received" event +func (self *msgEventer) ReadMsg() (Msg, error) { + msg, err := self.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + self.feed.Send(&PeerEvent{ + Type: PeerEventTypeMsgRecv, + Peer: self.peerID, + Protocol: self.Protocol, + MsgCode: &msg.Code, + MsgSize: &msg.Size, + }) + return msg, nil +} + +// WriteMsg writes a message to the underlying MsgReadWriter and emits a +// "message sent" event +func (self *msgEventer) WriteMsg(msg Msg) error { + err := self.MsgReadWriter.WriteMsg(msg) + if err != nil { + return err + } + self.feed.Send(&PeerEvent{ + Type: PeerEventTypeMsgSend, + Peer: self.peerID, + Protocol: self.Protocol, + MsgCode: &msg.Code, + MsgSize: &msg.Size, + }) + return nil +} + +// Close closes the underlying MsgReadWriter if it implements the io.Closer +// interface +func (self *msgEventer) Close() error { + if v, ok := self.MsgReadWriter.(io.Closer); ok { + return v.Close() + } + return nil +} diff --git a/p2p/peer.go b/p2p/peer.go index fb4b39e95..ebf7490c6 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -25,6 +25,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/rlp" @@ -60,6 +61,38 @@ type protoHandshake struct { Rest []rlp.RawValue `rlp:"tail"` } +// PeerEventType is the type of peer events emitted by a p2p.Server +type PeerEventType string + +const ( + // PeerEventTypeAdd is the type of event emitted when a peer is added + // to a p2p.Server + PeerEventTypeAdd PeerEventType = "add" + + // PeerEventTypeDrop is the type of event emitted when a peer is + // dropped from a p2p.Server + PeerEventTypeDrop PeerEventType = "drop" + + // PeerEventTypeMsgSend is the type of event emitted when a + // message is successfully sent to a peer + PeerEventTypeMsgSend PeerEventType = "msgsend" + + // PeerEventTypeMsgRecv is the type of event emitted when a + // message is received from a peer + PeerEventTypeMsgRecv PeerEventType = "msgrecv" +) + +// PeerEvent is an event emitted when peers are either added or dropped from +// a p2p.Server or when a message is sent or received on a peer connection +type PeerEvent struct { + Type PeerEventType `json:"type"` + Peer discover.NodeID `json:"peer"` + Error string `json:"error,omitempty"` + Protocol string `json:"protocol,omitempty"` + MsgCode *uint64 `json:"msg_code,omitempty"` + MsgSize *uint32 `json:"msg_size,omitempty"` +} + // Peer represents a connected remote node. type Peer struct { rw *conn @@ -71,6 +104,9 @@ type Peer struct { protoErr chan error closed chan struct{} disc chan DiscReason + + // events receives message send / receive events if set + events *event.Feed } // NewPeer returns a peer for testing purposes. @@ -297,9 +333,13 @@ func (p *Peer) startProtocols(writeStart <-chan struct{}, writeErr chan<- error) proto.closed = p.closed proto.wstart = writeStart proto.werr = writeErr + var rw MsgReadWriter = proto + if p.events != nil { + rw = newMsgEventer(rw, p.events, p.ID(), proto.Name) + } p.log.Trace(fmt.Sprintf("Starting protocol %s/%d", proto.Name, proto.Version)) go func() { - err := proto.Run(p, proto) + err := proto.Run(p, rw) if err == nil { p.log.Trace(fmt.Sprintf("Protocol %s/%d returned", proto.Name, proto.Version)) err = errProtocolReturned diff --git a/p2p/server.go b/p2p/server.go index d7909d53a..d1d578401 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" @@ -130,10 +131,14 @@ type Config struct { // If Dialer is set to a non-nil value, the given Dialer // is used to dial outbound peer connections. - Dialer *net.Dialer `toml:"-"` + Dialer NodeDialer `toml:"-"` // If NoDial is true, the server will not dial any peers. NoDial bool `toml:",omitempty"` + + // If EnableMsgEvents is set then the server will emit PeerEvents + // whenever a message is sent to or received from a peer + EnableMsgEvents bool } // Server manages all peer connections. @@ -166,6 +171,7 @@ type Server struct { addpeer chan *conn delpeer chan peerDrop loopWG sync.WaitGroup // loop, listenLoop + peerFeed event.Feed } type peerOpFunc func(map[discover.NodeID]*Peer) @@ -191,7 +197,7 @@ type conn struct { fd net.Conn transport flags connFlag - cont chan error // The run loop uses cont to signal errors to setupConn. + cont chan error // The run loop uses cont to signal errors to SetupConn. id discover.NodeID // valid after the encryption handshake caps []Cap // valid after the protocol handshake name string // valid after the protocol handshake @@ -291,6 +297,11 @@ func (srv *Server) RemovePeer(node *discover.Node) { } } +// SubscribePeers subscribes the given channel to peer events +func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { + return srv.peerFeed.Subscribe(ch) +} + // Self returns the local node's endpoint information. func (srv *Server) Self() *discover.Node { srv.lock.Lock() @@ -358,7 +369,7 @@ func (srv *Server) Start() (err error) { srv.newTransport = newRLPX } if srv.Dialer == nil { - srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout} + srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}} } srv.quit = make(chan struct{}) srv.addpeer = make(chan *conn) @@ -536,7 +547,11 @@ running: c.flags |= trustedConn } // TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them. - c.cont <- srv.encHandshakeChecks(peers, c) + select { + case c.cont <- srv.encHandshakeChecks(peers, c): + case <-srv.quit: + break running + } case c := <-srv.addpeer: // At this point the connection is past the protocol handshake. // Its capabilities are known and the remote identity is verified. @@ -544,6 +559,11 @@ running: if err == nil { // The handshakes are done and it passed all checks. p := newPeer(c, srv.Protocols) + // If message events are enabled, pass the peerFeed + // to the peer + if srv.EnableMsgEvents { + p.events = &srv.peerFeed + } name := truncateName(c.name) log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1) peers[c.id] = p @@ -552,7 +572,11 @@ running: // The dialer logic relies on the assumption that // dial tasks complete after the peer has been added or // discarded. Unblock the task last. - c.cont <- err + select { + case c.cont <- err: + case <-srv.quit: + break running + } case pd := <-srv.delpeer: // A peer disconnected. d := common.PrettyDuration(mclock.Now() - pd.created) @@ -665,16 +689,16 @@ func (srv *Server) listenLoop() { // Spawn the handler. It will give the slot back when the connection // has been established. go func() { - srv.setupConn(fd, inboundConn, nil) + srv.SetupConn(fd, inboundConn, nil) slots <- struct{}{} }() } } -// setupConn runs the handshakes and attempts to add the connection +// SetupConn runs the handshakes and attempts to add the connection // as a peer. It returns when the connection has been added as a peer // or the handshakes have failed. -func (srv *Server) setupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { +func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) { // Prevent leftover pending conns from entering the handshake. srv.lock.Lock() running := srv.running @@ -755,7 +779,23 @@ func (srv *Server) runPeer(p *Peer) { if srv.newPeerHook != nil { srv.newPeerHook(p) } + + // broadcast peer add + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeAdd, + Peer: p.ID(), + }) + + // run the protocol remoteRequested, err := p.run() + + // broadcast peer drop + srv.peerFeed.Send(&PeerEvent{ + Type: PeerEventTypeDrop, + Peer: p.ID(), + Error: err.Error(), + }) + // Note: run waits for existing peers to be sent on srv.delpeer // before returning, so this send should not select on srv.quit. srv.delpeer <- peerDrop{p, err, remoteRequested} diff --git a/p2p/server_test.go b/p2p/server_test.go index 971faf002..11dd83e5d 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -435,7 +435,7 @@ func TestServerSetupConn(t *testing.T) { } } p1, _ := net.Pipe() - srv.setupConn(p1, test.flags, test.dialDest) + srv.SetupConn(p1, test.flags, test.dialDest) if !reflect.DeepEqual(test.tt.closeErr, test.wantCloseErr) { t.Errorf("test %d: close error mismatch: got %q, want %q", i, test.tt.closeErr, test.wantCloseErr) } diff --git a/p2p/simulations/README.md b/p2p/simulations/README.md new file mode 100644 index 000000000..d1f8649ea --- /dev/null +++ b/p2p/simulations/README.md @@ -0,0 +1,181 @@ +# devp2p Simulations + +The `p2p/simulations` package implements a simulation framework which supports +creating a collection of devp2p nodes, connecting them together to form a +simulation network, performing simulation actions in that network and then +extracting useful information. + +## Nodes + +Each node in a simulation network runs multiple services by wrapping a collection +of objects which implement the `node.Service` interface meaning they: + +* can be started and stopped +* run p2p protocols +* expose RPC APIs + +This means that any object which implements the `node.Service` interface can be +used to run a node in the simulation. + +## Services + +Before running a simulation, a set of service initializers must be registered +which can then be used to run nodes in the network. + +A service initializer is a function with the following signature: + +```go +func(ctx *adapters.ServiceContext) (node.Service, error) +``` + +These initializers should be registered by calling the `adapters.RegisterServices` +function in an `init()` hook: + +```go +func init() { + adapters.RegisterServices(adapters.Services{ + "service1": initService1, + "service2": initService2, + }) +} +``` + +## Node Adapters + +The simulation framework includes multiple "node adapters" which are +responsible for creating an environment in which a node runs. + +### SimAdapter + +The `SimAdapter` runs nodes in-memory, connecting them using an in-memory, +synchronous `net.Pipe` and connecting to their RPC server using an in-memory +`rpc.Client`. + +### ExecAdapter + +The `ExecAdapter` runs nodes as child processes of the running simulation. + +It does this by executing the binary which is running the simulation but +setting `argv[0]` (i.e. the program name) to `p2p-node` which is then +detected by an init hook in the child process which runs the `node.Service` +using the devp2p node stack rather than executing `main()`. + +The nodes listen for devp2p connections and WebSocket RPC clients on random +localhost ports. + +### DockerAdapter + +The `DockerAdapter` is similar to the `ExecAdapter` but executes `docker run` +to run the node in a Docker container using a Docker image containing the +simulation binary at `/bin/p2p-node`. + +The Docker image is built using `docker build` when the adapter is initialised, +meaning no prior setup is necessary other than having a working Docker client. + +Each node listens on the external IP of the container and the default p2p and +RPC ports (`30303` and `8546` respectively). + +## Network + +A simulation network is created with an ID and default service (which is used +if a node is created without an explicit service), exposes methods for +creating, starting, stopping, connecting and disconnecting nodes, and emits +events when certain actions occur. + +### Events + +A simulation network emits the following events: + +* node event - when nodes are created / started / stopped +* connection event - when nodes are connected / disconnected +* message event - when a protocol message is sent between two nodes + +The events have a "control" flag which when set indicates that the event is the +outcome of a controlled simulation action (e.g. creating a node or explicitly +connecting two nodes together). + +This is in contrast to a non-control event, otherwise called a "live" event, +which is the outcome of something happening in the network as a result of a +control event (e.g. a node actually started up or a connection was actually +established between two nodes). + +Live events are detected by the simulation network by subscribing to node peer +events via RPC when the nodes start up. + +## Testing Framework + +The `Simulation` type can be used in tests to perform actions in a simulation +network and then wait for expectations to be met. + +With a running simulation network, the `Simulation.Run` method can be called +with a `Step` which has the following fields: + +* `Action` - a function which performs some action in the network + +* `Expect` - an expectation function which returns whether or not a + given node meets the expectation + +* `Trigger` - a channel which receives node IDs which then trigger a check + of the expectation function to be performed against that node + +As a concrete example, consider a simulated network of Ethereum nodes. An +`Action` could be the sending of a transaction, `Expect` it being included in +a block, and `Trigger` a check for every block that is mined. + +On return, the `Simulation.Run` method returns a `StepResult` which can be used +to determine if all nodes met the expectation, how long it took them to meet +the expectation and what network events were emitted during the step run. + +## HTTP API + +The simulation framework includes a HTTP API which can be used to control the +simulation. + +The API is initialised with a particular node adapter and has the following +endpoints: + +``` +GET / Get network information +POST /start Start all nodes in the network +POST /stop Stop all nodes in the network +GET /events Stream network events +GET /snapshot Take a network snapshot +POST /snapshot Load a network snapshot +POST /nodes Create a node +GET /nodes Get all nodes in the network +GET /nodes/:nodeid Get node information +POST /nodes/:nodeid/start Start a node +POST /nodes/:nodeid/stop Stop a node +POST /nodes/:nodeid/conn/:peerid Connect two nodes +DELETE /nodes/:nodeid/conn/:peerid Disconnect two nodes +GET /nodes/:nodeid/rpc Make RPC requests to a node via WebSocket +``` + +For convenience, `nodeid` in the URL can be the name of a node rather than its +ID. + +## Command line client + +`p2psim` is a command line client for the HTTP API, located in +`cmd/p2psim`. + +It provides the following commands: + +``` +p2psim show +p2psim events [--current] [--filter=FILTER] +p2psim snapshot +p2psim load +p2psim node create [--name=NAME] [--services=SERVICES] [--key=KEY] +p2psim node list +p2psim node show <node> +p2psim node start <node> +p2psim node stop <node> +p2psim node connect <node> <peer> +p2psim node disconnect <node> <peer> +p2psim node rpc <node> <method> [<args>] [--subscribe] +``` + +## Example + +See [p2p/simulations/examples/README.md](examples/README.md). diff --git a/p2p/simulations/adapters/docker.go b/p2p/simulations/adapters/docker.go new file mode 100644 index 000000000..022314b3d --- /dev/null +++ b/p2p/simulations/adapters/docker.go @@ -0,0 +1,182 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// DockerAdapter is a NodeAdapter which runs simulation nodes inside Docker +// containers. +// +// A Docker image is built which contains the current binary at /bin/p2p-node +// which when executed runs the underlying service (see the description +// of the execP2PNode function for more details) +type DockerAdapter struct { + ExecAdapter +} + +// NewDockerAdapter builds the p2p-node Docker image containing the current +// binary and returns a DockerAdapter +func NewDockerAdapter() (*DockerAdapter, error) { + // Since Docker containers run on Linux and this adapter runs the + // current binary in the container, it must be compiled for Linux. + // + // It is reasonable to require this because the caller can just + // compile the current binary in a Docker container. + if runtime.GOOS != "linux" { + return nil, errors.New("DockerAdapter can only be used on Linux as it uses the current binary (which must be a Linux binary)") + } + + if err := buildDockerImage(); err != nil { + return nil, err + } + + return &DockerAdapter{ + ExecAdapter{ + nodes: make(map[discover.NodeID]*ExecNode), + }, + }, nil +} + +// Name returns the name of the adapter for logging purposes +func (d *DockerAdapter) Name() string { + return "docker-adapter" +} + +// NewNode returns a new DockerNode using the given config +func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = "/data" + conf.Stack.WSHost = "0.0.0.0" + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + node := &DockerNode{ + ExecNode: ExecNode{ + ID: config.ID, + Config: conf, + adapter: &d.ExecAdapter, + }, + } + node.newCmd = node.dockerCommand + d.ExecAdapter.nodes[node.ID] = &node.ExecNode + return node, nil +} + +// DockerNode wraps an ExecNode but exec's the current binary in a docker +// container rather than locally +type DockerNode struct { + ExecNode +} + +// dockerCommand returns a command which exec's the binary in a Docker +// container. +// +// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment +// variable to the container using the --env flag. +func (n *DockerNode) dockerCommand() *exec.Cmd { + return exec.Command( + "sh", "-c", + fmt.Sprintf( + `exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`, + dockerImage, strings.Join(n.Config.Node.Services, ","), n.ID.String(), + ), + ) +} + +// dockerImage is the name of the Docker image which gets built to run the +// simulation node +const dockerImage = "p2p-node" + +// buildDockerImage builds the Docker image which is used to run the simulation +// node in a Docker container. +// +// It adds the current binary as "p2p-node" so that it runs execP2PNode +// when executed. +func buildDockerImage() error { + // create a directory to use as the build context + dir, err := ioutil.TempDir("", "p2p-docker") + if err != nil { + return err + } + defer os.RemoveAll(dir) + + // copy the current binary into the build context + bin, err := os.Open(reexec.Self()) + if err != nil { + return err + } + defer bin.Close() + dst, err := os.OpenFile(filepath.Join(dir, "self.bin"), os.O_WRONLY|os.O_CREATE, 0755) + if err != nil { + return err + } + defer dst.Close() + if _, err := io.Copy(dst, bin); err != nil { + return err + } + + // create the Dockerfile + dockerfile := []byte(` +FROM ubuntu:16.04 +RUN mkdir /data +ADD self.bin /bin/p2p-node + `) + if err := ioutil.WriteFile(filepath.Join(dir, "Dockerfile"), dockerfile, 0644); err != nil { + return err + } + + // run 'docker build' + cmd := exec.Command("docker", "build", "-t", dockerImage, dir) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("error building docker image: %s", err) + } + + return nil +} diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go new file mode 100644 index 000000000..bdb92cc1d --- /dev/null +++ b/p2p/simulations/adapters/exec.go @@ -0,0 +1,504 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "bufio" + "context" + "crypto/ecdsa" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "os/exec" + "os/signal" + "path/filepath" + "regexp" + "strings" + "sync" + "syscall" + "time" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/websocket" +) + +// ExecAdapter is a NodeAdapter which runs simulation nodes by executing the +// current binary as a child process. +// +// An init hook is used so that the child process executes the node services +// (rather than whataver the main() function would normally do), see the +// execP2PNode function for more information. +type ExecAdapter struct { + // BaseDir is the directory under which the data directories for each + // simulation node are created. + BaseDir string + + nodes map[discover.NodeID]*ExecNode +} + +// NewExecAdapter returns an ExecAdapter which stores node data in +// subdirectories of the given base directory +func NewExecAdapter(baseDir string) *ExecAdapter { + return &ExecAdapter{ + BaseDir: baseDir, + nodes: make(map[discover.NodeID]*ExecNode), + } +} + +// Name returns the name of the adapter for logging purposes +func (e *ExecAdapter) Name() string { + return "exec-adapter" +} + +// NewNode returns a new ExecNode using the given config +func (e *ExecAdapter) NewNode(config *NodeConfig) (Node, error) { + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := serviceFuncs[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + // create the node directory using the first 12 characters of the ID + // as Unix socket paths cannot be longer than 256 characters + dir := filepath.Join(e.BaseDir, config.ID.String()[:12]) + if err := os.Mkdir(dir, 0755); err != nil { + return nil, fmt.Errorf("error creating node directory: %s", err) + } + + // generate the config + conf := &execNodeConfig{ + Stack: node.DefaultConfig, + Node: config, + } + conf.Stack.DataDir = filepath.Join(dir, "data") + conf.Stack.WSHost = "127.0.0.1" + conf.Stack.WSPort = 0 + conf.Stack.WSOrigins = []string{"*"} + conf.Stack.WSExposeAll = true + conf.Stack.P2P.EnableMsgEvents = false + conf.Stack.P2P.NoDiscovery = true + conf.Stack.P2P.NAT = nil + conf.Stack.NoUSB = true + + // listen on a random localhost port (we'll get the actual port after + // starting the node through the RPC admin.nodeInfo method) + conf.Stack.P2P.ListenAddr = "127.0.0.1:0" + + node := &ExecNode{ + ID: config.ID, + Dir: dir, + Config: conf, + adapter: e, + } + node.newCmd = node.execCommand + e.nodes[node.ID] = node + return node, nil +} + +// ExecNode starts a simulation node by exec'ing the current binary and +// running the configured services +type ExecNode struct { + ID discover.NodeID + Dir string + Config *execNodeConfig + Cmd *exec.Cmd + Info *p2p.NodeInfo + + adapter *ExecAdapter + client *rpc.Client + wsAddr string + newCmd func() *exec.Cmd + key *ecdsa.PrivateKey +} + +// Addr returns the node's enode URL +func (n *ExecNode) Addr() []byte { + if n.Info == nil { + return nil + } + return []byte(n.Info.Enode) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (n *ExecNode) Client() (*rpc.Client, error) { + return n.client, nil +} + +// wsAddrPattern is a regex used to read the WebSocket address from the node's +// log +var wsAddrPattern = regexp.MustCompile(`ws://[\d.:]+`) + +// Start exec's the node passing the ID and service as command line arguments +// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment +// variable +func (n *ExecNode) Start(snapshots map[string][]byte) (err error) { + if n.Cmd != nil { + return errors.New("already started") + } + defer func() { + if err != nil { + log.Error("node failed to start", "err", err) + n.Stop() + } + }() + + // encode a copy of the config containing the snapshot + confCopy := *n.Config + confCopy.Snapshots = snapshots + confCopy.PeerAddrs = make(map[string]string) + for id, node := range n.adapter.nodes { + confCopy.PeerAddrs[id.String()] = node.wsAddr + } + confData, err := json.Marshal(confCopy) + if err != nil { + return fmt.Errorf("error generating node config: %s", err) + } + + // use a pipe for stderr so we can both copy the node's stderr to + // os.Stderr and read the WebSocket address from the logs + stderrR, stderrW := io.Pipe() + stderr := io.MultiWriter(os.Stderr, stderrW) + + // start the node + cmd := n.newCmd() + cmd.Stdout = os.Stdout + cmd.Stderr = stderr + cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", confData)) + if err := cmd.Start(); err != nil { + return fmt.Errorf("error starting node: %s", err) + } + n.Cmd = cmd + + // read the WebSocket address from the stderr logs + var wsAddr string + wsAddrC := make(chan string) + go func() { + s := bufio.NewScanner(stderrR) + for s.Scan() { + if strings.Contains(s.Text(), "WebSocket endpoint opened:") { + wsAddrC <- wsAddrPattern.FindString(s.Text()) + } + } + }() + select { + case wsAddr = <-wsAddrC: + if wsAddr == "" { + return errors.New("failed to read WebSocket address from stderr") + } + case <-time.After(10 * time.Second): + return errors.New("timed out waiting for WebSocket address on stderr") + } + + // create the RPC client and load the node info + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + client, err := rpc.DialWebsocket(ctx, wsAddr, "") + if err != nil { + return fmt.Errorf("error dialing rpc websocket: %s", err) + } + var info p2p.NodeInfo + if err := client.CallContext(ctx, &info, "admin_nodeInfo"); err != nil { + return fmt.Errorf("error getting node info: %s", err) + } + n.client = client + n.wsAddr = wsAddr + n.Info = &info + + return nil +} + +// execCommand returns a command which runs the node locally by exec'ing +// the current binary but setting argv[0] to "p2p-node" so that the child +// runs execP2PNode +func (n *ExecNode) execCommand() *exec.Cmd { + return &exec.Cmd{ + Path: reexec.Self(), + Args: []string{"p2p-node", strings.Join(n.Config.Node.Services, ","), n.ID.String()}, + } +} + +// Stop stops the node by first sending SIGTERM and then SIGKILL if the node +// doesn't stop within 5s +func (n *ExecNode) Stop() error { + if n.Cmd == nil { + return nil + } + defer func() { + n.Cmd = nil + }() + + if n.client != nil { + n.client.Close() + n.client = nil + n.wsAddr = "" + n.Info = nil + } + + if err := n.Cmd.Process.Signal(syscall.SIGTERM); err != nil { + return n.Cmd.Process.Kill() + } + waitErr := make(chan error) + go func() { + waitErr <- n.Cmd.Wait() + }() + select { + case err := <-waitErr: + return err + case <-time.After(5 * time.Second): + return n.Cmd.Process.Kill() + } +} + +// NodeInfo returns information about the node +func (n *ExecNode) NodeInfo() *p2p.NodeInfo { + info := &p2p.NodeInfo{ + ID: n.ID.String(), + } + if n.client != nil { + n.client.Call(&info, "admin_nodeInfo") + } + return info +} + +// ServeRPC serves RPC requests over the given connection by dialling the +// node's WebSocket address and joining the two connections +func (n *ExecNode) ServeRPC(clientConn net.Conn) error { + conn, err := websocket.Dial(n.wsAddr, "", "http://localhost") + if err != nil { + return err + } + var wg sync.WaitGroup + wg.Add(2) + join := func(src, dst net.Conn) { + defer wg.Done() + io.Copy(dst, src) + // close the write end of the destination connection + if cw, ok := dst.(interface { + CloseWrite() error + }); ok { + cw.CloseWrite() + } else { + dst.Close() + } + } + go join(conn, clientConn) + go join(clientConn, conn) + wg.Wait() + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (n *ExecNode) Snapshots() (map[string][]byte, error) { + if n.client == nil { + return nil, errors.New("RPC not started") + } + var snapshots map[string][]byte + return snapshots, n.client.Call(&snapshots, "simulation_snapshot") +} + +func init() { + // register a reexec function to start a devp2p node when the current + // binary is executed as "p2p-node" + reexec.Register("p2p-node", execP2PNode) +} + +// execNodeConfig is used to serialize the node configuration so it can be +// passed to the child process as a JSON encoded environment variable +type execNodeConfig struct { + Stack node.Config `json:"stack"` + Node *NodeConfig `json:"node"` + Snapshots map[string][]byte `json:"snapshots,omitempty"` + PeerAddrs map[string]string `json:"peer_addrs,omitempty"` +} + +// execP2PNode starts a devp2p node when the current binary is executed with +// argv[0] being "p2p-node", reading the service / ID from argv[1] / argv[2] +// and the node config from the _P2P_NODE_CONFIG environment variable +func execP2PNode() { + glogger := log.NewGlogHandler(log.StreamHandler(os.Stderr, log.LogfmtFormat())) + glogger.Verbosity(log.LvlInfo) + log.Root().SetHandler(glogger) + + // read the services from argv + serviceNames := strings.Split(os.Args[1], ",") + + // decode the config + confEnv := os.Getenv("_P2P_NODE_CONFIG") + if confEnv == "" { + log.Crit("missing _P2P_NODE_CONFIG") + } + var conf execNodeConfig + if err := json.Unmarshal([]byte(confEnv), &conf); err != nil { + log.Crit("error decoding _P2P_NODE_CONFIG", "err", err) + } + conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey + + // use explicit IP address in ListenAddr so that Enode URL is usable + externalIP := func() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + log.Crit("error getting IP address", "err", err) + } + for _, addr := range addrs { + if ip, ok := addr.(*net.IPNet); ok && !ip.IP.IsLoopback() { + return ip.IP.String() + } + } + log.Crit("unable to determine explicit IP address") + return "" + } + if strings.HasPrefix(conf.Stack.P2P.ListenAddr, ":") { + conf.Stack.P2P.ListenAddr = externalIP() + conf.Stack.P2P.ListenAddr + } + if conf.Stack.WSHost == "0.0.0.0" { + conf.Stack.WSHost = externalIP() + } + + // initialize the devp2p stack + stack, err := node.New(&conf.Stack) + if err != nil { + log.Crit("error creating node stack", "err", err) + } + + // register the services, collecting them into a map so we can wrap + // them in a snapshot service + services := make(map[string]node.Service, len(serviceNames)) + for _, name := range serviceNames { + serviceFunc, exists := serviceFuncs[name] + if !exists { + log.Crit("unknown node service", "name", name) + } + constructor := func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: &wsRPCDialer{addrs: conf.PeerAddrs}, + NodeContext: nodeCtx, + Config: conf.Node, + } + if conf.Snapshots != nil { + ctx.Snapshot = conf.Snapshots[name] + } + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + services[name] = service + return service, nil + } + if err := stack.Register(constructor); err != nil { + log.Crit("error starting service", "name", name, "err", err) + } + } + + // register the snapshot service + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return &snapshotService{services}, nil + }); err != nil { + log.Crit("error starting snapshot service", "err", err) + } + + // start the stack + if err := stack.Start(); err != nil { + log.Crit("error stating node stack", "err", err) + } + + // stop the stack if we get a SIGTERM signal + go func() { + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGTERM) + defer signal.Stop(sigc) + <-sigc + log.Info("Received SIGTERM, shutting down...") + stack.Stop() + }() + + // wait for the stack to exit + stack.Wait() +} + +// snapshotService is a node.Service which wraps a list of services and +// exposes an API to generate a snapshot of those services +type snapshotService struct { + services map[string]node.Service +} + +func (s *snapshotService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "simulation", + Version: "1.0", + Service: SnapshotAPI{s.services}, + }} +} + +func (s *snapshotService) Protocols() []p2p.Protocol { + return nil +} + +func (s *snapshotService) Start(*p2p.Server) error { + return nil +} + +func (s *snapshotService) Stop() error { + return nil +} + +// SnapshotAPI provides an RPC method to create snapshots of services +type SnapshotAPI struct { + services map[string]node.Service +} + +func (api SnapshotAPI) Snapshot() (map[string][]byte, error) { + snapshots := make(map[string][]byte) + for name, service := range api.services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +type wsRPCDialer struct { + addrs map[string]string +} + +// DialRPC implements the RPCDialer interface by creating a WebSocket RPC +// client of the given node +func (w *wsRPCDialer) DialRPC(id discover.NodeID) (*rpc.Client, error) { + addr, ok := w.addrs[id.String()] + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + return rpc.DialWebsocket(context.Background(), addr, "http://localhost") +} diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go new file mode 100644 index 000000000..c97188def --- /dev/null +++ b/p2p/simulations/adapters/inproc.go @@ -0,0 +1,314 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "errors" + "fmt" + "math" + "net" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// SimAdapter is a NodeAdapter which creates in-memory simulation nodes and +// connects them using in-memory net.Pipe connections +type SimAdapter struct { + mtx sync.RWMutex + nodes map[discover.NodeID]*SimNode + services map[string]ServiceFunc +} + +// NewSimAdapter creates a SimAdapter which is capable of running in-memory +// simulation nodes running any of the given services (the services to run on a +// particular node are passed to the NewNode function in the NodeConfig) +func NewSimAdapter(services map[string]ServiceFunc) *SimAdapter { + return &SimAdapter{ + nodes: make(map[discover.NodeID]*SimNode), + services: services, + } +} + +// Name returns the name of the adapter for logging purposes +func (s *SimAdapter) Name() string { + return "sim-adapter" +} + +// NewNode returns a new SimNode using the given config +func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // check a node with the ID doesn't already exist + id := config.ID + if _, exists := s.nodes[id]; exists { + return nil, fmt.Errorf("node already exists: %s", id) + } + + // check the services are valid + if len(config.Services) == 0 { + return nil, errors.New("node must have at least one service") + } + for _, service := range config.Services { + if _, exists := s.services[service]; !exists { + return nil, fmt.Errorf("unknown node service %q", service) + } + } + + n, err := node.New(&node.Config{ + P2P: p2p.Config{ + PrivateKey: config.PrivateKey, + MaxPeers: math.MaxInt32, + NoDiscovery: true, + Dialer: s, + EnableMsgEvents: true, + }, + NoUSB: true, + }) + if err != nil { + return nil, err + } + + simNode := &SimNode{ + ID: id, + config: config, + node: n, + adapter: s, + running: make(map[string]node.Service), + } + s.nodes[id] = simNode + return simNode, nil +} + +// Dial implements the p2p.NodeDialer interface by connecting to the node using +// an in-memory net.Pipe connection +func (s *SimAdapter) Dial(dest *discover.Node) (conn net.Conn, err error) { + node, ok := s.GetNode(dest.ID) + if !ok { + return nil, fmt.Errorf("unknown node: %s", dest.ID) + } + srv := node.Server() + if srv == nil { + return nil, fmt.Errorf("node not running: %s", dest.ID) + } + pipe1, pipe2 := net.Pipe() + go srv.SetupConn(pipe1, 0, nil) + return pipe2, nil +} + +// DialRPC implements the RPCDialer interface by creating an in-memory RPC +// client of the given node +func (s *SimAdapter) DialRPC(id discover.NodeID) (*rpc.Client, error) { + node, ok := s.GetNode(id) + if !ok { + return nil, fmt.Errorf("unknown node: %s", id) + } + handler, err := node.node.RPCHandler() + if err != nil { + return nil, err + } + return rpc.DialInProc(handler), nil +} + +// GetNode returns the node with the given ID if it exists +func (s *SimAdapter) GetNode(id discover.NodeID) (*SimNode, bool) { + s.mtx.RLock() + defer s.mtx.RUnlock() + node, ok := s.nodes[id] + return node, ok +} + +// SimNode is an in-memory simulation node which connects to other nodes using +// an in-memory net.Pipe connection (see SimAdapter.Dial), running devp2p +// protocols directly over that pipe +type SimNode struct { + lock sync.RWMutex + ID discover.NodeID + config *NodeConfig + adapter *SimAdapter + node *node.Node + running map[string]node.Service + client *rpc.Client + registerOnce sync.Once +} + +// Addr returns the node's discovery address +func (self *SimNode) Addr() []byte { + return []byte(self.Node().String()) +} + +// Node returns a discover.Node representing the SimNode +func (self *SimNode) Node() *discover.Node { + return discover.NewNode(self.ID, net.IP{127, 0, 0, 1}, 30303, 30303) +} + +// Client returns an rpc.Client which can be used to communicate with the +// underlying services (it is set once the node has started) +func (self *SimNode) Client() (*rpc.Client, error) { + self.lock.RLock() + defer self.lock.RUnlock() + if self.client == nil { + return nil, errors.New("node not started") + } + return self.client, nil +} + +// ServeRPC serves RPC requests over the given connection by creating an +// in-memory client to the node's RPC server +func (self *SimNode) ServeRPC(conn net.Conn) error { + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) + return nil +} + +// Snapshots creates snapshots of the services by calling the +// simulation_snapshot RPC method +func (self *SimNode) Snapshots() (map[string][]byte, error) { + self.lock.RLock() + services := make(map[string]node.Service, len(self.running)) + for name, service := range self.running { + services[name] = service + } + self.lock.RUnlock() + if len(services) == 0 { + return nil, errors.New("no running services") + } + snapshots := make(map[string][]byte) + for name, service := range services { + if s, ok := service.(interface { + Snapshot() ([]byte, error) + }); ok { + snap, err := s.Snapshot() + if err != nil { + return nil, err + } + snapshots[name] = snap + } + } + return snapshots, nil +} + +// Start registers the services and starts the underlying devp2p node +func (self *SimNode) Start(snapshots map[string][]byte) error { + newService := func(name string) func(ctx *node.ServiceContext) (node.Service, error) { + return func(nodeCtx *node.ServiceContext) (node.Service, error) { + ctx := &ServiceContext{ + RPCDialer: self.adapter, + NodeContext: nodeCtx, + Config: self.config, + } + if snapshots != nil { + ctx.Snapshot = snapshots[name] + } + serviceFunc := self.adapter.services[name] + service, err := serviceFunc(ctx) + if err != nil { + return nil, err + } + self.running[name] = service + return service, nil + } + } + + // ensure we only register the services once in the case of the node + // being stopped and then started again + var regErr error + self.registerOnce.Do(func() { + for _, name := range self.config.Services { + if err := self.node.Register(newService(name)); err != nil { + regErr = err + return + } + } + }) + if regErr != nil { + return regErr + } + + if err := self.node.Start(); err != nil { + return err + } + + // create an in-process RPC client + handler, err := self.node.RPCHandler() + if err != nil { + return err + } + + self.lock.Lock() + self.client = rpc.DialInProc(handler) + self.lock.Unlock() + + return nil +} + +// Stop closes the RPC client and stops the underlying devp2p node +func (self *SimNode) Stop() error { + self.lock.Lock() + if self.client != nil { + self.client.Close() + self.client = nil + } + self.lock.Unlock() + return self.node.Stop() +} + +// Services returns a copy of the underlying services +func (self *SimNode) Services() []node.Service { + self.lock.RLock() + defer self.lock.RUnlock() + services := make([]node.Service, 0, len(self.running)) + for _, service := range self.running { + services = append(services, service) + } + return services +} + +// Server returns the underlying p2p.Server +func (self *SimNode) Server() *p2p.Server { + return self.node.Server() +} + +// SubscribeEvents subscribes the given channel to peer events from the +// underlying p2p.Server +func (self *SimNode) SubscribeEvents(ch chan *p2p.PeerEvent) event.Subscription { + srv := self.Server() + if srv == nil { + panic("node not running") + } + return srv.SubscribeEvents(ch) +} + +// NodeInfo returns information about the node +func (self *SimNode) NodeInfo() *p2p.NodeInfo { + server := self.Server() + if server == nil { + return &p2p.NodeInfo{ + ID: self.ID.String(), + Enode: self.Node().String(), + } + } + return server.NodeInfo() +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go new file mode 100644 index 000000000..ed6cfc504 --- /dev/null +++ b/p2p/simulations/adapters/types.go @@ -0,0 +1,215 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package adapters + +import ( + "crypto/ecdsa" + "encoding/hex" + "encoding/json" + "fmt" + "net" + "os" + + "github.com/docker/docker/pkg/reexec" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/rpc" +) + +// Node represents a node in a simulation network which is created by a +// NodeAdapter, for example: +// +// * SimNode - An in-memory node +// * ExecNode - A child process node +// * DockerNode - A Docker container node +// +type Node interface { + // Addr returns the node's address (e.g. an Enode URL) + Addr() []byte + + // Client returns the RPC client which is created once the node is + // up and running + Client() (*rpc.Client, error) + + // ServeRPC serves RPC requests over the given connection + ServeRPC(net.Conn) error + + // Start starts the node with the given snapshots + Start(snapshots map[string][]byte) error + + // Stop stops the node + Stop() error + + // NodeInfo returns information about the node + NodeInfo() *p2p.NodeInfo + + // Snapshots creates snapshots of the running services + Snapshots() (map[string][]byte, error) +} + +// NodeAdapter is used to create Nodes in a simulation network +type NodeAdapter interface { + // Name returns the name of the adapter for logging purposes + Name() string + + // NewNode creates a new node with the given configuration + NewNode(config *NodeConfig) (Node, error) +} + +// NodeConfig is the configuration used to start a node in a simulation +// network +type NodeConfig struct { + // ID is the node's ID which is used to identify the node in the + // simulation network + ID discover.NodeID + + // PrivateKey is the node's private key which is used by the devp2p + // stack to encrypt communications + PrivateKey *ecdsa.PrivateKey + + // Name is a human friendly name for the node like "node01" + Name string + + // Services are the names of the services which should be run when + // starting the node (for SimNodes it should be the names of services + // contained in SimAdapter.services, for other nodes it should be + // services registered by calling the RegisterService function) + Services []string +} + +// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding +// all fields as strings +type nodeConfigJSON struct { + ID string `json:"id"` + PrivateKey string `json:"private_key"` + Name string `json:"name"` + Services []string `json:"services"` +} + +// MarshalJSON implements the json.Marshaler interface by encoding the config +// fields as strings +func (n *NodeConfig) MarshalJSON() ([]byte, error) { + confJSON := nodeConfigJSON{ + ID: n.ID.String(), + Name: n.Name, + Services: n.Services, + } + if n.PrivateKey != nil { + confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) + } + return json.Marshal(confJSON) +} + +// UnmarshalJSON implements the json.Unmarshaler interface by decoding the json +// string values into the config fields +func (n *NodeConfig) UnmarshalJSON(data []byte) error { + var confJSON nodeConfigJSON + if err := json.Unmarshal(data, &confJSON); err != nil { + return err + } + + if confJSON.ID != "" { + nodeID, err := discover.HexID(confJSON.ID) + if err != nil { + return err + } + n.ID = nodeID + } + + if confJSON.PrivateKey != "" { + key, err := hex.DecodeString(confJSON.PrivateKey) + if err != nil { + return err + } + privKey, err := crypto.ToECDSA(key) + if err != nil { + return err + } + n.PrivateKey = privKey + } + + n.Name = confJSON.Name + n.Services = confJSON.Services + + return nil +} + +// RandomNodeConfig returns node configuration with a randomly generated ID and +// PrivateKey +func RandomNodeConfig() *NodeConfig { + key, err := crypto.GenerateKey() + if err != nil { + panic("unable to generate key") + } + var id discover.NodeID + pubkey := crypto.FromECDSAPub(&key.PublicKey) + copy(id[:], pubkey[1:]) + return &NodeConfig{ + ID: id, + PrivateKey: key, + } +} + +// ServiceContext is a collection of options and methods which can be utilised +// when starting services +type ServiceContext struct { + RPCDialer + + NodeContext *node.ServiceContext + Config *NodeConfig + Snapshot []byte +} + +// RPCDialer is used when initialising services which need to connect to +// other nodes in the network (for example a simulated Swarm node which needs +// to connect to a Geth node to resolve ENS names) +type RPCDialer interface { + DialRPC(id discover.NodeID) (*rpc.Client, error) +} + +// Services is a collection of services which can be run in a simulation +type Services map[string]ServiceFunc + +// ServiceFunc returns a node.Service which can be used to boot a devp2p node +type ServiceFunc func(ctx *ServiceContext) (node.Service, error) + +// serviceFuncs is a map of registered services which are used to boot devp2p +// nodes +var serviceFuncs = make(Services) + +// RegisterServices registers the given Services which can then be used to +// start devp2p nodes using either the Exec or Docker adapters. +// +// It should be called in an init function so that it has the opportunity to +// execute the services before main() is called. +func RegisterServices(services Services) { + for name, f := range services { + if _, exists := serviceFuncs[name]; exists { + panic(fmt.Sprintf("node service already exists: %q", name)) + } + serviceFuncs[name] = f + } + + // now we have registered the services, run reexec.Init() which will + // potentially start one of the services if the current binary has + // been exec'd with argv[0] set to "p2p-node" + if reexec.Init() { + os.Exit(0) + } +} diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go new file mode 100644 index 000000000..f17958c68 --- /dev/null +++ b/p2p/simulations/events.go @@ -0,0 +1,108 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "fmt" + "time" +) + +// EventType is the type of event emitted by a simulation network +type EventType string + +const ( + // EventTypeNode is the type of event emitted when a node is either + // created, started or stopped + EventTypeNode EventType = "node" + + // EventTypeConn is the type of event emitted when a connection is + // is either established or dropped between two nodes + EventTypeConn EventType = "conn" + + // EventTypeMsg is the type of event emitted when a p2p message it + // sent between two nodes + EventTypeMsg EventType = "msg" +) + +// Event is an event emitted by a simulation network +type Event struct { + // Type is the type of the event + Type EventType `json:"type"` + + // Time is the time the event happened + Time time.Time `json:"time"` + + // Control indicates whether the event is the result of a controlled + // action in the network + Control bool `json:"control"` + + // Node is set if the type is EventTypeNode + Node *Node `json:"node,omitempty"` + + // Conn is set if the type is EventTypeConn + Conn *Conn `json:"conn,omitempty"` + + // Msg is set if the type is EventTypeMsg + Msg *Msg `json:"msg,omitempty"` +} + +// NewEvent creates a new event for the given object which should be either a +// Node, Conn or Msg. +// +// The object is copied so that the event represents the state of the object +// when NewEvent is called. +func NewEvent(v interface{}) *Event { + event := &Event{Time: time.Now()} + switch v := v.(type) { + case *Node: + event.Type = EventTypeNode + node := *v + event.Node = &node + case *Conn: + event.Type = EventTypeConn + conn := *v + event.Conn = &conn + case *Msg: + event.Type = EventTypeMsg + msg := *v + event.Msg = &msg + default: + panic(fmt.Sprintf("invalid event type: %T", v)) + } + return event +} + +// ControlEvent creates a new control event +func ControlEvent(v interface{}) *Event { + event := NewEvent(v) + event.Control = true + return event +} + +// String returns the string representation of the event +func (e *Event) String() string { + switch e.Type { + case EventTypeNode: + return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up) + case EventTypeConn: + return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up) + case EventTypeMsg: + return fmt.Sprintf("<msg-event> nodes: %s->%s proto: %s, code: %d, received: %t", e.Msg.One.TerminalString(), e.Msg.Other.TerminalString(), e.Msg.Protocol, e.Msg.Code, e.Msg.Received) + default: + return "" + } +} diff --git a/p2p/simulations/examples/README.md b/p2p/simulations/examples/README.md new file mode 100644 index 000000000..822a48dcb --- /dev/null +++ b/p2p/simulations/examples/README.md @@ -0,0 +1,39 @@ +# devp2p simulation examples + +## ping-pong + +`ping-pong.go` implements a simulation network which contains nodes running a +simple "ping-pong" protocol where nodes send a ping message to all their +connected peers every 10s and receive pong messages in return. + +To run the simulation, run `go run ping-pong.go` in one terminal to start the +simulation API and `./ping-pong.sh` in another to start and connect the nodes: + +``` +$ go run ping-pong.go +INFO [08-15|13:53:49] using sim adapter +INFO [08-15|13:53:49] starting simulation server on 0.0.0.0:8888... +``` + +``` +$ ./ping-pong.sh +---> 13:58:12 creating 10 nodes +Created node01 +Started node01 +... +Created node10 +Started node10 +---> 13:58:13 connecting node01 to all other nodes +Connected node01 to node02 +... +Connected node01 to node10 +---> 13:58:14 done +``` + +Use the `--adapter` flag to choose the adapter type: + +``` +$ go run ping-pong.go --adapter exec +INFO [08-15|14:01:14] using exec adapter tmpdir=/var/folders/k6/wpsgfg4n23ddbc6f5cnw5qg00000gn/T/p2p-example992833779 +INFO [08-15|14:01:14] starting simulation server on 0.0.0.0:8888... +``` diff --git a/p2p/simulations/examples/ping-pong.go b/p2p/simulations/examples/ping-pong.go new file mode 100644 index 000000000..6a0ead53a --- /dev/null +++ b/p2p/simulations/examples/ping-pong.go @@ -0,0 +1,184 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +var adapterType = flag.String("adapter", "sim", `node adapter to use (one of "sim", "exec" or "docker")`) + +// main() starts a simulation network which contains nodes running a simple +// ping-pong protocol +func main() { + flag.Parse() + + // set the log level to Trace + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) + + // register a single ping-pong service + services := map[string]adapters.ServiceFunc{ + "ping-pong": func(ctx *adapters.ServiceContext) (node.Service, error) { + return newPingPongService(ctx.Config.ID), nil + }, + } + adapters.RegisterServices(services) + + // create the NodeAdapter + var adapter adapters.NodeAdapter + + switch *adapterType { + + case "sim": + log.Info("using sim adapter") + adapter = adapters.NewSimAdapter(services) + + case "exec": + tmpdir, err := ioutil.TempDir("", "p2p-example") + if err != nil { + log.Crit("error creating temp dir", "err", err) + } + defer os.RemoveAll(tmpdir) + log.Info("using exec adapter", "tmpdir", tmpdir) + adapter = adapters.NewExecAdapter(tmpdir) + + case "docker": + log.Info("using docker adapter") + var err error + adapter, err = adapters.NewDockerAdapter() + if err != nil { + log.Crit("error creating docker adapter", "err", err) + } + + default: + log.Crit(fmt.Sprintf("unknown node adapter %q", *adapterType)) + } + + // start the HTTP API + log.Info("starting simulation server on 0.0.0.0:8888...") + network := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ + DefaultService: "ping-pong", + }) + if err := http.ListenAndServe(":8888", simulations.NewServer(network)); err != nil { + log.Crit("error starting simulation server", "err", err) + } +} + +// pingPongService runs a ping-pong protocol between nodes where each node +// sends a ping to all its connected peers every 10s and receives a pong in +// return +type pingPongService struct { + id discover.NodeID + log log.Logger + received int64 +} + +func newPingPongService(id discover.NodeID) *pingPongService { + return &pingPongService{ + id: id, + log: log.New("node.id", id), + } +} + +func (p *pingPongService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: "ping-pong", + Version: 1, + Length: 2, + Run: p.Run, + NodeInfo: p.Info, + }} +} + +func (p *pingPongService) APIs() []rpc.API { + return nil +} + +func (p *pingPongService) Start(server *p2p.Server) error { + p.log.Info("ping-pong service starting") + return nil +} + +func (p *pingPongService) Stop() error { + p.log.Info("ping-pong service stopping") + return nil +} + +func (p *pingPongService) Info() interface{} { + return struct { + Received int64 `json:"received"` + }{ + atomic.LoadInt64(&p.received), + } +} + +const ( + pingMsgCode = iota + pongMsgCode +) + +// Run implements the ping-pong protocol which sends ping messages to the peer +// at 10s intervals, and responds to pings with pong messages. +func (p *pingPongService) Run(peer *p2p.Peer, rw p2p.MsgReadWriter) error { + log := p.log.New("peer.id", peer.ID()) + + errC := make(chan error) + go func() { + for range time.Tick(10 * time.Second) { + log.Info("sending ping") + if err := p2p.Send(rw, pingMsgCode, "PING"); err != nil { + errC <- err + return + } + } + }() + go func() { + for { + msg, err := rw.ReadMsg() + if err != nil { + errC <- err + return + } + payload, err := ioutil.ReadAll(msg.Payload) + if err != nil { + errC <- err + return + } + log.Info("received message", "msg.code", msg.Code, "msg.payload", string(payload)) + atomic.AddInt64(&p.received, 1) + if msg.Code == pingMsgCode { + log.Info("sending pong") + go p2p.Send(rw, pongMsgCode, "PONG") + } + } + }() + return <-errC +} diff --git a/p2p/simulations/examples/ping-pong.sh b/p2p/simulations/examples/ping-pong.sh new file mode 100755 index 000000000..47936bd9a --- /dev/null +++ b/p2p/simulations/examples/ping-pong.sh @@ -0,0 +1,40 @@ +#!/bin/bash +# +# Boot a ping-pong network simulation using the HTTP API started by ping-pong.go + +set -e + +main() { + if ! which p2psim &>/dev/null; then + fail "missing p2psim binary (you need to build cmd/p2psim and put it in \$PATH)" + fi + + info "creating 10 nodes" + for i in $(seq 1 10); do + p2psim node create --name "$(node_name $i)" + p2psim node start "$(node_name $i)" + done + + info "connecting node01 to all other nodes" + for i in $(seq 2 10); do + p2psim node connect "node01" "$(node_name $i)" + done + + info "done" +} + +node_name() { + local num=$1 + echo "node$(printf '%02d' $num)" +} + +info() { + echo -e "\033[1;32m---> $(date +%H:%M:%S) ${@}\033[0m" +} + +fail() { + echo -e "\033[1;31mERROR: ${@}\033[0m" >&2 + exit 1 +} + +main "$@" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go new file mode 100644 index 000000000..3fa8b9292 --- /dev/null +++ b/p2p/simulations/http.go @@ -0,0 +1,680 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" + "github.com/julienschmidt/httprouter" + "golang.org/x/net/websocket" +) + +// DefaultClient is the default simulation API client which expects the API +// to be running at http://localhost:8888 +var DefaultClient = NewClient("http://localhost:8888") + +// Client is a client for the simulation HTTP API which supports creating +// and managing simulation networks +type Client struct { + URL string + + client *http.Client +} + +// NewClient returns a new simulation API client +func NewClient(url string) *Client { + return &Client{ + URL: url, + client: http.DefaultClient, + } +} + +// GetNetwork returns details of the network +func (c *Client) GetNetwork() (*Network, error) { + network := &Network{} + return network, c.Get("/", network) +} + +// StartNetwork starts all existing nodes in the simulation network +func (c *Client) StartNetwork() error { + return c.Post("/start", nil, nil) +} + +// StopNetwork stops all existing nodes in a simulation network +func (c *Client) StopNetwork() error { + return c.Post("/stop", nil, nil) +} + +// CreateSnapshot creates a network snapshot +func (c *Client) CreateSnapshot() (*Snapshot, error) { + snap := &Snapshot{} + return snap, c.Get("/snapshot", snap) +} + +// LoadSnapshot loads a snapshot into the network +func (c *Client) LoadSnapshot(snap *Snapshot) error { + return c.Post("/snapshot", snap, nil) +} + +// SubscribeOpts is a collection of options to use when subscribing to network +// events +type SubscribeOpts struct { + // Current instructs the server to send events for existing nodes and + // connections first + Current bool + + // Filter instructs the server to only send a subset of message events + Filter string +} + +// SubscribeNetwork subscribes to network events which are sent from the server +// as a server-sent-events stream, optionally receiving events for existing +// nodes and connections and filtering message events +func (c *Client) SubscribeNetwork(events chan *Event, opts SubscribeOpts) (event.Subscription, error) { + url := fmt.Sprintf("%s/events?current=%t&filter=%s", c.URL, opts.Current, opts.Filter) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + response, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + + // define a producer function to pass to event.Subscription + // which reads server-sent events from res.Body and sends + // them to the events channel + producer := func(stop <-chan struct{}) error { + defer res.Body.Close() + + // read lines from res.Body in a goroutine so that we are + // always reading from the stop channel + lines := make(chan string) + errC := make(chan error, 1) + go func() { + s := bufio.NewScanner(res.Body) + for s.Scan() { + select { + case lines <- s.Text(): + case <-stop: + return + } + } + errC <- s.Err() + }() + + // detect any lines which start with "data:", decode the data + // into an event and send it to the events channel + for { + select { + case line := <-lines: + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + event := &Event{} + if err := json.Unmarshal([]byte(data), event); err != nil { + return fmt.Errorf("error decoding SSE event: %s", err) + } + select { + case events <- event: + case <-stop: + return nil + } + case err := <-errC: + return err + case <-stop: + return nil + } + } + } + + return event.NewSubscription(producer), nil +} + +// GetNodes returns all nodes which exist in the network +func (c *Client) GetNodes() ([]*p2p.NodeInfo, error) { + var nodes []*p2p.NodeInfo + return nodes, c.Get("/nodes", &nodes) +} + +// CreateNode creates a node in the network using the given configuration +func (c *Client) CreateNode(config *adapters.NodeConfig) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Post("/nodes", config, node) +} + +// GetNode returns details of a node +func (c *Client) GetNode(nodeID string) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Get(fmt.Sprintf("/nodes/%s", nodeID), node) +} + +// StartNode starts a node +func (c *Client) StartNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/start", nodeID), nil, nil) +} + +// StopNode stops a node +func (c *Client) StopNode(nodeID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/stop", nodeID), nil, nil) +} + +// ConnectNode connects a node to a peer node +func (c *Client) ConnectNode(nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID), nil, nil) +} + +// DisconnectNode disconnects a node from a peer node +func (c *Client) DisconnectNode(nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/nodes/%s/conn/%s", nodeID, peerID)) +} + +// RPCClient returns an RPC client connected to a node +func (c *Client) RPCClient(ctx context.Context, nodeID string) (*rpc.Client, error) { + baseURL := strings.Replace(c.URL, "http", "ws", 1) + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/nodes/%s/rpc", baseURL, nodeID), "") +} + +// Get performs a HTTP GET request decoding the resulting JSON response +// into "out" +func (c *Client) Get(path string, out interface{}) error { + return c.Send("GET", path, nil, out) +} + +// Post performs a HTTP POST request sending "in" as the JSON body and +// decoding the resulting JSON response into "out" +func (c *Client) Post(path string, in, out interface{}) error { + return c.Send("POST", path, in, out) +} + +// Delete performs a HTTP DELETE request +func (c *Client) Delete(path string) error { + return c.Send("DELETE", path, nil, nil) +} + +// Send performs a HTTP request, sending "in" as the JSON request body and +// decoding the JSON response into "out" +func (c *Client) Send(method, path string, in, out interface{}) error { + var body []byte + if in != nil { + var err error + body, err = json.Marshal(in) + if err != nil { + return err + } + } + req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { + response, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + if out != nil { + if err := json.NewDecoder(res.Body).Decode(out); err != nil { + return err + } + } + return nil +} + +// Server is an HTTP server providing an API to manage a simulation network +type Server struct { + router *httprouter.Router + network *Network +} + +// NewServer returns a new simulation API server +func NewServer(network *Network) *Server { + s := &Server{ + router: httprouter.New(), + network: network, + } + + s.OPTIONS("/", s.Options) + s.GET("/", s.GetNetwork) + s.POST("/start", s.StartNetwork) + s.POST("/stop", s.StopNetwork) + s.GET("/events", s.StreamNetworkEvents) + s.GET("/snapshot", s.CreateSnapshot) + s.POST("/snapshot", s.LoadSnapshot) + s.POST("/nodes", s.CreateNode) + s.GET("/nodes", s.GetNodes) + s.GET("/nodes/:nodeid", s.GetNode) + s.POST("/nodes/:nodeid/start", s.StartNode) + s.POST("/nodes/:nodeid/stop", s.StopNode) + s.POST("/nodes/:nodeid/conn/:peerid", s.ConnectNode) + s.DELETE("/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/nodes/:nodeid/rpc", s.NodeRPC) + + return s +} + +// GetNetwork returns details of the network +func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { + s.JSON(w, http.StatusOK, s.network) +} + +// StartNetwork starts all nodes in the network +func (s *Server) StartNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StartAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StopNetwork stops all nodes in the network +func (s *Server) StopNetwork(w http.ResponseWriter, req *http.Request) { + if err := s.network.StopAll(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +// StreamNetworkEvents streams network events as a server-sent-events stream +func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { + events := make(chan *Event) + sub := s.network.events.Subscribe(events) + defer sub.Unsubscribe() + + // stop the stream if the client goes away + var clientGone <-chan bool + if cn, ok := w.(http.CloseNotifier); ok { + clientGone = cn.CloseNotify() + } + + // write writes the given event and data to the stream like: + // + // event: <event> + // data: <data> + // + write := func(event, data string) { + fmt.Fprintf(w, "event: %s\n", event) + fmt.Fprintf(w, "data: %s\n\n", data) + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + } + writeEvent := func(event *Event) error { + data, err := json.Marshal(event) + if err != nil { + return err + } + write("network", string(data)) + return nil + } + writeErr := func(err error) { + write("error", err.Error()) + } + + // check if filtering has been requested + var filters MsgFilters + if filterParam := req.URL.Query().Get("filter"); filterParam != "" { + var err error + filters, err = NewMsgFilters(filterParam) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.WriteHeader(http.StatusOK) + fmt.Fprintf(w, "\n\n") + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } + + // optionally send the existing nodes and connections + if req.URL.Query().Get("current") == "true" { + snap, err := s.network.Snapshot() + if err != nil { + writeErr(err) + return + } + for _, node := range snap.Nodes { + event := NewEvent(&node.Node) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + for _, conn := range snap.Conns { + event := NewEvent(&conn) + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + } + } + + for { + select { + case event := <-events: + // only send message events which match the filters + if event.Msg != nil && !filters.Match(event.Msg) { + continue + } + if err := writeEvent(event); err != nil { + writeErr(err) + return + } + case <-clientGone: + return + } + } +} + +// NewMsgFilters constructs a collection of message filters from a URL query +// parameter. +// +// The parameter is expected to be a dash-separated list of individual filters, +// each having the format '<proto>:<codes>', where <proto> is the name of a +// protocol and <codes> is a comma-separated list of message codes. +// +// A message code of '*' or '-1' is considered a wildcard and matches any code. +func NewMsgFilters(filterParam string) (MsgFilters, error) { + filters := make(MsgFilters) + for _, filter := range strings.Split(filterParam, "-") { + protoCodes := strings.SplitN(filter, ":", 2) + if len(protoCodes) != 2 || protoCodes[0] == "" || protoCodes[1] == "" { + return nil, fmt.Errorf("invalid message filter: %s", filter) + } + proto := protoCodes[0] + for _, code := range strings.Split(protoCodes[1], ",") { + if code == "*" || code == "-1" { + filters[MsgFilter{Proto: proto, Code: -1}] = struct{}{} + continue + } + n, err := strconv.ParseUint(code, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid message code: %s", code) + } + filters[MsgFilter{Proto: proto, Code: int64(n)}] = struct{}{} + } + } + return filters, nil +} + +// MsgFilters is a collection of filters which are used to filter message +// events +type MsgFilters map[MsgFilter]struct{} + +// Match checks if the given message matches any of the filters +func (m MsgFilters) Match(msg *Msg) bool { + // check if there is a wildcard filter for the message's protocol + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: -1}]; ok { + return true + } + + // check if there is a filter for the message's protocol and code + if _, ok := m[MsgFilter{Proto: msg.Protocol, Code: int64(msg.Code)}]; ok { + return true + } + + return false +} + +// MsgFilter is used to filter message events based on protocol and message +// code +type MsgFilter struct { + // Proto is matched against a message's protocol + Proto string + + // Code is matched against a message's code, with -1 matching all codes + Code int64 +} + +// CreateSnapshot creates a network snapshot +func (s *Server) CreateSnapshot(w http.ResponseWriter, req *http.Request) { + snap, err := s.network.Snapshot() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, snap) +} + +// LoadSnapshot loads a snapshot into the network +func (s *Server) LoadSnapshot(w http.ResponseWriter, req *http.Request) { + snap := &Snapshot{} + if err := json.NewDecoder(req.Body).Decode(snap); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if err := s.network.Load(snap); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, s.network) +} + +// CreateNode creates a node in the network using the given configuration +func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { + config := adapters.RandomNodeConfig() + err := json.NewDecoder(req.Body).Decode(config) + if err != nil && err != io.EOF { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + node, err := s.network.NewNodeWithConfig(config) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusCreated, node.NodeInfo()) +} + +// GetNodes returns all nodes which exist in the network +func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { + nodes := s.network.GetNodes() + + infos := make([]*p2p.NodeInfo, len(nodes)) + for i, node := range nodes { + infos[i] = node.NodeInfo() + } + + s.JSON(w, http.StatusOK, infos) +} + +// GetNode returns details of a node +func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StartNode starts a node +func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Start(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// StopNode stops a node +func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + if err := s.network.Stop(node.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// ConnectNode connects a node to a peer node +func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Connect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// DisconnectNode disconnects a node from a peer node +func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + peer := req.Context().Value("peer").(*Node) + + if err := s.network.Disconnect(node.ID(), peer.ID()); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + s.JSON(w, http.StatusOK, node.NodeInfo()) +} + +// Options responds to the OPTIONS HTTP method by returning a 200 OK response +// with the "Access-Control-Allow-Headers" header set to "Content-Type" +func (s *Server) Options(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.WriteHeader(http.StatusOK) +} + +// NodeRPC forwards RPC requests to a node in the network via a WebSocket +// connection +func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + handler := func(conn *websocket.Conn) { + node.ServeRPC(conn) + } + + websocket.Server{Handler: handler}.ServeHTTP(w, req) +} + +// ServeHTTP implements the http.Handler interface by delegating to the +// underlying httprouter.Router +func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) +} + +// GET registers a handler for GET requests to a particular path +func (s *Server) GET(path string, handle http.HandlerFunc) { + s.router.GET(path, s.wrapHandler(handle)) +} + +// POST registers a handler for POST requests to a particular path +func (s *Server) POST(path string, handle http.HandlerFunc) { + s.router.POST(path, s.wrapHandler(handle)) +} + +// DELETE registers a handler for DELETE requests to a particular path +func (s *Server) DELETE(path string, handle http.HandlerFunc) { + s.router.DELETE(path, s.wrapHandler(handle)) +} + +// OPTIONS registers a handler for OPTIONS requests to a particular path +func (s *Server) OPTIONS(path string, handle http.HandlerFunc) { + s.router.OPTIONS("/*path", s.wrapHandler(handle)) +} + +// JSON sends "data" as a JSON HTTP response +func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + json.NewEncoder(w).Encode(data) +} + +// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by +// populating request.Context with any objects from the URL params +func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + + ctx := context.Background() + + if id := params.ByName("nodeid"); id != "" { + var node *Node + if nodeID, err := discover.HexID(id); err == nil { + node = s.network.GetNode(nodeID) + } else { + node = s.network.GetNodeByName(id) + } + if node == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "node", node) + } + + if id := params.ByName("peerid"); id != "" { + var peer *Node + if peerID, err := discover.HexID(id); err == nil { + peer = s.network.GetNode(peerID) + } else { + peer = s.network.GetNodeByName(id) + } + if peer == nil { + http.NotFound(w, req) + return + } + ctx = context.WithValue(ctx, "peer", peer) + } + + handler(w, req.WithContext(ctx)) + } +} diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go new file mode 100644 index 000000000..677a8fb14 --- /dev/null +++ b/p2p/simulations/http_test.go @@ -0,0 +1,823 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "fmt" + "math/rand" + "net/http/httptest" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +// testService implements the node.Service interface and provides protocols +// and APIs which are useful for testing nodes in a simulation network +type testService struct { + id discover.NodeID + + // peerCount is incremented once a peer handshake has been performed + peerCount int64 + + peers map[discover.NodeID]*testPeer + peersMtx sync.Mutex + + // state stores []byte which is used to test creating and loading + // snapshots + state atomic.Value +} + +func newTestService(ctx *adapters.ServiceContext) (node.Service, error) { + svc := &testService{ + id: ctx.Config.ID, + peers: make(map[discover.NodeID]*testPeer), + } + svc.state.Store(ctx.Snapshot) + return svc, nil +} + +type testPeer struct { + testReady chan struct{} + dumReady chan struct{} +} + +func (t *testService) peer(id discover.NodeID) *testPeer { + t.peersMtx.Lock() + defer t.peersMtx.Unlock() + if peer, ok := t.peers[id]; ok { + return peer + } + peer := &testPeer{ + testReady: make(chan struct{}), + dumReady: make(chan struct{}), + } + t.peers[id] = peer + return peer +} + +func (t *testService) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: "test", + Version: 1, + Length: 3, + Run: t.RunTest, + }, + { + Name: "dum", + Version: 1, + Length: 1, + Run: t.RunDum, + }, + { + Name: "prb", + Version: 1, + Length: 1, + Run: t.RunPrb, + }, + } +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{ + state: &t.state, + peerCount: &t.peerCount, + }, + }} +} + +func (t *testService) Start(server *p2p.Server) error { + return nil +} + +func (t *testService) Stop() error { + return nil +} + +// handshake performs a peer handshake by sending and expecting an empty +// message with the given code +func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error { + errc := make(chan error, 2) + go func() { errc <- p2p.Send(rw, code, struct{}{}) }() + go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }() + for i := 0; i < 2; i++ { + if err := <-errc; err != nil { + return err + } + } + return nil +} + +func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // perform three handshakes with three different message codes, + // used to test message sending and filtering + if err := t.handshake(rw, 2); err != nil { + return err + } + if err := t.handshake(rw, 1); err != nil { + return err + } + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the testReady channel so that other protocols can run + close(peer.testReady) + + // track the peer + atomic.AddInt64(&t.peerCount, 1) + defer atomic.AddInt64(&t.peerCount, -1) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the test protocol to perform its handshake + <-peer.testReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the dumReady channel so that other protocols can run + close(peer.dumReady) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} +func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the dum protocol to perform its handshake + <-peer.dumReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) Snapshot() ([]byte, error) { + return t.state.Load().([]byte), nil +} + +// TestAPI provides a test API to: +// * get the peer count +// * get and set an arbitrary state byte slice +// * get and increment a counter +// * subscribe to counter increment events +type TestAPI struct { + state *atomic.Value + peerCount *int64 + counter int64 + feed event.Feed +} + +func (t *TestAPI) PeerCount() int64 { + return atomic.LoadInt64(t.peerCount) +} + +func (t *TestAPI) Get() int64 { + return atomic.LoadInt64(&t.counter) +} + +func (t *TestAPI) Add(delta int64) { + atomic.AddInt64(&t.counter, delta) + t.feed.Send(delta) +} + +func (t *TestAPI) GetState() []byte { + return t.state.Load().([]byte) +} + +func (t *TestAPI) SetState(state []byte) { + t.state.Store(state) +} + +func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan int64) + sub := t.feed.Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + +var testServices = adapters.Services{ + "test": newTestService, +} + +func testHTTPServer(t *testing.T) (*Network, *httptest.Server) { + adapter := adapters.NewSimAdapter(testServices) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + return network, httptest.NewServer(NewServer(network)) +} + +// TestHTTPNetwork tests interacting with a simulation network using the HTTP +// API +func TestHTTPNetwork(t *testing.T) { + // start the server + network, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events so we can check them later + client := NewClient(s.URL) + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // check we can retrieve details about the network + gotNetwork, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if gotNetwork.ID != network.ID { + t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID) + } + + // start a simulation network + nodeIDs := startTestNetwork(t, client) + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], false), + x.nodeEvent(nodeIDs[1], false), + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], false), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) + + // reconnect the stream and check we get the current nodes and conns + events = make(chan *Event, 100) + opts.Current = true + sub, err = client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + x = &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) +} + +func startTestNetwork(t *testing.T, client *Client) []string { + // create two nodes + nodeCount := 2 + nodeIDs := make([]string, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + nodeIDs[i] = node.ID + } + + // check both nodes exist + nodes, err := client.GetNodes() + if err != nil { + t.Fatalf("error getting nodes: %s", err) + } + if len(nodes) != nodeCount { + t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes)) + } + for i, nodeID := range nodeIDs { + if nodes[i].ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) + } + node, err := client.GetNode(nodeID) + if err != nil { + t.Fatalf("error getting node %d: %s", i, err) + } + if node.ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID) + } + } + + // start both nodes + for _, nodeID := range nodeIDs { + if err := client.StartNode(nodeID); err != nil { + t.Fatalf("error starting node %q: %s", nodeID, err) + } + } + + // connect the nodes + for i := 0; i < nodeCount-1; i++ { + peerId := i + 1 + if i == nodeCount-1 { + peerId = 0 + } + if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + } + + return nodeIDs +} + +type expectEvents struct { + *testing.T + + events chan *Event + sub event.Subscription +} + +func (t *expectEvents) nodeEvent(id string, up bool) *Event { + return &Event{ + Type: EventTypeNode, + Node: &Node{ + Config: &adapters.NodeConfig{ + ID: discover.MustHexID(id), + }, + Up: up, + }, + } +} + +func (t *expectEvents) connEvent(one, other string, up bool) *Event { + return &Event{ + Type: EventTypeConn, + Conn: &Conn{ + One: discover.MustHexID(one), + Other: discover.MustHexID(other), + Up: up, + }, + } +} + +func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) { + actual := make(map[MsgFilter]int) + timeout := time.After(10 * time.Second) +loop: + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + if event.Type != EventTypeMsg || event.Msg.Received { + continue loop + } + if event.Msg == nil { + t.Fatal("expected event.Msg to be set") + } + filter := MsgFilter{ + Proto: event.Msg.Protocol, + Code: int64(event.Msg.Code), + } + actual[filter]++ + if actual[filter] > expected[filter] { + t.Fatalf("received too many msgs for filter: %v", filter) + } + if reflect.DeepEqual(actual, expected) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +func (t *expectEvents) expect(events ...*Event) { + timeout := time.After(10 * time.Second) + i := 0 + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + expected := events[i] + if event.Type != expected.Type { + t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type) + } + + switch expected.Type { + + case EventTypeNode: + if event.Node == nil { + t.Fatal("expected event.Node to be set") + } + if event.Node.ID() != expected.Node.ID() { + t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString()) + } + if event.Node.Up != expected.Node.Up { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + } + + case EventTypeConn: + if event.Conn == nil { + t.Fatal("expected event.Conn to be set") + } + if event.Conn.One != expected.Conn.One { + t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString()) + } + if event.Conn.Other != expected.Conn.Other { + t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString()) + } + if event.Conn.Up != expected.Conn.Up { + t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up) + } + + } + + i++ + if i == len(events) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API +func TestHTTPNodeRPC(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // start a node in the network + client := NewClient(s.URL) + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + + // create two RPC clients + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rpcClient1, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + rpcClient2, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + + // subscribe to events using client 1 + events := make(chan int64, 1) + sub, err := rpcClient1.Subscribe(ctx, "test", events, "events") + if err != nil { + t.Fatalf("error subscribing to events: %s", err) + } + defer sub.Unsubscribe() + + // call some RPC methods using client 2 + if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + var result int64 + if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + if result != 10 { + t.Fatalf("expected result to be 10, got %d", result) + } + + // check we got an event from client 1 + select { + case event := <-events: + if event != 10 { + t.Fatalf("expected event to be 10, got %d", event) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +// TestHTTPSnapshot tests creating and loading network snapshots +func TestHTTPSnapshot(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // create a two-node network + client := NewClient(s.URL) + nodeCount := 2 + nodes := make([]*p2p.NodeInfo, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + nodes[i] = node + } + if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + + // store some state in the test services + states := make([]string, nodeCount) + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + state := fmt.Sprintf("%x", rand.Int()) + if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil { + t.Fatalf("error setting service state: %s", err) + } + states[i] = state + } + + // create a snapshot + snap, err := client.CreateSnapshot() + if err != nil { + t.Fatalf("error creating snapshot: %s", err) + } + for i, state := range states { + gotState := snap.Nodes[i].Snapshots["test"] + if string(gotState) != state { + t.Fatalf("expected snapshot state %q, got %q", state, gotState) + } + } + + // create another network + _, s = testHTTPServer(t) + defer s.Close() + client = NewClient(s.URL) + + // subscribe to events so we can check them later + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // load the snapshot + if err := client.LoadSnapshot(snap); err != nil { + t.Fatalf("error loading snapshot: %s", err) + } + + // check the nodes and connection exists + net, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if len(net.Nodes) != nodeCount { + t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes)) + } + for i, node := range nodes { + id := net.Nodes[i].ID().String() + if id != node.ID { + t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id) + } + } + if len(net.Conns) != 1 { + t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns)) + } + conn := net.Conns[0] + if conn.One.String() != nodes[0].ID { + t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One) + } + if conn.Other.String() != nodes[1].ID { + t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other) + } + + // check the node states were restored + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + var state []byte + if err := rpc.Call(&state, "test_getState"); err != nil { + t.Fatalf("error getting service state: %s", err) + } + if string(state) != states[i] { + t.Fatalf("expected snapshot state %q, got %q", states[i], state) + } + } + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodes[0].ID, false), + x.nodeEvent(nodes[0].ID, true), + x.nodeEvent(nodes[1].ID, false), + x.nodeEvent(nodes[1].ID, true), + x.connEvent(nodes[0].ID, nodes[1].ID, false), + x.connEvent(nodes[0].ID, nodes[1].ID, true), + ) +} + +// TestMsgFilterPassMultiple tests streaming message events using a filter +// with multiple protocols +func TestMsgFilterPassMultiple(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0-test:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassWildcard tests streaming message events using a filter +// with a code wildcard +func TestMsgFilterPassWildcard(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0,2-test:*", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 2}: 2, + {"test", 1}: 2, + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using a filter +// with a single protocol and code +func TestMsgFilterPassSingle(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "dum:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"dum", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using an invalid +// filter +func TestMsgFilterFailBadParams(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "foo:", + } + _, err := client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "bzz:aa" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "invalid" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go new file mode 100644 index 000000000..06890ffcf --- /dev/null +++ b/p2p/simulations/network.go @@ -0,0 +1,680 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" +) + +// NetworkConfig defines configuration options for starting a Network +type NetworkConfig struct { + ID string `json:"id"` + DefaultService string `json:"default_service,omitempty"` +} + +// Network models a p2p simulation network which consists of a collection of +// simulated nodes and the connections which exist between them. +// +// The Network has a single NodeAdapter which is responsible for actually +// starting nodes and connecting them together. +// +// The Network emits events when nodes are started and stopped, when they are +// connected and disconnected, and also when messages are sent between nodes. +type Network struct { + NetworkConfig + + Nodes []*Node `json:"nodes"` + nodeMap map[discover.NodeID]int + + Conns []*Conn `json:"conns"` + connMap map[string]int + + nodeAdapter adapters.NodeAdapter + events event.Feed + lock sync.RWMutex + quitc chan struct{} +} + +// NewNetwork returns a Network which uses the given NodeAdapter and NetworkConfig +func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network { + return &Network{ + NetworkConfig: *conf, + nodeAdapter: nodeAdapter, + nodeMap: make(map[discover.NodeID]int), + connMap: make(map[string]int), + quitc: make(chan struct{}), + } +} + +// Events returns the output event feed of the Network. +func (self *Network) Events() *event.Feed { + return &self.events +} + +// NewNode adds a new node to the network with a random ID +func (self *Network) NewNode() (*Node, error) { + conf := adapters.RandomNodeConfig() + conf.Services = []string{self.DefaultService} + return self.NewNodeWithConfig(conf) +} + +// NewNodeWithConfig adds a new node to the network with the given config, +// returning an error if a node with the same ID or name already exists +func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { + self.lock.Lock() + defer self.lock.Unlock() + + // create a random ID and PrivateKey if not set + if conf.ID == (discover.NodeID{}) { + c := adapters.RandomNodeConfig() + conf.ID = c.ID + conf.PrivateKey = c.PrivateKey + } + id := conf.ID + + // assign a name to the node if not set + if conf.Name == "" { + conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1) + } + + // check the node doesn't already exist + if node := self.getNode(id); node != nil { + return nil, fmt.Errorf("node with ID %q already exists", id) + } + if node := self.getNodeByName(conf.Name); node != nil { + return nil, fmt.Errorf("node with name %q already exists", conf.Name) + } + + // if no services are configured, use the default service + if len(conf.Services) == 0 { + conf.Services = []string{self.DefaultService} + } + + // use the NodeAdapter to create the node + adapterNode, err := self.nodeAdapter.NewNode(conf) + if err != nil { + return nil, err + } + node := &Node{ + Node: adapterNode, + Config: conf, + } + log.Trace(fmt.Sprintf("node %v created", id)) + self.nodeMap[id] = len(self.Nodes) + self.Nodes = append(self.Nodes, node) + + // emit a "control" event + self.events.Send(ControlEvent(node)) + + return node, nil +} + +// Config returns the network configuration +func (self *Network) Config() *NetworkConfig { + return &self.NetworkConfig +} + +// StartAll starts all nodes in the network +func (self *Network) StartAll() error { + for _, node := range self.Nodes { + if node.Up { + continue + } + if err := self.Start(node.ID()); err != nil { + return err + } + } + return nil +} + +// StopAll stops all nodes in the network +func (self *Network) StopAll() error { + for _, node := range self.Nodes { + if !node.Up { + continue + } + if err := self.Stop(node.ID()); err != nil { + return err + } + } + return nil +} + +// Start starts the node with the given ID +func (self *Network) Start(id discover.NodeID) error { + return self.startWithSnapshots(id, nil) +} + +// startWithSnapshots starts the node with the given ID using the give +// snapshots +func (self *Network) startWithSnapshots(id discover.NodeID, snapshots map[string][]byte) error { + node := self.GetNode(id) + if node == nil { + return fmt.Errorf("node %v does not exist", id) + } + if node.Up { + return fmt.Errorf("node %v already up", id) + } + log.Trace(fmt.Sprintf("starting node %v: %v using %v", id, node.Up, self.nodeAdapter.Name())) + if err := node.Start(snapshots); err != nil { + log.Warn(fmt.Sprintf("start up failed: %v", err)) + return err + } + node.Up = true + log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) + + self.events.Send(NewEvent(node)) + + // subscribe to peer events + client, err := node.Client() + if err != nil { + return fmt.Errorf("error getting rpc client for node %v: %s", id, err) + } + events := make(chan *p2p.PeerEvent) + sub, err := client.Subscribe(context.Background(), "admin", events, "peerEvents") + if err != nil { + return fmt.Errorf("error getting peer events for node %v: %s", id, err) + } + go self.watchPeerEvents(id, events, sub) + return nil +} + +// watchPeerEvents reads peer events from the given channel and emits +// corresponding network events +func (self *Network) watchPeerEvents(id discover.NodeID, events chan *p2p.PeerEvent, sub event.Subscription) { + defer func() { + sub.Unsubscribe() + + // assume the node is now down + self.lock.Lock() + node := self.getNode(id) + node.Up = false + self.lock.Unlock() + self.events.Send(NewEvent(node)) + }() + for { + select { + case event, ok := <-events: + if !ok { + return + } + peer := event.Peer + switch event.Type { + + case p2p.PeerEventTypeAdd: + self.DidConnect(id, peer) + + case p2p.PeerEventTypeDrop: + self.DidDisconnect(id, peer) + + case p2p.PeerEventTypeMsgSend: + self.DidSend(id, peer, event.Protocol, *event.MsgCode) + + case p2p.PeerEventTypeMsgRecv: + self.DidReceive(peer, id, event.Protocol, *event.MsgCode) + + } + + case err := <-sub.Err(): + if err != nil { + log.Error(fmt.Sprintf("error getting peer events for node %v", id), "err", err) + } + return + } + } +} + +// Stop stops the node with the given ID +func (self *Network) Stop(id discover.NodeID) error { + node := self.GetNode(id) + if node == nil { + return fmt.Errorf("node %v does not exist", id) + } + if !node.Up { + return fmt.Errorf("node %v already down", id) + } + if err := node.Stop(); err != nil { + return err + } + node.Up = false + log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) + + self.events.Send(ControlEvent(node)) + return nil +} + +// Connect connects two nodes together by calling the "admin_addPeer" RPC +// method on the "one" node so that it connects to the "other" node +func (self *Network) Connect(oneID, otherID discover.NodeID) error { + log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID)) + conn, err := self.GetOrCreateConn(oneID, otherID) + if err != nil { + return err + } + if conn.Up { + return fmt.Errorf("%v and %v already connected", oneID, otherID) + } + if err := conn.nodesUp(); err != nil { + return err + } + client, err := conn.one.Client() + if err != nil { + return err + } + self.events.Send(ControlEvent(conn)) + return client.Call(nil, "admin_addPeer", string(conn.other.Addr())) +} + +// Disconnect disconnects two nodes by calling the "admin_removePeer" RPC +// method on the "one" node so that it disconnects from the "other" node +func (self *Network) Disconnect(oneID, otherID discover.NodeID) error { + conn := self.GetConn(oneID, otherID) + if conn == nil { + return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID) + } + if !conn.Up { + return fmt.Errorf("%v and %v already disconnected", oneID, otherID) + } + client, err := conn.one.Client() + if err != nil { + return err + } + self.events.Send(ControlEvent(conn)) + return client.Call(nil, "admin_removePeer", string(conn.other.Addr())) +} + +// DidConnect tracks the fact that the "one" node connected to the "other" node +func (self *Network) DidConnect(one, other discover.NodeID) error { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { + return fmt.Errorf("connection between %v and %v does not exist", one, other) + } + if conn.Up { + return fmt.Errorf("%v and %v already connected", one, other) + } + conn.Up = true + self.events.Send(NewEvent(conn)) + return nil +} + +// DidDisconnect tracks the fact that the "one" node disconnected from the +// "other" node +func (self *Network) DidDisconnect(one, other discover.NodeID) error { + conn, err := self.GetOrCreateConn(one, other) + if err != nil { + return fmt.Errorf("connection between %v and %v does not exist", one, other) + } + if !conn.Up { + return fmt.Errorf("%v and %v already disconnected", one, other) + } + conn.Up = false + self.events.Send(NewEvent(conn)) + return nil +} + +// DidSend tracks the fact that "sender" sent a message to "receiver" +func (self *Network) DidSend(sender, receiver discover.NodeID, proto string, code uint64) error { + msg := &Msg{ + One: sender, + Other: receiver, + Protocol: proto, + Code: code, + Received: false, + } + self.events.Send(NewEvent(msg)) + return nil +} + +// DidReceive tracks the fact that "receiver" received a message from "sender" +func (self *Network) DidReceive(sender, receiver discover.NodeID, proto string, code uint64) error { + msg := &Msg{ + One: sender, + Other: receiver, + Protocol: proto, + Code: code, + Received: true, + } + self.events.Send(NewEvent(msg)) + return nil +} + +// GetNode gets the node with the given ID, returning nil if the node does not +// exist +func (self *Network) GetNode(id discover.NodeID) *Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.getNode(id) +} + +// GetNode gets the node with the given name, returning nil if the node does +// not exist +func (self *Network) GetNodeByName(name string) *Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.getNodeByName(name) +} + +func (self *Network) getNode(id discover.NodeID) *Node { + i, found := self.nodeMap[id] + if !found { + return nil + } + return self.Nodes[i] +} + +func (self *Network) getNodeByName(name string) *Node { + for _, node := range self.Nodes { + if node.Config.Name == name { + return node + } + } + return nil +} + +// GetNodes returns the existing nodes +func (self *Network) GetNodes() []*Node { + self.lock.Lock() + defer self.lock.Unlock() + return self.Nodes +} + +// GetConn returns the connection which exists between "one" and "other" +// regardless of which node initiated the connection +func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn { + self.lock.Lock() + defer self.lock.Unlock() + return self.getConn(oneID, otherID) +} + +// GetOrCreateConn is like GetConn but creates the connection if it doesn't +// already exist +func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) { + self.lock.Lock() + defer self.lock.Unlock() + if conn := self.getConn(oneID, otherID); conn != nil { + return conn, nil + } + + one := self.getNode(oneID) + if one == nil { + return nil, fmt.Errorf("node %v does not exist", oneID) + } + other := self.getNode(otherID) + if other == nil { + return nil, fmt.Errorf("node %v does not exist", otherID) + } + conn := &Conn{ + One: oneID, + Other: otherID, + one: one, + other: other, + } + label := ConnLabel(oneID, otherID) + self.connMap[label] = len(self.Conns) + self.Conns = append(self.Conns, conn) + return conn, nil +} + +func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn { + label := ConnLabel(oneID, otherID) + i, found := self.connMap[label] + if !found { + return nil + } + return self.Conns[i] +} + +// Shutdown stops all nodes in the network and closes the quit channel +func (self *Network) Shutdown() { + for _, node := range self.Nodes { + log.Debug(fmt.Sprintf("stopping node %s", node.ID().TerminalString())) + if err := node.Stop(); err != nil { + log.Warn(fmt.Sprintf("error stopping node %s", node.ID().TerminalString()), "err", err) + } + } + close(self.quitc) +} + +// Node is a wrapper around adapters.Node which is used to track the status +// of a node in the network +type Node struct { + adapters.Node `json:"-"` + + // Config if the config used to created the node + Config *adapters.NodeConfig `json:"config"` + + // Up tracks whether or not the node is running + Up bool `json:"up"` +} + +// ID returns the ID of the node +func (self *Node) ID() discover.NodeID { + return self.Config.ID +} + +// String returns a log-friendly string +func (self *Node) String() string { + return fmt.Sprintf("Node %v", self.ID().TerminalString()) +} + +// NodeInfo returns information about the node +func (self *Node) NodeInfo() *p2p.NodeInfo { + // avoid a panic if the node is not started yet + if self.Node == nil { + return nil + } + info := self.Node.NodeInfo() + info.Name = self.Config.Name + return info +} + +// MarshalJSON implements the json.Marshaler interface so that the encoded +// JSON includes the NodeInfo +func (self *Node) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Info *p2p.NodeInfo `json:"info,omitempty"` + Config *adapters.NodeConfig `json:"config,omitempty"` + Up bool `json:"up"` + }{ + Info: self.NodeInfo(), + Config: self.Config, + Up: self.Up, + }) +} + +// Conn represents a connection between two nodes in the network +type Conn struct { + // One is the node which initiated the connection + One discover.NodeID `json:"one"` + + // Other is the node which the connection was made to + Other discover.NodeID `json:"other"` + + // Up tracks whether or not the connection is active + Up bool `json:"up"` + + one *Node + other *Node +} + +// nodesUp returns whether both nodes are currently up +func (self *Conn) nodesUp() error { + if !self.one.Up { + return fmt.Errorf("one %v is not up", self.One) + } + if !self.other.Up { + return fmt.Errorf("other %v is not up", self.Other) + } + return nil +} + +// String returns a log-friendly string +func (self *Conn) String() string { + return fmt.Sprintf("Conn %v->%v", self.One.TerminalString(), self.Other.TerminalString()) +} + +// Msg represents a p2p message sent between two nodes in the network +type Msg struct { + One discover.NodeID `json:"one"` + Other discover.NodeID `json:"other"` + Protocol string `json:"protocol"` + Code uint64 `json:"code"` + Received bool `json:"received"` +} + +// String returns a log-friendly string +func (self *Msg) String() string { + return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.TerminalString(), self.Other.TerminalString()) +} + +// ConnLabel generates a deterministic string which represents a connection +// between two nodes, used to compare if two connections are between the same +// nodes +func ConnLabel(source, target discover.NodeID) string { + var first, second discover.NodeID + if bytes.Compare(source.Bytes(), target.Bytes()) > 0 { + first = target + second = source + } else { + first = source + second = target + } + return fmt.Sprintf("%v-%v", first, second) +} + +// Snapshot represents the state of a network at a single point in time and can +// be used to restore the state of a network +type Snapshot struct { + Nodes []NodeSnapshot `json:"nodes,omitempty"` + Conns []Conn `json:"conns,omitempty"` +} + +// NodeSnapshot represents the state of a node in the network +type NodeSnapshot struct { + Node Node `json:"node,omitempty"` + + // Snapshots is arbitrary data gathered from calling node.Snapshots() + Snapshots map[string][]byte `json:"snapshots,omitempty"` +} + +// Snapshot creates a network snapshot +func (self *Network) Snapshot() (*Snapshot, error) { + self.lock.Lock() + defer self.lock.Unlock() + snap := &Snapshot{ + Nodes: make([]NodeSnapshot, len(self.Nodes)), + Conns: make([]Conn, len(self.Conns)), + } + for i, node := range self.Nodes { + snap.Nodes[i] = NodeSnapshot{Node: *node} + if !node.Up { + continue + } + snapshots, err := node.Snapshots() + if err != nil { + return nil, err + } + snap.Nodes[i].Snapshots = snapshots + } + for i, conn := range self.Conns { + snap.Conns[i] = *conn + } + return snap, nil +} + +// Load loads a network snapshot +func (self *Network) Load(snap *Snapshot) error { + for _, n := range snap.Nodes { + if _, err := self.NewNodeWithConfig(n.Node.Config); err != nil { + return err + } + if !n.Node.Up { + continue + } + if err := self.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { + return err + } + } + for _, conn := range snap.Conns { + if err := self.Connect(conn.One, conn.Other); err != nil { + return err + } + } + return nil +} + +// Subscribe reads control events from a channel and executes them +func (self *Network) Subscribe(events chan *Event) { + for { + select { + case event, ok := <-events: + if !ok { + return + } + if event.Control { + self.executeControlEvent(event) + } + case <-self.quitc: + return + } + } +} + +func (self *Network) executeControlEvent(event *Event) { + log.Trace("execute control event", "type", event.Type, "event", event) + switch event.Type { + case EventTypeNode: + if err := self.executeNodeEvent(event); err != nil { + log.Error("error executing node event", "event", event, "err", err) + } + case EventTypeConn: + if err := self.executeConnEvent(event); err != nil { + log.Error("error executing conn event", "event", event, "err", err) + } + case EventTypeMsg: + log.Warn("ignoring control msg event") + } +} + +func (self *Network) executeNodeEvent(e *Event) error { + if !e.Node.Up { + return self.Stop(e.Node.ID()) + } + + if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil { + return err + } + return self.Start(e.Node.ID()) +} + +func (self *Network) executeConnEvent(e *Event) error { + if e.Conn.Up { + return self.Connect(e.Conn.One, e.Conn.Other) + } else { + return self.Disconnect(e.Conn.One, e.Conn.Other) + } +} diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go new file mode 100644 index 000000000..2a062121b --- /dev/null +++ b/p2p/simulations/network_test.go @@ -0,0 +1,159 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" +) + +// TestNetworkSimulation creates a multi-node simulation network with each node +// connected in a ring topology, checks that all nodes successfully handshake +// with each other and that a snapshot fully represents the desired topology +func TestNetworkSimulation(t *testing.T) { + // create simulation network with 20 testService nodes + adapter := adapters.NewSimAdapter(adapters.Services{ + "test": newTestService, + }) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + defer network.Shutdown() + nodeCount := 20 + ids := make([]discover.NodeID, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := network.NewNode() + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := network.Start(node.ID()); err != nil { + t.Fatalf("error starting node: %s", err) + } + ids[i] = node.ID() + } + + // perform a check which connects the nodes in a ring (so each node is + // connected to exactly two peers) and then checks that all nodes + // performed two handshakes by checking their peerCount + action := func(_ context.Context) error { + for i, id := range ids { + peerID := ids[(i+1)%len(ids)] + if err := network.Connect(id, peerID); err != nil { + return err + } + } + return nil + } + check := func(ctx context.Context, id discover.NodeID) (bool, error) { + // check we haven't run out of time + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + // get the node + node := network.GetNode(id) + if node == nil { + return false, fmt.Errorf("unknown node: %s", id) + } + + // check it has exactly two peers + client, err := node.Client() + if err != nil { + return false, err + } + var peerCount int64 + if err := client.CallContext(ctx, &peerCount, "test_peerCount"); err != nil { + return false, err + } + switch { + case peerCount < 2: + return false, nil + case peerCount == 2: + return true, nil + default: + return false, fmt.Errorf("unexpected peerCount: %d", peerCount) + } + } + + timeout := 30 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + // trigger a check every 100ms + trigger := make(chan discover.NodeID) + go triggerChecks(ctx, ids, trigger, 100*time.Millisecond) + + result := NewSimulation(network).Run(ctx, &Step{ + Action: action, + Trigger: trigger, + Expect: &Expectation{ + Nodes: ids, + Check: check, + }, + }) + if result.Error != nil { + t.Fatalf("simulation failed: %s", result.Error) + } + + // take a network snapshot and check it contains the correct topology + snap, err := network.Snapshot() + if err != nil { + t.Fatal(err) + } + if len(snap.Nodes) != nodeCount { + t.Fatalf("expected snapshot to contain %d nodes, got %d", nodeCount, len(snap.Nodes)) + } + if len(snap.Conns) != nodeCount { + t.Fatalf("expected snapshot to contain %d connections, got %d", nodeCount, len(snap.Conns)) + } + for i, id := range ids { + conn := snap.Conns[i] + if conn.One != id { + t.Fatalf("expected conn[%d].One to be %s, got %s", i, id, conn.One) + } + peerID := ids[(i+1)%len(ids)] + if conn.Other != peerID { + t.Fatalf("expected conn[%d].Other to be %s, got %s", i, peerID, conn.Other) + } + } +} + +func triggerChecks(ctx context.Context, ids []discover.NodeID, trigger chan discover.NodeID, interval time.Duration) { + tick := time.NewTicker(interval) + defer tick.Stop() + for { + select { + case <-tick.C: + for _, id := range ids { + select { + case trigger <- id: + case <-ctx.Done(): + return + } + } + case <-ctx.Done(): + return + } + } +} diff --git a/p2p/simulations/simulation.go b/p2p/simulations/simulation.go new file mode 100644 index 000000000..28886e924 --- /dev/null +++ b/p2p/simulations/simulation.go @@ -0,0 +1,157 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/p2p/discover" +) + +// Simulation provides a framework for running actions in a simulated network +// and then waiting for expectations to be met +type Simulation struct { + network *Network +} + +// NewSimulation returns a new simulation which runs in the given network +func NewSimulation(network *Network) *Simulation { + return &Simulation{ + network: network, + } +} + +// Run performs a step of the simulation by performing the step's action and +// then waiting for the step's expectation to be met +func (s *Simulation) Run(ctx context.Context, step *Step) (result *StepResult) { + result = newStepResult() + + result.StartedAt = time.Now() + defer func() { result.FinishedAt = time.Now() }() + + // watch network events for the duration of the step + stop := s.watchNetwork(result) + defer stop() + + // perform the action + if err := step.Action(ctx); err != nil { + result.Error = err + return + } + + // wait for all node expectations to either pass, error or timeout + nodes := make(map[discover.NodeID]struct{}, len(step.Expect.Nodes)) + for _, id := range step.Expect.Nodes { + nodes[id] = struct{}{} + } + for len(result.Passes) < len(nodes) { + select { + case id := <-step.Trigger: + // skip if we aren't checking the node + if _, ok := nodes[id]; !ok { + continue + } + + // skip if the node has already passed + if _, ok := result.Passes[id]; ok { + continue + } + + // run the node expectation check + pass, err := step.Expect.Check(ctx, id) + if err != nil { + result.Error = err + return + } + if pass { + result.Passes[id] = time.Now() + } + case <-ctx.Done(): + result.Error = ctx.Err() + return + } + } + + return +} + +func (s *Simulation) watchNetwork(result *StepResult) func() { + stop := make(chan struct{}) + done := make(chan struct{}) + events := make(chan *Event) + sub := s.network.Events().Subscribe(events) + go func() { + defer close(done) + defer sub.Unsubscribe() + for { + select { + case event := <-events: + result.NetworkEvents = append(result.NetworkEvents, event) + case <-stop: + return + } + } + }() + return func() { + close(stop) + <-done + } +} + +type Step struct { + // Action is the action to perform for this step + Action func(context.Context) error + + // Trigger is a channel which receives node ids and triggers an + // expectation check for that node + Trigger chan discover.NodeID + + // Expect is the expectation to wait for when performing this step + Expect *Expectation +} + +type Expectation struct { + // Nodes is a list of nodes to check + Nodes []discover.NodeID + + // Check checks whether a given node meets the expectation + Check func(context.Context, discover.NodeID) (bool, error) +} + +func newStepResult() *StepResult { + return &StepResult{ + Passes: make(map[discover.NodeID]time.Time), + } +} + +type StepResult struct { + // Error is the error encountered whilst running the step + Error error + + // StartedAt is the time the step started + StartedAt time.Time + + // FinishedAt is the time the step finished + FinishedAt time.Time + + // Passes are the timestamps of the successful node expectations + Passes map[discover.NodeID]time.Time + + // NetworkEvents are the network events which occurred during the step + NetworkEvents []*Event +} |