aboutsummaryrefslogtreecommitdiffstats
path: root/rpc/json.go
diff options
context:
space:
mode:
authorBas van Kervel <bas@ethdev.com>2016-03-29 21:07:40 +0800
committerBas van Kervel <bas@ethdev.com>2016-04-02 00:26:35 +0800
commitf7328c5ecbd1076582a71ef7bf436485f3868b1f (patch)
treea32f466f00306cb131bee254cbe14a4dcaa68973 /rpc/json.go
parentfb578f4550a08617485d9146876489d1f3bb1b52 (diff)
downloaddexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.gz
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.zst
dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.zip
rpc: add pub/sub support
Diffstat (limited to 'rpc/json.go')
-rw-r--r--rpc/json.go24
1 files changed, 14 insertions, 10 deletions
diff --git a/rpc/json.go b/rpc/json.go
index 1ed943c00..a0bfcac04 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -22,7 +22,7 @@ import (
"io"
"reflect"
"strings"
- "sync/atomic"
+ "sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
@@ -81,19 +81,20 @@ type jsonNotification struct {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments
// and serializing (result) objects.
type jsonCodec struct {
- closed chan interface{}
- isClosed int32
- d *json.Decoder
- e *json.Encoder
- req JSONRequest
- rw io.ReadWriteCloser
+ closed chan interface{}
+ closer sync.Once
+ d *json.Decoder
+ muEncoder sync.Mutex
+ e *json.Encoder
+ req JSONRequest
+ rw io.ReadWriteCloser
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
d.UseNumber()
- return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0}
+ return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc}
}
// isBatch returns true when the first non-whitespace characters is '['
@@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac
// Write message to client
func (c *jsonCodec) Write(res interface{}) error {
+ c.muEncoder.Lock()
+ defer c.muEncoder.Unlock()
+
return c.e.Encode(res)
}
// Close the underlying connection
func (c *jsonCodec) Close() {
- if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) {
+ c.closer.Do(func() {
close(c.closed)
c.rw.Close()
- }
+ })
}
// Closed returns a channel which will be closed when Close is called