diff options
Diffstat (limited to 'rpc')
-rw-r--r-- | rpc/args.go | 9 | ||||
-rw-r--r-- | rpc/message.go | 44 | ||||
-rw-r--r-- | rpc/packages.go | 80 |
3 files changed, 124 insertions, 9 deletions
diff --git a/rpc/args.go b/rpc/args.go index aaa017c4e..75eef873d 100644 --- a/rpc/args.go +++ b/rpc/args.go @@ -251,3 +251,12 @@ func (a *DbArgs) requirements() error { } return nil } + +type WhisperMessageArgs struct { + Payload string + To string + From string + Topics []string + Priority uint32 + Ttl uint32 +} diff --git a/rpc/message.go b/rpc/message.go index 5045adb8f..919302921 100644 --- a/rpc/message.go +++ b/rpc/message.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" "fmt" + + "github.com/ethereum/go-ethereum/xeth" ) const ( @@ -270,3 +272,45 @@ func (req *RpcRequest) ToDbGetArgs() (*DbArgs, error) { rpclogger.DebugDetailf("%T %v", args, args) return &args, nil } + +func (req *RpcRequest) ToWhisperFilterArgs() (*xeth.Options, error) { + if len(req.Params) < 1 { + return nil, NewErrorResponse(ErrorArguments) + } + + var args xeth.Options + err := json.Unmarshal(req.Params[0], &args) + if err != nil { + return nil, NewErrorResponseWithError(ErrorDecodeArgs, err) + } + rpclogger.DebugDetailf("%T %v", args, args) + return &args, nil +} + +func (req *RpcRequest) ToWhisperChangedArgs() (int, error) { + if len(req.Params) < 1 { + return 0, NewErrorResponse(ErrorArguments) + } + + var id int + err := json.Unmarshal(req.Params[0], &id) + if err != nil { + return 0, NewErrorResponse(ErrorDecodeArgs) + } + rpclogger.DebugDetailf("%T %v", id, id) + return id, nil +} + +func (req *RpcRequest) ToWhisperPostArgs() (*WhisperMessageArgs, error) { + if len(req.Params) < 1 { + return nil, NewErrorResponse(ErrorArguments) + } + + var args WhisperMessageArgs + err := json.Unmarshal(req.Params[0], &args) + if err != nil { + return nil, err + } + rpclogger.DebugDetailf("%T %v", args, args) + return &args, nil +} diff --git a/rpc/packages.go b/rpc/packages.go index aa51aad42..8344d6a46 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -44,18 +44,22 @@ type EthereumApi struct { xeth *xeth.XEth filterManager *filter.FilterManager - mut sync.RWMutex - logs map[int]state.Logs + logMut sync.RWMutex + logs map[int]state.Logs + + messagesMut sync.RWMutex + messages map[int][]xeth.WhisperMessage db ethutil.Database } -func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { +func NewEthereumApi(eth *xeth.XEth) *EthereumApi { db, _ := ethdb.NewLDBDatabase("dapps") api := &EthereumApi{ - xeth: xeth, - filterManager: filter.NewFilterManager(xeth.Backend().EventMux()), + xeth: eth, + filterManager: filter.NewFilterManager(eth.Backend().EventMux()), logs: make(map[int]state.Logs), + messages: make(map[int][]xeth.WhisperMessage), db: db, } go api.filterManager.Start() @@ -67,8 +71,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro var id int filter := core.NewFilter(self.xeth.Backend()) filter.LogsCallback = func(logs state.Logs) { - self.mut.Lock() - defer self.mut.Unlock() + self.logMut.Lock() + defer self.logMut.Unlock() self.logs[id] = append(self.logs[id], logs...) } @@ -79,8 +83,8 @@ func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) erro } func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { - self.mut.RLock() - defer self.mut.RUnlock() + self.logMut.RLock() + defer self.logMut.RUnlock() *reply = toLogs(self.logs[id]) @@ -257,6 +261,44 @@ func (p *EthereumApi) DbGet(args *DbArgs, reply *interface{}) error { return nil } +func (p *EthereumApi) NewWhisperIdentity(reply *interface{}) error { + *reply = p.xeth.Whisper().NewIdentity() + return nil +} + +func (p *EthereumApi) NewWhisperFilter(args *xeth.Options, reply *interface{}) error { + var id int + args.Fn = func(msg xeth.WhisperMessage) { + p.messagesMut.Lock() + defer p.messagesMut.Unlock() + p.messages[id] = append(p.messages[id], msg) + } + id = p.xeth.Whisper().Watch(args) + *reply = id + return nil +} + +func (self *EthereumApi) MessagesChanged(id int, reply *interface{}) error { + self.messagesMut.RLock() + defer self.messagesMut.RUnlock() + + *reply = self.messages[id] + + self.messages[id] = nil // empty the messages + + return nil +} + +func (p *EthereumApi) WhisperPost(args *WhisperMessageArgs, reply *interface{}) error { + err := p.xeth.Whisper().Post(args.Payload, args.To, args.From, args.Topics, args.Priority, args.Ttl) + if err != nil { + return err + } + + *reply = true + return nil +} + func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error { // Spec at https://github.com/ethereum/wiki/wiki/Generic-ON-RPC rpclogger.DebugDetailf("%T %s", req.Params, req.Params) @@ -354,6 +396,26 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error return err } return p.DbGet(args, reply) + case "shh_newIdentity": + return p.NewWhisperIdentity(reply) + case "shh_newFilter": + args, err := req.ToWhisperFilterArgs() + if err != nil { + return err + } + return p.NewWhisperFilter(args, reply) + case "shh_changed": + args, err := req.ToWhisperChangedArgs() + if err != nil { + return err + } + return p.MessagesChanged(args, reply) + case "shh_post": + args, err := req.ToWhisperPostArgs() + if err != nil { + return nil + } + return p.WhisperPost(args, reply) default: return NewErrorResponse(fmt.Sprintf("%v %s", ErrorNotImplemented, req.Method)) } |