diff options
Diffstat (limited to 'p2p/simulations/http_test.go')
-rw-r--r-- | p2p/simulations/http_test.go | 823 |
1 files changed, 823 insertions, 0 deletions
diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go new file mode 100644 index 000000000..677a8fb14 --- /dev/null +++ b/p2p/simulations/http_test.go @@ -0,0 +1,823 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package simulations + +import ( + "context" + "fmt" + "math/rand" + "net/http/httptest" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +// testService implements the node.Service interface and provides protocols +// and APIs which are useful for testing nodes in a simulation network +type testService struct { + id discover.NodeID + + // peerCount is incremented once a peer handshake has been performed + peerCount int64 + + peers map[discover.NodeID]*testPeer + peersMtx sync.Mutex + + // state stores []byte which is used to test creating and loading + // snapshots + state atomic.Value +} + +func newTestService(ctx *adapters.ServiceContext) (node.Service, error) { + svc := &testService{ + id: ctx.Config.ID, + peers: make(map[discover.NodeID]*testPeer), + } + svc.state.Store(ctx.Snapshot) + return svc, nil +} + +type testPeer struct { + testReady chan struct{} + dumReady chan struct{} +} + +func (t *testService) peer(id discover.NodeID) *testPeer { + t.peersMtx.Lock() + defer t.peersMtx.Unlock() + if peer, ok := t.peers[id]; ok { + return peer + } + peer := &testPeer{ + testReady: make(chan struct{}), + dumReady: make(chan struct{}), + } + t.peers[id] = peer + return peer +} + +func (t *testService) Protocols() []p2p.Protocol { + return []p2p.Protocol{ + { + Name: "test", + Version: 1, + Length: 3, + Run: t.RunTest, + }, + { + Name: "dum", + Version: 1, + Length: 1, + Run: t.RunDum, + }, + { + Name: "prb", + Version: 1, + Length: 1, + Run: t.RunPrb, + }, + } +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{ + state: &t.state, + peerCount: &t.peerCount, + }, + }} +} + +func (t *testService) Start(server *p2p.Server) error { + return nil +} + +func (t *testService) Stop() error { + return nil +} + +// handshake performs a peer handshake by sending and expecting an empty +// message with the given code +func (t *testService) handshake(rw p2p.MsgReadWriter, code uint64) error { + errc := make(chan error, 2) + go func() { errc <- p2p.Send(rw, code, struct{}{}) }() + go func() { errc <- p2p.ExpectMsg(rw, code, struct{}{}) }() + for i := 0; i < 2; i++ { + if err := <-errc; err != nil { + return err + } + } + return nil +} + +func (t *testService) RunTest(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // perform three handshakes with three different message codes, + // used to test message sending and filtering + if err := t.handshake(rw, 2); err != nil { + return err + } + if err := t.handshake(rw, 1); err != nil { + return err + } + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the testReady channel so that other protocols can run + close(peer.testReady) + + // track the peer + atomic.AddInt64(&t.peerCount, 1) + defer atomic.AddInt64(&t.peerCount, -1) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) RunDum(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the test protocol to perform its handshake + <-peer.testReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // close the dumReady channel so that other protocols can run + close(peer.dumReady) + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} +func (t *testService) RunPrb(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := t.peer(p.ID()) + + // wait for the dum protocol to perform its handshake + <-peer.dumReady + + // perform a handshake + if err := t.handshake(rw, 0); err != nil { + return err + } + + // block until the peer is dropped + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +func (t *testService) Snapshot() ([]byte, error) { + return t.state.Load().([]byte), nil +} + +// TestAPI provides a test API to: +// * get the peer count +// * get and set an arbitrary state byte slice +// * get and increment a counter +// * subscribe to counter increment events +type TestAPI struct { + state *atomic.Value + peerCount *int64 + counter int64 + feed event.Feed +} + +func (t *TestAPI) PeerCount() int64 { + return atomic.LoadInt64(t.peerCount) +} + +func (t *TestAPI) Get() int64 { + return atomic.LoadInt64(&t.counter) +} + +func (t *TestAPI) Add(delta int64) { + atomic.AddInt64(&t.counter, delta) + t.feed.Send(delta) +} + +func (t *TestAPI) GetState() []byte { + return t.state.Load().([]byte) +} + +func (t *TestAPI) SetState(state []byte) { + t.state.Store(state) +} + +func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan int64) + sub := t.feed.Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + +var testServices = adapters.Services{ + "test": newTestService, +} + +func testHTTPServer(t *testing.T) (*Network, *httptest.Server) { + adapter := adapters.NewSimAdapter(testServices) + network := NewNetwork(adapter, &NetworkConfig{ + DefaultService: "test", + }) + return network, httptest.NewServer(NewServer(network)) +} + +// TestHTTPNetwork tests interacting with a simulation network using the HTTP +// API +func TestHTTPNetwork(t *testing.T) { + // start the server + network, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events so we can check them later + client := NewClient(s.URL) + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // check we can retrieve details about the network + gotNetwork, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if gotNetwork.ID != network.ID { + t.Fatalf("expected network to have ID %q, got %q", network.ID, gotNetwork.ID) + } + + // start a simulation network + nodeIDs := startTestNetwork(t, client) + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], false), + x.nodeEvent(nodeIDs[1], false), + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], false), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) + + // reconnect the stream and check we get the current nodes and conns + events = make(chan *Event, 100) + opts.Current = true + sub, err = client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + x = &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodeIDs[0], true), + x.nodeEvent(nodeIDs[1], true), + x.connEvent(nodeIDs[0], nodeIDs[1], true), + ) +} + +func startTestNetwork(t *testing.T, client *Client) []string { + // create two nodes + nodeCount := 2 + nodeIDs := make([]string, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + nodeIDs[i] = node.ID + } + + // check both nodes exist + nodes, err := client.GetNodes() + if err != nil { + t.Fatalf("error getting nodes: %s", err) + } + if len(nodes) != nodeCount { + t.Fatalf("expected %d nodes, got %d", nodeCount, len(nodes)) + } + for i, nodeID := range nodeIDs { + if nodes[i].ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) + } + node, err := client.GetNode(nodeID) + if err != nil { + t.Fatalf("error getting node %d: %s", i, err) + } + if node.ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID) + } + } + + // start both nodes + for _, nodeID := range nodeIDs { + if err := client.StartNode(nodeID); err != nil { + t.Fatalf("error starting node %q: %s", nodeID, err) + } + } + + // connect the nodes + for i := 0; i < nodeCount-1; i++ { + peerId := i + 1 + if i == nodeCount-1 { + peerId = 0 + } + if err := client.ConnectNode(nodeIDs[i], nodeIDs[peerId]); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + } + + return nodeIDs +} + +type expectEvents struct { + *testing.T + + events chan *Event + sub event.Subscription +} + +func (t *expectEvents) nodeEvent(id string, up bool) *Event { + return &Event{ + Type: EventTypeNode, + Node: &Node{ + Config: &adapters.NodeConfig{ + ID: discover.MustHexID(id), + }, + Up: up, + }, + } +} + +func (t *expectEvents) connEvent(one, other string, up bool) *Event { + return &Event{ + Type: EventTypeConn, + Conn: &Conn{ + One: discover.MustHexID(one), + Other: discover.MustHexID(other), + Up: up, + }, + } +} + +func (t *expectEvents) expectMsgs(expected map[MsgFilter]int) { + actual := make(map[MsgFilter]int) + timeout := time.After(10 * time.Second) +loop: + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + if event.Type != EventTypeMsg || event.Msg.Received { + continue loop + } + if event.Msg == nil { + t.Fatal("expected event.Msg to be set") + } + filter := MsgFilter{ + Proto: event.Msg.Protocol, + Code: int64(event.Msg.Code), + } + actual[filter]++ + if actual[filter] > expected[filter] { + t.Fatalf("received too many msgs for filter: %v", filter) + } + if reflect.DeepEqual(actual, expected) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +func (t *expectEvents) expect(events ...*Event) { + timeout := time.After(10 * time.Second) + i := 0 + for { + select { + case event := <-t.events: + t.Logf("received %s event: %s", event.Type, event) + + expected := events[i] + if event.Type != expected.Type { + t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type) + } + + switch expected.Type { + + case EventTypeNode: + if event.Node == nil { + t.Fatal("expected event.Node to be set") + } + if event.Node.ID() != expected.Node.ID() { + t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString()) + } + if event.Node.Up != expected.Node.Up { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + } + + case EventTypeConn: + if event.Conn == nil { + t.Fatal("expected event.Conn to be set") + } + if event.Conn.One != expected.Conn.One { + t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.TerminalString(), event.Conn.One.TerminalString()) + } + if event.Conn.Other != expected.Conn.Other { + t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.TerminalString(), event.Conn.Other.TerminalString()) + } + if event.Conn.Up != expected.Conn.Up { + t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up) + } + + } + + i++ + if i == len(events) { + return + } + + case err := <-t.sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API +func TestHTTPNodeRPC(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // start a node in the network + client := NewClient(s.URL) + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + + // create two RPC clients + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rpcClient1, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + rpcClient2, err := client.RPCClient(ctx, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + + // subscribe to events using client 1 + events := make(chan int64, 1) + sub, err := rpcClient1.Subscribe(ctx, "test", events, "events") + if err != nil { + t.Fatalf("error subscribing to events: %s", err) + } + defer sub.Unsubscribe() + + // call some RPC methods using client 2 + if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + var result int64 + if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + if result != 10 { + t.Fatalf("expected result to be 10, got %d", result) + } + + // check we got an event from client 1 + select { + case event := <-events: + if event != 10 { + t.Fatalf("expected event to be 10, got %d", event) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} + +// TestHTTPSnapshot tests creating and loading network snapshots +func TestHTTPSnapshot(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // create a two-node network + client := NewClient(s.URL) + nodeCount := 2 + nodes := make([]*p2p.NodeInfo, nodeCount) + for i := 0; i < nodeCount; i++ { + node, err := client.CreateNode(nil) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + nodes[i] = node + } + if err := client.ConnectNode(nodes[0].ID, nodes[1].ID); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + + // store some state in the test services + states := make([]string, nodeCount) + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + state := fmt.Sprintf("%x", rand.Int()) + if err := rpc.Call(nil, "test_setState", []byte(state)); err != nil { + t.Fatalf("error setting service state: %s", err) + } + states[i] = state + } + + // create a snapshot + snap, err := client.CreateSnapshot() + if err != nil { + t.Fatalf("error creating snapshot: %s", err) + } + for i, state := range states { + gotState := snap.Nodes[i].Snapshots["test"] + if string(gotState) != state { + t.Fatalf("expected snapshot state %q, got %q", state, gotState) + } + } + + // create another network + _, s = testHTTPServer(t) + defer s.Close() + client = NewClient(s.URL) + + // subscribe to events so we can check them later + events := make(chan *Event, 100) + var opts SubscribeOpts + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // load the snapshot + if err := client.LoadSnapshot(snap); err != nil { + t.Fatalf("error loading snapshot: %s", err) + } + + // check the nodes and connection exists + net, err := client.GetNetwork() + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if len(net.Nodes) != nodeCount { + t.Fatalf("expected network to have %d nodes, got %d", nodeCount, len(net.Nodes)) + } + for i, node := range nodes { + id := net.Nodes[i].ID().String() + if id != node.ID { + t.Fatalf("expected node %d to have ID %s, got %s", i, node.ID, id) + } + } + if len(net.Conns) != 1 { + t.Fatalf("expected network to have 1 connection, got %d", len(net.Conns)) + } + conn := net.Conns[0] + if conn.One.String() != nodes[0].ID { + t.Fatalf("expected connection to have one=%q, got one=%q", nodes[0].ID, conn.One) + } + if conn.Other.String() != nodes[1].ID { + t.Fatalf("expected connection to have other=%q, got other=%q", nodes[1].ID, conn.Other) + } + + // check the node states were restored + for i, node := range nodes { + rpc, err := client.RPCClient(context.Background(), node.ID) + if err != nil { + t.Fatalf("error getting RPC client: %s", err) + } + defer rpc.Close() + var state []byte + if err := rpc.Call(&state, "test_getState"); err != nil { + t.Fatalf("error getting service state: %s", err) + } + if string(state) != states[i] { + t.Fatalf("expected snapshot state %q, got %q", states[i], state) + } + } + + // check we got all the events + x := &expectEvents{t, events, sub} + x.expect( + x.nodeEvent(nodes[0].ID, false), + x.nodeEvent(nodes[0].ID, true), + x.nodeEvent(nodes[1].ID, false), + x.nodeEvent(nodes[1].ID, true), + x.connEvent(nodes[0].ID, nodes[1].ID, false), + x.connEvent(nodes[0].ID, nodes[1].ID, true), + ) +} + +// TestMsgFilterPassMultiple tests streaming message events using a filter +// with multiple protocols +func TestMsgFilterPassMultiple(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0-test:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassWildcard tests streaming message events using a filter +// with a code wildcard +func TestMsgFilterPassWildcard(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "prb:0,2-test:*", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"test", 2}: 2, + {"test", 1}: 2, + {"test", 0}: 2, + {"prb", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using a filter +// with a single protocol and code +func TestMsgFilterPassSingle(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + // subscribe to events with a message filter + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "dum:0", + } + sub, err := client.SubscribeNetwork(events, opts) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // start a simulation network + startTestNetwork(t, client) + + // check we got the expected events + x := &expectEvents{t, events, sub} + x.expectMsgs(map[MsgFilter]int{ + {"dum", 0}: 2, + }) +} + +// TestMsgFilterPassSingle tests streaming message events using an invalid +// filter +func TestMsgFilterFailBadParams(t *testing.T) { + // start the server + _, s := testHTTPServer(t) + defer s.Close() + + client := NewClient(s.URL) + events := make(chan *Event, 10) + opts := SubscribeOpts{ + Filter: "foo:", + } + _, err := client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "bzz:aa" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } + + opts.Filter = "invalid" + _, err = client.SubscribeNetwork(events, opts) + if err == nil { + t.Fatalf("expected event subscription to fail but succeeded!") + } +} |