From f5f042ffdc9fed3094b86f3dbbc85bb63a4f9537 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 4 Aug 2016 21:18:13 +0200 Subject: rpc: ensure client doesn't block for slow subscribers I initially made the client block if the 100-element buffer was exceeded. It turns out that this is inconvenient for simple uses of the client which subscribe and perform calls on the same goroutine, e.g. client, _ := rpc.Dial(...) ch := make(chan int) // note: no buffer sub, _ := client.EthSubscribe(ch, "something") for event := range ch { client.Call(...) } This innocent looking code will lock up if the server suddenly decides to send 2000 notifications. In this case, the client's main loop won't accept the call because it is trying to deliver a notification to ch. The issue is kind of hard to explain in the docs and few people will actually read them. Buffering is the simple option and works with close to no overhead for subscribers that always listen. --- rpc/client.go | 92 ++++++++++++++++++++++++++++++++---------------- rpc/client_test.go | 51 +++++++++++++++++++++++++++ rpc/notification_test.go | 4 +++ 3 files changed, 117 insertions(+), 30 deletions(-) (limited to 'rpc') diff --git a/rpc/client.go b/rpc/client.go index 0c52402ea..6846e1dda 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -18,6 +18,7 @@ package rpc import ( "bytes" + "container/list" "encoding/json" "errors" "fmt" @@ -35,16 +36,31 @@ import ( ) var ( - ErrClientQuit = errors.New("client is closed") - ErrNoResult = errors.New("no result in JSON-RPC response") + ErrClientQuit = errors.New("client is closed") + ErrNoResult = errors.New("no result in JSON-RPC response") + ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") ) const ( - clientSubscriptionBuffer = 100 // if exceeded, the client stops reading - tcpKeepAliveInterval = 30 * time.Second - defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline - defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline - subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls + // Timeouts + tcpKeepAliveInterval = 30 * time.Second + defaultDialTimeout = 10 * time.Second // used when dialing if the context has no deadline + defaultWriteTimeout = 10 * time.Second // used for calls if the context has no deadline + subscribeTimeout = 5 * time.Second // overall timeout eth_subscribe, rpc_modules calls +) + +const ( + // Subscriptions are removed when the subscriber cannot keep up. + // + // This can be worked around by supplying a channel with sufficiently sized buffer, + // but this can be inconvenient and hard to explain in the docs. Another issue with + // buffered channels is that the buffer is static even though it might not be needed + // most of the time. + // + // The approach taken here is to maintain a per-subscription linked list buffer + // shrinks on demand. If the buffer reaches the size below, the subscription is + // dropped. + maxClientSubscriptionBuffer = 8000 ) // BatchElem is an element in a batch request. @@ -276,9 +292,9 @@ func (c *Client) BatchCall(b []BatchElem) error { // to return a response for all of them. The wait duration is bounded by the // context's deadline. // -// In contrast to CallContext, BatchCallContext only returns I/O errors. Any -// error specific to a request is reported through the Error field of the -// corresponding BatchElem. +// In contrast to CallContext, BatchCallContext only returns errors that have occurred +// while sending the request. Any error specific to a request is reported through the +// Error field of the corresponding BatchElem. // // Note that batch calls may not be executed atomically on the server side. func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { @@ -338,11 +354,11 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { // sent to the given channel. The element type of the channel must match the // expected type of content returned by the subscription. // -// Callers should not use the same channel for multiple calls to EthSubscribe. -// The channel is closed when the notification is unsubscribed or an error -// occurs. The error can be retrieved via the Err method of the subscription. // -// Slow subscribers will block the clients ingress path eventually. +// Slow subscribers will be dropped eventually. Client buffers up to 8000 notifications +// before considering the subscriber dead. The subscription Err channel will receive +// ErrSubscriptionQueueOverflow. Use a sufficiently large buffer on the channel or ensure +// that the channel usually has at least one reader to prevent this issue. func (c *Client) EthSubscribe(channel interface{}, args ...interface{}) (*ClientSubscription, error) { // Check type of channel first. chanVal := reflect.ValueOf(channel) @@ -657,8 +673,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription channel: channel, quit: make(chan struct{}), err: make(chan error, 1), - // in is buffered so dispatch can continue even if the subscriber is slow. - in: make(chan json.RawMessage, clientSubscriptionBuffer), + in: make(chan json.RawMessage), } return sub } @@ -684,13 +699,16 @@ func (sub *ClientSubscription) Unsubscribe() { func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) { sub.quitOnce.Do(func() { + // The dispatch loop won't be able to execute the unsubscribe call + // if it is blocked on deliver. Close sub.quit first because it + // unblocks deliver. + close(sub.quit) if unsubscribeServer { sub.requestUnsubscribe() } if err != nil { sub.err <- err } - close(sub.quit) }) } @@ -710,32 +728,46 @@ func (sub *ClientSubscription) start() { func (sub *ClientSubscription) forward() (err error, unsubscribeServer bool) { cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.quit)}, + {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sub.in)}, {Dir: reflect.SelectSend, Chan: sub.channel}, } + buffer := list.New() + defer buffer.Init() for { - select { - case result := <-sub.in: - val, err := sub.unmarshal(result) + var chosen int + var recv reflect.Value + if buffer.Len() == 0 { + // Idle, omit send case. + chosen, recv, _ = reflect.Select(cases[:2]) + } else { + // Non-empty buffer, send the first queued item. + cases[2].Send = reflect.ValueOf(buffer.Front().Value) + chosen, recv, _ = reflect.Select(cases) + } + + switch chosen { + case 0: // <-sub.quit + return nil, false + case 1: // <-sub.in + val, err := sub.unmarshal(recv.Interface().(json.RawMessage)) if err != nil { return err, true } - cases[1].Send = val - switch chosen, _, _ := reflect.Select(cases); chosen { - case 0: // <-sub.quit - return nil, false - case 1: // sub.channel<- - continue + if buffer.Len() == maxClientSubscriptionBuffer { + return ErrSubscriptionQueueOverflow, true } - case <-sub.quit: - return nil, false + buffer.PushBack(val) + case 2: // sub.channel<- + cases[2].Send = reflect.Value{} // Don't hold onto the value. + buffer.Remove(buffer.Front()) } } } -func (sub *ClientSubscription) unmarshal(result json.RawMessage) (reflect.Value, error) { +func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, error) { val := reflect.New(sub.etype) err := json.Unmarshal(result, val.Interface()) - return val.Elem(), err + return val.Elem().Interface(), err } func (sub *ClientSubscription) requestUnsubscribe() error { diff --git a/rpc/client_test.go b/rpc/client_test.go index 58dceada0..424d7f5bc 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -296,6 +296,57 @@ func TestClientSubscribeClose(t *testing.T) { } } +// This test checks that Client doesn't lock up when a single subscriber +// doesn't read subscription events. +func TestClientNotificationStorm(t *testing.T) { + server := newTestServer("eth", new(NotificationTestService)) + defer server.Stop() + + doTest := func(count int, wantError bool) { + client := DialInProc(server) + defer client.Close() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Subscribe on the server. It will start sending many notifications + // very quickly. + nc := make(chan int) + sub, err := client.EthSubscribe(nc, "someSubscription", count, 0) + if err != nil { + t.Fatal("can't subscribe:", err) + } + defer sub.Unsubscribe() + + // Process each notification, try to run a call in between each of them. + for i := 0; i < count; i++ { + select { + case val := <-nc: + if val != i { + t.Fatalf("(%d/%d) unexpected value %d", i, count, val) + } + case err := <-sub.Err(): + if wantError && err != ErrSubscriptionQueueOverflow { + t.Fatalf("(%d/%d) got error %q, want %q", i, count, err, ErrSubscriptionQueueOverflow) + } else if !wantError { + t.Fatalf("(%d/%d) got unexpected error %q", i, count, err) + } + return + } + var r int + err := client.CallContext(ctx, &r, "eth_echo", i) + if err != nil { + if !wantError { + t.Fatalf("(%d/%d) call error: %v", i, count, err) + } + return + } + } + } + + doTest(8000, false) + doTest(10000, true) +} + func TestClientHTTP(t *testing.T) { server := newTestServer("service", new(Service)) defer server.Stop() diff --git a/rpc/notification_test.go b/rpc/notification_test.go index 280503222..52352848c 100644 --- a/rpc/notification_test.go +++ b/rpc/notification_test.go @@ -34,6 +34,10 @@ type NotificationTestService struct { unblockHangSubscription chan struct{} } +func (s *NotificationTestService) Echo(i int) int { + return i +} + func (s *NotificationTestService) wasUnsubCallbackCalled() bool { s.mu.Lock() defer s.mu.Unlock() -- cgit