aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/client_test.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-08-05 03:18:13 +0800
committerFelix Lange <fjl@twurst.com>2016-08-06 02:48:33 +0800
commitf5f042ffdc9fed3094b86f3dbbc85bb63a4f9537 (patch)
tree2c7fc00059c94a9178b6e75e29f31a3cacc2b68b /rpc/client_test.go
parent464660651ddf7e8938a0fbb03f140502180a8062 (diff)
downloaddexon-f5f042ffdc9fed3094b86f3dbbc85bb63a4f9537.tar.gz
dexon-f5f042ffdc9fed3094b86f3dbbc85bb63a4f9537.tar.zst
dexon-f5f042ffdc9fed3094b86f3dbbc85bb63a4f9537.zip
rpc: ensure client doesn't block for slow subscribers
I initially made the client block if the 100-element buffer was exceeded. It turns out that this is inconvenient for simple uses of the client which subscribe and perform calls on the same goroutine, e.g. client, _ := rpc.Dial(...) ch := make(chan int) // note: no buffer sub, _ := client.EthSubscribe(ch, "something") for event := range ch { client.Call(...) } This innocent looking code will lock up if the server suddenly decides to send 2000 notifications. In this case, the client's main loop won't accept the call because it is trying to deliver a notification to ch. The issue is kind of hard to explain in the docs and few people will actually read them. Buffering is the simple option and works with close to no overhead for subscribers that always listen.
Diffstat (limited to 'rpc/client_test.go')
-rw-r--r--rpc/client_test.go51
1 files changed, 51 insertions, 0 deletions
diff --git a/rpc/client_test.go b/rpc/client_test.go
index 58dceada0..424d7f5bc 100644
--- a/rpc/client_test.go
+++ b/rpc/client_test.go
@@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) {
}
}
+// This test checks that Client doesn't lock up when a single subscriber
+// doesn't read subscription events.
+func TestClientNotificationStorm(t *testing.T) {
+ server := newTestServer("eth", new(NotificationTestService))
+ defer server.Stop()
+
+ doTest := func(count int, wantError bool) {
+ client := DialInProc(server)
+ defer client.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ // Subscribe on the server. It will start sending many notifications
+ // very quickly.
+ nc := make(chan int)
+ sub, err := client.EthSubscribe(nc, "someSubscription", count, 0)
+ if err != nil {
+ t.Fatal("can't subscribe:", err)
+ }
+ defer sub.Unsubscribe()
+
+ // Process each notification, try to run a call in between each of them.
+ for i := 0; i < count; i++ {
+ select {
+ case val := <-nc:
+ if val != i {
+ t.Fatalf("(%d/%d) unexpected value %d", i, count, val)
+ }
+ case err := <-sub.Err():
+ if wantError && err != ErrSubscriptionQueueOverflow {
+ t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow)
+ } else if !wantError {
+ t.Fatalf("(%d/%d) got unexpected error %q", i, count, err)
+ }
+ return
+ }
+ var r int
+ err := client.CallContext(ctx, &r, "eth_echo", i)
+ if err != nil {
+ if !wantError {
+ t.Fatalf("(%d/%d) call error: %v", i, count, err)
+ }
+ return
+ }
+ }
+ }
+
+ doTest(8000, false)
+ doTest(10000, true)
+}
+
func TestClientHTTP(t *testing.T) {
server := newTestServer("service", new(Service))
defer server.Stop()