aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/client.go
diff options
context:
space:
mode:
authorBas van Kervel <basvankervel@gmail.com>2017-04-06 14:56:41 +0800
committerBas van Kervel <basvankervel@gmail.com>2017-04-25 17:13:22 +0800
commit37e3f561f15cbedf10c01847e58a079f9b86bf6f (patch)
treed7cdc3e8a5b74261a3359f6029e927cca0fc738b /rpc/client.go
parentba3bcd16a6d99bc0e58516556df8e96b730c2d60 (diff)
downloaddexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.gz
dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.tar.zst
dexon-37e3f561f15cbedf10c01847e58a079f9b86bf6f.zip
rpc: support subscriptions under custom namespaces
Diffstat (limited to 'rpc/client.go')
-rw-r--r--rpc/client.go35
1 files changed, 19 insertions, 16 deletions
diff --git a/rpc/client.go b/rpc/client.go
index 2c35ba54a..591986987 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -27,6 +27,7 @@ import (
"net/url"
"reflect"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -373,14 +374,14 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...
return nil, ErrNotificationsUnsupported
}
- msg, err := c.newMessage(subscribeMethod, args...)
+ msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
if err != nil {
return nil, err
}
op := &requestOp{
ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage),
- sub: newClientSubscription(c, chanVal),
+ sub: newClientSubscription(c, "eth", chanVal),
}
// Send the subscription request.
@@ -575,7 +576,7 @@ func (c *Client) closeRequestOps(err error) {
}
func (c *Client) handleNotification(msg *jsonrpcMessage) {
- if msg.Method != notificationMethod {
+ if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
return
}
@@ -653,11 +654,12 @@ func (c *Client) read(conn net.Conn) error {
// A ClientSubscription represents a subscription established through EthSubscribe.
type ClientSubscription struct {
- client *Client
- etype reflect.Type
- channel reflect.Value
- subid string
- in chan json.RawMessage
+ client *Client
+ etype reflect.Type
+ channel reflect.Value
+ namespace string
+ subid string
+ in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits
@@ -665,14 +667,15 @@ type ClientSubscription struct {
err chan error
}
-func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription {
+func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{
- client: c,
- etype: channel.Type().Elem(),
- channel: channel,
- quit: make(chan struct{}),
- err: make(chan error, 1),
- in: make(chan json.RawMessage),
+ client: c,
+ namespace: namespace,
+ etype: channel.Type().Elem(),
+ channel: channel,
+ quit: make(chan struct{}),
+ err: make(chan error, 1),
+ in: make(chan json.RawMessage),
}
return sub
}
@@ -774,5 +777,5 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e
func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{}
- return sub.client.Call(&result, unsubscribeMethod, sub.subid)
+ return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
}