diff options
Diffstat (limited to 'xeth')
-rw-r--r-- | xeth/xeth.go | 108 |
1 files changed, 65 insertions, 43 deletions
diff --git a/xeth/xeth.go b/xeth/xeth.go index 1be42734d..701932f97 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -33,9 +33,10 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" - "github.com/ethereum/go-ethereum/event/filter" + "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/miner" @@ -75,7 +76,7 @@ type XEth struct { whisper *Whisper quit chan struct{} - filterManager *filter.FilterManager + filterManager *filters.FilterSystem logMu sync.RWMutex logQueue map[int]*logQueue @@ -111,7 +112,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth { backend: ethereum, frontend: frontend, quit: make(chan struct{}), - filterManager: filter.NewFilterManager(ethereum.EventMux()), + filterManager: filters.NewFilterSystem(ethereum.EventMux()), logQueue: make(map[int]*logQueue), blockQueue: make(map[int]*hashQueue), transactionQueue: make(map[int]*hashQueue), @@ -125,10 +126,13 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth { if frontend == nil { xeth.frontend = dummyFrontend{} } - xeth.state = NewState(xeth, xeth.backend.ChainManager().State()) + state, err := xeth.backend.BlockChain().State() + if err != nil { + return nil + } + xeth.state = NewState(xeth, state) go xeth.start() - go xeth.filterManager.Start() return xeth } @@ -142,7 +146,7 @@ done: self.logMu.Lock() for id, filter := range self.logQueue { if time.Since(filter.timeout) > filterTickerTime { - self.filterManager.UninstallFilter(id) + self.filterManager.Remove(id) delete(self.logQueue, id) } } @@ -151,7 +155,7 @@ done: self.blockMu.Lock() for id, filter := range self.blockQueue { if time.Since(filter.timeout) > filterTickerTime { - self.filterManager.UninstallFilter(id) + self.filterManager.Remove(id) delete(self.blockQueue, id) } } @@ -160,7 +164,7 @@ done: self.transactionMu.Lock() for id, filter := range self.transactionQueue { if time.Since(filter.timeout) > filterTickerTime { - self.filterManager.UninstallFilter(id) + self.filterManager.Remove(id) delete(self.transactionQueue, id) } } @@ -207,14 +211,21 @@ func (self *XEth) RemoteMining() *miner.RemoteAgent { return self.agent } func (self *XEth) AtStateNum(num int64) *XEth { var st *state.StateDB + var err error switch num { case -2: st = self.backend.Miner().PendingState().Copy() default: if block := self.getBlockByHeight(num); block != nil { - st = state.New(block.Root(), self.backend.ChainDb()) + st, err = state.New(block.Root(), self.backend.ChainDb()) + if err != nil { + return nil + } } else { - st = state.New(self.backend.ChainManager().GetBlockByNumber(0).Root(), self.backend.ChainDb()) + st, err = state.New(self.backend.BlockChain().GetBlockByNumber(0).Root(), self.backend.ChainDb()) + if err != nil { + return nil + } } } @@ -244,30 +255,41 @@ func (self *XEth) State() *State { return self.state } func (self *XEth) UpdateState() (wait chan *big.Int) { wait = make(chan *big.Int) go func() { - sub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) + eventSub := self.backend.EventMux().Subscribe(core.ChainHeadEvent{}) + defer eventSub.Unsubscribe() + var m, n *big.Int var ok bool - out: + + eventCh := eventSub.Chan() for { select { - case event := <-sub.Chan(): - ev, ok := event.(core.ChainHeadEvent) - if ok { - m = ev.Block.Number() + case event, ok := <-eventCh: + if !ok { + // Event subscription closed, set the channel to nil to stop spinning + eventCh = nil + continue + } + // A real event arrived, process if new head block assignment + if event, ok := event.Data.(core.ChainHeadEvent); ok { + m = event.Block.Number() if n != nil && n.Cmp(m) < 0 { wait <- n n = nil } - statedb := state.New(ev.Block.Root(), self.backend.ChainDb()) + statedb, err := state.New(event.Block.Root(), self.backend.ChainDb()) + if err != nil { + glog.V(logger.Error).Infoln("Could not create new state: %v", err) + return + } self.state = NewState(self, statedb) } case n, ok = <-wait: if !ok { - break out + return } } } - sub.Unsubscribe() }() return } @@ -290,19 +312,19 @@ func (self *XEth) getBlockByHeight(height int64) *types.Block { num = uint64(height) } - return self.backend.ChainManager().GetBlockByNumber(num) + return self.backend.BlockChain().GetBlockByNumber(num) } func (self *XEth) BlockByHash(strHash string) *Block { hash := common.HexToHash(strHash) - block := self.backend.ChainManager().GetBlock(hash) + block := self.backend.BlockChain().GetBlock(hash) return NewBlock(block) } func (self *XEth) EthBlockByHash(strHash string) *types.Block { hash := common.HexToHash(strHash) - block := self.backend.ChainManager().GetBlock(hash) + block := self.backend.BlockChain().GetBlock(hash) return block } @@ -356,11 +378,11 @@ func (self *XEth) EthBlockByNumber(num int64) *types.Block { } func (self *XEth) Td(hash common.Hash) *big.Int { - return self.backend.ChainManager().GetTd(hash) + return self.backend.BlockChain().GetTd(hash) } func (self *XEth) CurrentBlock() *types.Block { - return self.backend.ChainManager().CurrentBlock() + return self.backend.BlockChain().CurrentBlock() } func (self *XEth) GetBlockReceipts(bhash common.Hash) types.Receipts { @@ -372,7 +394,7 @@ func (self *XEth) GetTxReceipt(txhash common.Hash) *types.Receipt { } func (self *XEth) GasLimit() *big.Int { - return self.backend.ChainManager().GasLimit() + return self.backend.BlockChain().GasLimit() } func (self *XEth) Block(v interface{}) *Block { @@ -504,7 +526,7 @@ func (self *XEth) IsContract(address string) bool { } func (self *XEth) UninstallFilter(id int) bool { - defer self.filterManager.UninstallFilter(id) + defer self.filterManager.Remove(id) if _, ok := self.logQueue[id]; ok { self.logMu.Lock() @@ -532,8 +554,8 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address [] self.logMu.Lock() defer self.logMu.Unlock() - filter := core.NewFilter(self.backend) - id := self.filterManager.InstallFilter(filter) + filter := filters.New(self.backend.ChainDb()) + id := self.filterManager.Add(filter) self.logQueue[id] = &logQueue{timeout: time.Now()} filter.SetEarliestBlock(earliest) @@ -542,7 +564,7 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address [] filter.SetMax(max) filter.SetAddress(cAddress(address)) filter.SetTopics(cTopics(topics)) - filter.LogsCallback = func(logs state.Logs) { + filter.LogsCallback = func(logs vm.Logs) { self.logMu.Lock() defer self.logMu.Unlock() @@ -558,8 +580,8 @@ func (self *XEth) NewTransactionFilter() int { self.transactionMu.Lock() defer self.transactionMu.Unlock() - filter := core.NewFilter(self.backend) - id := self.filterManager.InstallFilter(filter) + filter := filters.New(self.backend.ChainDb()) + id := self.filterManager.Add(filter) self.transactionQueue[id] = &hashQueue{timeout: time.Now()} filter.TransactionCallback = func(tx *types.Transaction) { @@ -577,11 +599,11 @@ func (self *XEth) NewBlockFilter() int { self.blockMu.Lock() defer self.blockMu.Unlock() - filter := core.NewFilter(self.backend) - id := self.filterManager.InstallFilter(filter) + filter := filters.New(self.backend.ChainDb()) + id := self.filterManager.Add(filter) self.blockQueue[id] = &hashQueue{timeout: time.Now()} - filter.BlockCallback = func(block *types.Block, logs state.Logs) { + filter.BlockCallback = func(block *types.Block, logs vm.Logs) { self.blockMu.Lock() defer self.blockMu.Unlock() @@ -604,7 +626,7 @@ func (self *XEth) GetFilterType(id int) byte { return UnknownFilterTy } -func (self *XEth) LogFilterChanged(id int) state.Logs { +func (self *XEth) LogFilterChanged(id int) vm.Logs { self.logMu.Lock() defer self.logMu.Unlock() @@ -634,8 +656,8 @@ func (self *XEth) TransactionFilterChanged(id int) []common.Hash { return nil } -func (self *XEth) Logs(id int) state.Logs { - filter := self.filterManager.GetFilter(id) +func (self *XEth) Logs(id int) vm.Logs { + filter := self.filterManager.Get(id) if filter != nil { return filter.Find() } @@ -643,8 +665,8 @@ func (self *XEth) Logs(id int) state.Logs { return nil } -func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) state.Logs { - filter := core.NewFilter(self.backend) +func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) vm.Logs { + filter := filters.New(self.backend.ChainDb()) filter.SetEarliestBlock(earliest) filter.SetLatestBlock(latest) filter.SetSkip(skip) @@ -855,7 +877,7 @@ func (self *XEth) Call(fromStr, toStr, valueStr, gasStr, gasPriceStr, dataStr st } header := self.CurrentBlock().Header() - vmenv := core.NewEnv(statedb, self.backend.ChainManager(), msg, header) + vmenv := core.NewEnv(statedb, self.backend.BlockChain(), msg, header) res, gas, err := core.ApplyMessage(vmenv, msg, from) return common.ToHex(res), gas.String(), err @@ -1030,19 +1052,19 @@ func (m callmsg) Data() []byte { return m.data } type logQueue struct { mu sync.Mutex - logs state.Logs + logs vm.Logs timeout time.Time id int } -func (l *logQueue) add(logs ...*state.Log) { +func (l *logQueue) add(logs ...*vm.Log) { l.mu.Lock() defer l.mu.Unlock() l.logs = append(l.logs, logs...) } -func (l *logQueue) get() state.Logs { +func (l *logQueue) get() vm.Logs { l.mu.Lock() defer l.mu.Unlock() |