diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-07-27 23:47:46 +0800 |
---|---|---|
committer | Bas van Kervel <basvankervel@gmail.com> | 2016-08-17 18:59:58 +0800 |
commit | 47ff8130124b479f1f051312eed50c33f0a38e6f (patch) | |
tree | cb29e4550f63f3a763dd04b267261e354e56d7eb /rpc/server.go | |
parent | 3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff) | |
download | dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.zip |
rpc: refactor subscriptions and filters
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 12 |
1 files changed, 6 insertions, 6 deletions
diff --git a/rpc/server.go b/rpc/server.go index 040805a5c..996c63700 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO // to send notification to clients. It is thight to the codec/connection. If the // connection is closed the notifier will stop and cancels all active subscriptions. if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize)) + ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec)) } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped @@ -247,7 +247,7 @@ func (s *Server) Stop() { } // createSubscription will call the subscription callback and returns the subscription id or error. -func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) { // subscription have as first argument the context following optional arguments args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} args = append(args, req.args...) @@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser return "", reply[1].Interface().(error) } - return reply[0].Interface().(Subscription).ID(), nil + return reply[0].Interface().(*Subscription).ID, nil } // handle executes a request and returns the response from the callback. @@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } - subid := req.args[0].String() - if err := notifier.Unsubscribe(subid); err != nil { + subid := ID(req.args[0].String()) + if err := notifier.unsubscribe(subid); err != nil { return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } @@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.(*bufferedNotifier).activate(subid) + notifier.activate(subid) } return codec.CreateResponse(req.id, subid), activateSub |