aboutsummaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-01-29 23:52:00 +0800
committerobscuren <geffobscura@gmail.com>2015-01-29 23:52:00 +0800
commit6488a392a347d0d47212fdc78386e3e0e5841d7d (patch)
treeed95cdf67028b2b60ddc7850728abe9a391796bf /rpc
parentddf17d93acf92ef18b0134f19f22220362e06bad (diff)
downloaddexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.tar.gz
dexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.tar.zst
dexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.zip
Reimplemented message filters for rpc calls
Diffstat (limited to 'rpc')
-rw-r--r--rpc/args.go31
-rw-r--r--rpc/message.go55
-rw-r--r--rpc/packages.go91
3 files changed, 169 insertions, 8 deletions
diff --git a/rpc/args.go b/rpc/args.go
index ff4974792..79519e7d2 100644
--- a/rpc/args.go
+++ b/rpc/args.go
@@ -1,6 +1,7 @@
package rpc
import "encoding/json"
+import "github.com/ethereum/go-ethereum/core"
type GetBlockArgs struct {
BlockNumber int32
@@ -36,10 +37,6 @@ type NewTxArgs struct {
Data string `json:"data"`
}
-// type TxResponse struct {
-// Hash string
-// }
-
func (a *NewTxArgs) requirements() error {
if a.Gas == "" {
return NewErrorResponse("Transact requires a 'gas' value as argument")
@@ -195,3 +192,29 @@ func (obj *Sha3Args) UnmarshalJSON(b []byte) (err error) {
}
return
}
+
+type FilterOptions struct {
+ Earliest int64
+ Latest int64
+ Address string
+ Topics []string
+ Skip int
+ Max int
+}
+
+func toFilterOptions(options *FilterOptions) core.FilterOptions {
+ var opts core.FilterOptions
+ opts.Earliest = options.Earliest
+ opts.Latest = options.Latest
+ opts.Address = fromHex(options.Address)
+ opts.Topics = make([][]byte, len(options.Topics))
+ for i, topic := range options.Topics {
+ opts.Topics[i] = fromHex(topic)
+ }
+
+ return opts
+}
+
+type FilterChangedArgs struct {
+ n int
+}
diff --git a/rpc/message.go b/rpc/message.go
index e9f47634f..05f66ee95 100644
--- a/rpc/message.go
+++ b/rpc/message.go
@@ -21,6 +21,8 @@ import (
"encoding/json"
"errors"
"fmt"
+
+ "github.com/ethereum/go-ethereum/state"
)
const (
@@ -184,3 +186,56 @@ func (req *RpcRequest) ToGetCodeAtArgs() (*GetCodeAtArgs, error) {
rpclogger.DebugDetailf("%T %v", args, args)
return args, nil
}
+
+func (req *RpcRequest) ToFilterArgs() (*FilterOptions, error) {
+ if len(req.Params) < 1 {
+ return nil, NewErrorResponse(ErrorArguments)
+ }
+
+ args := new(FilterOptions)
+ r := bytes.NewReader(req.Params[0])
+ err := json.NewDecoder(r).Decode(args)
+ if err != nil {
+ return nil, NewErrorResponse(ErrorDecodeArgs)
+ }
+ rpclogger.DebugDetailf("%T %v", args, args)
+ return args, nil
+}
+
+func (req *RpcRequest) ToFilterChangedArgs() (int, error) {
+ if len(req.Params) < 1 {
+ return 0, NewErrorResponse(ErrorArguments)
+ }
+
+ var id int
+ r := bytes.NewReader(req.Params[0])
+ err := json.NewDecoder(r).Decode(&id)
+ if err != nil {
+ return 0, NewErrorResponse(ErrorDecodeArgs)
+ }
+ rpclogger.DebugDetailf("%T %v", id, id)
+ return id, nil
+}
+
+type Log struct {
+ Address string `json:"address"`
+ Topics []string `json:"topics"`
+ Data string `json:"data"`
+}
+
+func toLogs(logs state.Logs) (ls []Log) {
+ ls = make([]Log, len(logs))
+
+ for i, log := range logs {
+ var l Log
+ l.Topics = make([]string, len(log.Topics()))
+ l.Address = toHex(log.Address())
+ l.Data = toHex(log.Data())
+ for j, topic := range log.Topics() {
+ l.Topics[j] = toHex(topic)
+ }
+ ls[i] = l
+ }
+
+ return
+}
diff --git a/rpc/packages.go b/rpc/packages.go
index b25660b25..e8dc570fd 100644
--- a/rpc/packages.go
+++ b/rpc/packages.go
@@ -29,9 +29,13 @@ import (
"fmt"
"math/big"
"strings"
+ "sync"
+ "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethutil"
+ "github.com/ethereum/go-ethereum/event/filter"
+ "github.com/ethereum/go-ethereum/state"
"github.com/ethereum/go-ethereum/xeth"
)
@@ -53,12 +57,79 @@ type RpcServer interface {
Stop()
}
+type EthereumApi struct {
+ xeth *xeth.XEth
+ filterManager *filter.FilterManager
+
+ mut sync.RWMutex
+ logs map[int]state.Logs
+}
+
func NewEthereumApi(xeth *xeth.XEth) *EthereumApi {
- return &EthereumApi{xeth: xeth}
+ api := &EthereumApi{
+ xeth: xeth,
+ filterManager: filter.NewFilterManager(xeth.Backend().EventMux()),
+ logs: make(map[int]state.Logs),
+ }
+ go api.filterManager.Start()
+
+ return api
}
-type EthereumApi struct {
- xeth *xeth.XEth
+func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error {
+ var id int
+ filter := core.NewFilter(self.xeth.Backend())
+ filter.LogsCallback = func(logs state.Logs) {
+ self.mut.Lock()
+ defer self.mut.Unlock()
+
+ self.logs[id] = append(self.logs[id], logs...)
+ }
+ id = self.filterManager.InstallFilter(filter)
+ *reply = id
+
+ return nil
+}
+
+type Log struct {
+ Address string `json:"address"`
+ Topics []string `json:"topics"`
+ Data string `json:"data"`
+}
+
+func toLogs(logs state.Logs) (ls []Log) {
+ ls = make([]Log, len(logs))
+
+ for i, log := range logs {
+ var l Log
+ l.Topics = make([]string, len(log.Topics()))
+ l.Address = toHex(log.Address())
+ l.Data = toHex(log.Data())
+ for j, topic := range log.Topics() {
+ l.Topics[j] = toHex(topic)
+ }
+ ls[i] = l
+ }
+
+ return
+}
+
+func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error {
+ self.mut.RLock()
+ defer self.mut.RUnlock()
+
+ *reply = toLogs(self.logs[id])
+
+ self.logs[id] = nil // empty the logs
+
+ return nil
+}
+
+func (self *EthereumApi) Logs(id int, reply *interface{}) error {
+ filter := self.filterManager.GetFilter(id)
+ *reply = toLogs(filter.Find())
+
+ return nil
}
func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error {
@@ -162,7 +233,7 @@ func (p *EthereumApi) GetBalanceAt(args *GetBalanceArgs, reply *interface{}) err
return err
}
state := p.xeth.State().SafeGet(args.Address)
- *reply = BalanceRes{Balance: state.Balance().String(), Address: args.Address}
+ *reply = toHex(state.Balance().Bytes())
return nil
}
@@ -234,6 +305,18 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error
return err
}
return p.Call(args, reply)
+ case "eth_newFilter":
+ args, err := req.ToFilterArgs()
+ if err != nil {
+ return err
+ }
+ return p.NewFilter(args, reply)
+ case "eth_changed":
+ args, err := req.ToFilterChangedArgs()
+ if err != nil {
+ return err
+ }
+ return p.FilterChanged(args, reply)
case "web3_sha3":
args, err := req.ToSha3Args()
if err != nil {