aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/server.go
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-07-27 23:47:46 +0800
committerBas van Kervel <basvankervel@gmail.com>2016-08-17 18:59:58 +0800
commit47ff8130124b479f1f051312eed50c33f0a38e6f (patch)
treecb29e4550f63f3a763dd04b267261e354e56d7eb /rpc/server.go
parent3b39d4d1c15df2697284c3d7a61564f98ab45c70 (diff)
downloaddexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.gz
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.tar.zst
dexon-47ff8130124b479f1f051312eed50c33f0a38e6f.zip
rpc: refactor subscriptions and filters
Diffstat (limited to 'rpc/server.go')
-rw-r--r--rpc/server.go12
1 files changed, 6 insertions, 6 deletions
diff --git a/rpc/server.go b/rpc/server.go
index 040805a5c..996c63700 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
- ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
+ ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
}
s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped
@@ -247,7 +247,7 @@ func (s *Server) Stop() {
}
// createSubscription will call the subscription callback and returns the subscription id or error.
-func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
+func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) {
// subscription have as first argument the context following optional arguments
args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
args = append(args, req.args...)
@@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser
return "", reply[1].Interface().(error)
}
- return reply[0].Interface().(Subscription).ID(), nil
+ return reply[0].Interface().(*Subscription).ID, nil
}
// handle executes a request and returns the response from the callback.
@@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
}
- subid := req.args[0].String()
- if err := notifier.Unsubscribe(subid); err != nil {
+ subid := ID(req.args[0].String())
+ if err := notifier.unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
@@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client
activateSub := func() {
notifier, _ := NotifierFromContext(ctx)
- notifier.(*bufferedNotifier).activate(subid)
+ notifier.activate(subid)
}
return codec.CreateResponse(req.id, subid), activateSub