diff options
author | Lewis Marshall <lewis@lmars.net> | 2017-09-25 16:08:07 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-09-25 16:08:07 +0800 |
commit | 9feec51e2dd754819e5c730ac5985d28d57adb48 (patch) | |
tree | 32b07b659cf7d0b4c1a7da67b5c49daf7a10a9d3 /rpc | |
parent | 673007d7aed1d2678ea3277eceb7b55dc29cf092 (diff) | |
download | go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.gz go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.tar.zst go-tangerine-9feec51e2dd754819e5c730ac5985d28d57adb48.zip |
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which
can be used to run simulated networks of devp2p nodes. The
intention is to use this for testing protocols, performing
benchmarks and visualising emergent network behaviour.
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/client.go | 64 | ||||
-rw-r--r-- | rpc/client_test.go | 32 |
2 files changed, 46 insertions, 50 deletions
diff --git a/rpc/client.go b/rpc/client.go index f02366a39..8aa84ec98 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -349,85 +349,49 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { return err } -// ShhSubscribe calls the "shh_subscribe" method with the given arguments, -// registering a subscription. Server notifications for the subscription are -// sent to the given channel. The element type of the channel must match the -// expected type of content returned by the subscription. -// -// The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after ShhSubscribe has returned. -// -// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications -// before considering the subscriber dead. The subscription Err channel will receive -// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure -// that the channel usually has at least one reader to prevent this issue. -func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { - // Check type of channel first. - chanVal := reflect.ValueOf(channel) - if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to ShhSubscribe must be a writable channel") - } - if chanVal.IsNil() { - panic("channel given to ShhSubscribe must not be nil") - } - if c.isHTTP { - return nil, ErrNotificationsUnsupported - } - - msg, err := c.newMessage("shh"+subscribeMethodSuffix, args...) - if err != nil { - return nil, err - } - op := &requestOp{ - ids: []json.RawMessage{msg.ID}, - resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, "shh", chanVal), - } +// EthSubscribe registers a subscripion under the "eth" namespace. +func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "eth", channel, args...) +} - // Send the subscription request. - // The arrival and validity of the response is signaled on sub.quit. - if err := c.send(ctx, op, msg); err != nil { - return nil, err - } - if _, err := op.wait(ctx); err != nil { - return nil, err - } - return op.sub, nil +// ShhSubscribe registers a subscripion under the "shh" namespace. +func (c *Client) ShhSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { + return c.Subscribe(ctx, "shh", channel, args...) } -// EthSubscribe calls the "eth_subscribe" method with the given arguments, +// Subscribe calls the "<namespace>_subscribe" method with the given arguments, // registering a subscription. Server notifications for the subscription are // sent to the given channel. The element type of the channel must match the // expected type of content returned by the subscription. // // The context argument cancels the RPC request that sets up the subscription but has no -// effect on the subscription after EthSubscribe has returned. +// effect on the subscription after Subscribe has returned. // // Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications // before considering the subscriber dead. The subscription Err channel will receive // ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure // that the channel usually has at least one reader to prevent this issue. -func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...interface{}) (*ClientSubscription, error) { +func (c *Client) Subscribe(ctx context.Context, namespace string, channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { - panic("first argument to EthSubscribe must be a writable channel") + panic("first argument to Subscribe must be a writable channel") } if chanVal.IsNil() { - panic("channel given to EthSubscribe must not be nil") + panic("channel given to Subscribe must not be nil") } if c.isHTTP { return nil, ErrNotificationsUnsupported } - msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...) + msg, err := c.newMessage(namespace+subscribeMethodSuffix, args...) if err != nil { return nil, err } op := &requestOp{ ids: []json.RawMessage{msg.ID}, resp: make(chan *jsonrpcMessage), - sub: newClientSubscription(c, "eth", chanVal), + sub: newClientSubscription(c, namespace, chanVal), } // Send the subscription request. diff --git a/rpc/client_test.go b/rpc/client_test.go index 10d74670b..4f354d389 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -251,6 +251,38 @@ func TestClientSubscribe(t *testing.T) { } } +func TestClientSubscribeCustomNamespace(t *testing.T) { + namespace := "custom" + server := newTestServer(namespace, new(NotificationTestService)) + defer server.Stop() + client := DialInProc(server) + defer client.Close() + + nc := make(chan int) + count := 10 + sub, err := client.Subscribe(context.Background(), namespace, nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + for i := 0; i < count; i++ { + if val := <-nc; val != i { + t.Fatalf("value mismatch: got %d, want %d", val, i) + } + } + + sub.Unsubscribe() + select { + case v := <-nc: + t.Fatal("received value after unsubscribe:", v) + case err := <-sub.Err(): + if err != nil { + t.Fatalf("Err returned a non-nil error after explicit unsubscribe: %q", err) + } + case <-time.After(1 * time.Second): + t.Fatalf("subscription not closed within 1s after unsubscribe") + } +} + // In this test, the connection drops while EthSubscribe is // waiting for a response. func TestClientSubscribeClose(t *testing.T) { |