aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/subscription.go
diff options
context:
space:
mode:
authorBas van Kervel <basvankervel@gmail.com>2017-04-06 14:56:41 +0800
committerBas van Kervel <basvankervel@gmail.com>2017-04-25 17:13:22 +0800
commit37e3f561f15cbedf10c01847e58a079f9b86bf6f (patch)
treed7cdc3e8a5b74261a3359f6029e927cca0fc738b /rpc/subscription.go
parentba3bcd16a6d99bc0e58516556df8e96b730c2d60 (diff)
downloaddexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.gz
dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.zst
dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.zip
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/subscription.go')
-rw-r--r--rpc/subscription.go14
1 files changed, 8 insertions, 6 deletions
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 9ab6af9e1..720e4dd06 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -35,8 +35,9 @@ type ID string
// a Subscription is created by a notifier and tight to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct {
- ID ID
- err chan error // closed on unsubscribe
+ ID ID
+ namespace string
+ err chan error // closed on unsubscribe
}
// Err returns a channel that is closed when the client send an unsubscribe request.
@@ -78,7 +79,7 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
// are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription {
- s := &Subscription{NewID(), make(chan error)}
+ s := &Subscription{ID: NewID(), err: make(chan error)}
n.subMu.Lock()
n.inactive[s.ID] = s
n.subMu.Unlock()
@@ -91,9 +92,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
n.subMu.RLock()
defer n.subMu.RUnlock()
- _, active := n.active[id]
+ sub, active := n.active[id]
if active {
- notification := n.codec.CreateNotification(string(id), data)
+ notification := n.codec.CreateNotification(string(id), sub.namespace, data)
if err := n.codec.Write(notification); err != nil {
n.codec.Close()
return err
@@ -124,10 +125,11 @@ func (n *Notifier) unsubscribe(id ID) error {
// notifications are dropped. This method is called by the RPC server after
// the subscription ID was sent to client. This prevents notifications being
// send to the client before the subscription ID is send to the client.
-func (n *Notifier) activate(id ID) {
+func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock()
defer n.subMu.Unlock()
if sub, found := n.inactive[id]; found {
+ sub.namespace = namespace
n.active[id] = sub
delete(n.inactive, id)
}