diff options
author | Bas van Kervel <bas@ethdev.com> | 2016-03-29 21:07:40 +0800 |
---|---|---|
committer | Bas van Kervel <bas@ethdev.com> | 2016-04-02 00:26:35 +0800 |
commit | f7328c5ecbd1076582a71ef7bf436485f3868b1f (patch) | |
tree | a32f466f00306cb131bee254cbe14a4dcaa68973 /rpc/json.go | |
parent | fb578f4550a08617485d9146876489d1f3bb1b52 (diff) | |
download | dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.gz dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.tar.zst dexon-f7328c5ecbd1076582a71ef7bf436485f3868b1f.zip |
rpc: add pub/sub support
Diffstat (limited to 'rpc/json.go')
-rw-r--r-- | rpc/json.go | 24 |
1 files changed, 14 insertions, 10 deletions
diff --git a/rpc/json.go b/rpc/json.go index 1ed943c00..a0bfcac04 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -22,7 +22,7 @@ import ( "io" "reflect" "strings" - "sync/atomic" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -81,19 +81,20 @@ type jsonNotification struct { // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments // and serializing (result) objects. type jsonCodec struct { - closed chan interface{} - isClosed int32 - d *json.Decoder - e *json.Encoder - req JSONRequest - rw io.ReadWriteCloser + closed chan interface{} + closer sync.Once + d *json.Decoder + muEncoder sync.Mutex + e *json.Encoder + req JSONRequest + rw io.ReadWriteCloser } // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { d := json.NewDecoder(rwc) d.UseNumber() - return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0} + return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc} } // isBatch returns true when the first non-whitespace characters is '[' @@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac // Write message to client func (c *jsonCodec) Write(res interface{}) error { + c.muEncoder.Lock() + defer c.muEncoder.Unlock() + return c.e.Encode(res) } // Close the underlying connection func (c *jsonCodec) Close() { - if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) { + c.closer.Do(func() { close(c.closed) c.rw.Close() - } + }) } // Closed returns a channel which will be closed when Close is called |