diff options
author | Bas van Kervel <basvankervel@gmail.com> | 2017-04-06 14:56:41 +0800 |
---|---|---|
committer | Bas van Kervel <basvankervel@gmail.com> | 2017-04-25 17:13:22 +0800 |
commit | 37e3f561f15cbedf10c01847e58a079f9b86bf6f (patch) | |
tree | d7cdc3e8a5b74261a3359f6029e927cca0fc738b /rpc/server.go | |
parent | ba3bcd16a6d99bc0e58516556df8e96b730c2d60 (diff) | |
download | dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.gz dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.zst dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.zip |
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/server.go')
-rw-r--r-- | rpc/server.go | 17 |
1 files changed, 8 insertions, 9 deletions
diff --git a/rpc/server.go b/rpc/server.go index 78df37e52..62b84af34 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "runtime" + "strings" "sync" "sync/atomic" @@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) } + methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) + // already a previous service register under given sname, merge methods/subscriptions if regsvc, present := s.services[name]; present { - methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) if len(methods) == 0 && len(subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } - for _, m := range methods { regsvc.callbacks[formatName(m.method.Name)] = m } for _, s := range subscriptions { regsvc.subscriptions[formatName(s.method.Name)] = s } - return nil } svc.name = name - svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) + svc.callbacks, svc.subscriptions = methods, subscriptions if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } s.services[svc.name] = svc - return nil } @@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.activate(subid) + notifier.activate(subid, req.svcname) } return codec.CreateResponse(req.id, subid), activateSub @@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s codec.Close() } - // when request holds one of more subscribe requests this allows these subscriptions to be actived + // when request holds one of more subscribe requests this allows these subscriptions to be activated for _, c := range callbacks { c() } @@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) continue } - if r.isPubSub && r.method == unsubscribeMethod { + if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { @@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) } } } else { - requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}} + requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}} } continue } |