From 6488a392a347d0d47212fdc78386e3e0e5841d7d Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 29 Jan 2015 16:52:00 +0100 Subject: Reimplemented message filters for rpc calls --- rpc/packages.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 87 insertions(+), 4 deletions(-) (limited to 'rpc/packages.go') diff --git a/rpc/packages.go b/rpc/packages.go index b25660b25..e8dc570fd 100644 --- a/rpc/packages.go +++ b/rpc/packages.go @@ -29,9 +29,13 @@ import ( "fmt" "math/big" "strings" + "sync" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" + "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/state" "github.com/ethereum/go-ethereum/xeth" ) @@ -53,12 +57,79 @@ type RpcServer interface { Stop() } +type EthereumApi struct { + xeth *xeth.XEth + filterManager *filter.FilterManager + + mut sync.RWMutex + logs map[int]state.Logs +} + func NewEthereumApi(xeth *xeth.XEth) *EthereumApi { - return &EthereumApi{xeth: xeth} + api := &EthereumApi{ + xeth: xeth, + filterManager: filter.NewFilterManager(xeth.Backend().EventMux()), + logs: make(map[int]state.Logs), + } + go api.filterManager.Start() + + return api } -type EthereumApi struct { - xeth *xeth.XEth +func (self *EthereumApi) NewFilter(args *FilterOptions, reply *interface{}) error { + var id int + filter := core.NewFilter(self.xeth.Backend()) + filter.LogsCallback = func(logs state.Logs) { + self.mut.Lock() + defer self.mut.Unlock() + + self.logs[id] = append(self.logs[id], logs...) + } + id = self.filterManager.InstallFilter(filter) + *reply = id + + return nil +} + +type Log struct { + Address string `json:"address"` + Topics []string `json:"topics"` + Data string `json:"data"` +} + +func toLogs(logs state.Logs) (ls []Log) { + ls = make([]Log, len(logs)) + + for i, log := range logs { + var l Log + l.Topics = make([]string, len(log.Topics())) + l.Address = toHex(log.Address()) + l.Data = toHex(log.Data()) + for j, topic := range log.Topics() { + l.Topics[j] = toHex(topic) + } + ls[i] = l + } + + return +} + +func (self *EthereumApi) FilterChanged(id int, reply *interface{}) error { + self.mut.RLock() + defer self.mut.RUnlock() + + *reply = toLogs(self.logs[id]) + + self.logs[id] = nil // empty the logs + + return nil +} + +func (self *EthereumApi) Logs(id int, reply *interface{}) error { + filter := self.filterManager.GetFilter(id) + *reply = toLogs(filter.Find()) + + return nil } func (p *EthereumApi) GetBlock(args *GetBlockArgs, reply *interface{}) error { @@ -162,7 +233,7 @@ func (p *EthereumApi) GetBalanceAt(args *GetBalanceArgs, reply *interface{}) err return err } state := p.xeth.State().SafeGet(args.Address) - *reply = BalanceRes{Balance: state.Balance().String(), Address: args.Address} + *reply = toHex(state.Balance().Bytes()) return nil } @@ -234,6 +305,18 @@ func (p *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) error return err } return p.Call(args, reply) + case "eth_newFilter": + args, err := req.ToFilterArgs() + if err != nil { + return err + } + return p.NewFilter(args, reply) + case "eth_changed": + args, err := req.ToFilterChangedArgs() + if err != nil { + return err + } + return p.FilterChanged(args, reply) case "web3_sha3": args, err := req.ToSha3Args() if err != nil { -- cgit