diff options
author | obscuren <geffobscura@gmail.com> | 2015-01-29 23:52:00 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-01-29 23:52:00 +0800 |
commit | 6488a392a347d0d47212fdc78386e3e0e5841d7d (patch) | |
tree | ed95cdf67028b2b60ddc7850728abe9a391796bf /rpc/packages.go | |
parent | ddf17d93acf92ef18b0134f19f22220362e06bad (diff) | |
download | dexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.tar.gz dexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.tar.zst dexon-6488a392a347d0d47212fdc78386e3e0e5841d7d.zip |
Reimplemented message filters for rpc calls
Diffstat (limited to 'rpc/packages.go')
-rw-r--r-- | rpc/packages.go | 91 |
1 files changed, 87 insertions, 4 deletions
diff --git a/rpc/packages.go b/rpc/packages.go index b25660b25..e8dc570fd 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -29,9 +29,13 @@ import ( "fmt" "math/big" "strings" + "sync" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/xeth" ) @@ -53,12 +57,79 @@ type RpcServer interface { Stop() } +type EthereumApi struct { + xeth *xeth.XEth + filterManager *filter.FilterManager + + mut sync.RWMutex + logs map[int]state.Logs +} + func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { - return &EthereumApi{xeth: xeth} + api := &EthereumApi{ + xeth: xeth, + filterManager: filter.NewFilterManager(xeth.Backend().EventMux()), + logs: make(map[int]state.Logs), + } + go api.filterManager.Start() + + return api } -type EthereumApi struct { - xeth *xeth.XEth +func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { + var id int + filter := core.NewFilter(self.xeth.Backend()) + filter.LogsCallback = func(logs state.Logs) { + self.mut.Lock() + defer self.mut.Unlock() + + self.logs[id] = append(self.logs[id], logs...) + } + id = self.filterManager.InstallFilter(filter) + *reply = id + + return nil +} + +type Log struct { + Address string `json:"address"` + Topics []string `json:"topics"` + Data string `json:"data"` +} + +func toLogs(logs state.Logs) (ls []Log) { + ls = make([]Log, len(logs)) + + for i, log := range logs { + var l Log + l.Topics = make([]string, len(log.Topics())) + l.Address = toHex(log.Address()) + l.Data = toHex(log.Data()) + for j, topic := range log.Topics() { + l.Topics[j] = toHex(topic) + } + ls[i] = l + } + + return +} + +func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { + self.mut.RLock() + defer self.mut.RUnlock() + + *reply = toLogs(self.logs[id]) + + self.logs[id] = nil // empty the logs + + return nil +} + +func (self *EthereumApi) Logs(id int, reply *interface{}) error { + filter := self.filterManager.GetFilter(id) + *reply = toLogs(filter.Find()) + + return nil } func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error { @@ -162,7 +233,7 @@ func (p *EthereumApi) GetBalanceAt(args *GetBalanceArgs, reply *interface{}) err return err } state := p.xeth.State().SafeGet(args.Address) - *reply = BalanceRes{Balance: state.Balance().String(), Address: args.Address} + *reply = toHex(state.Balance().Bytes()) return nil } @@ -234,6 +305,18 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error return err } return p.Call(args, reply) + case "eth_newFilter": + args, err := req.ToFilterArgs() + if err != nil { + return err + } + return p.NewFilter(args, reply) + case "eth_changed": + args, err := req.ToFilterChangedArgs() + if err != nil { + return err + } + return p.FilterChanged(args, reply) case "web3_sha3": args, err := req.ToSha3Args() if err != nil { |