From 3c6c89168049fbcffa0a02690c27c324f6f0264d Mon Sep 17 00:00:00 2001 From: obscuren Date: Mon, 11 May 2015 01:28:15 +0200 Subject: core: optimise pending transaction processing --- miner/worker.go | 186 ++++++++++++++++++++++++++++++-------------------------- 1 file changed, 101 insertions(+), 85 deletions(-) (limited to 'miner/worker.go') diff --git a/miner/worker.go b/miner/worker.go index 4ba566eec..e3dbae717 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -22,12 +22,18 @@ import ( var jsonlogger = logger.NewJsonLogger() type environment struct { - totalUsedGas *big.Int - state *state.StateDB - coinbase *state.StateObject - block *types.Block - family *set.Set - uncles *set.Set + totalUsedGas *big.Int + state *state.StateDB + coinbase *state.StateObject + block *types.Block + family *set.Set + uncles *set.Set + remove *set.Set + tcount int + ignoredTransactors *set.Set + lowGasTransactors *set.Set + ownedAccounts *set.Set + lowGasTxs types.Transactions } func env(block *types.Block, eth core.Backend) *environment { @@ -129,12 +135,13 @@ func (self *worker) start() { self.mu.Lock() defer self.mu.Unlock() + atomic.StoreInt32(&self.mining, 1) + // spin up agents for _, agent := range self.agents { agent.Start() } - atomic.StoreInt32(&self.mining, 1) } func (self *worker) stop() { @@ -175,8 +182,11 @@ out: self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case core.TxPreEvent: + // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { - self.commitNewWork() + self.mu.Lock() + self.commitTransactions(types.Transactions{ev.Tx}) + self.mu.Unlock() } } case <-self.quit: @@ -242,13 +252,22 @@ func (self *worker) makeCurrent() { } block.Header().Extra = self.extra - self.current = env(block, self.eth) + current := env(block, self.eth) for _, ancestor := range self.chain.GetAncestors(block, 7) { - self.current.family.Add(ancestor.Hash()) + current.family.Add(ancestor.Hash()) } + accounts, _ := self.eth.AccountManager().Accounts() + // Keep track of transactions which return errors so they can be removed + current.remove = set.New() + current.tcount = 0 + current.ignoredTransactors = set.New() + current.lowGasTransactors = set.New() + current.ownedAccounts = accountAddressesSet(accounts) + + parent := self.chain.GetBlock(current.block.ParentHash()) + current.coinbase.SetGasPool(core.CalcGasLimit(parent)) - parent := self.chain.GetBlock(self.current.block.ParentHash()) - self.current.coinbase.SetGasPool(core.CalcGasLimit(parent)) + self.current = current } func (w *worker) setGasPrice(p *big.Int) { @@ -271,82 +290,14 @@ func (self *worker) commitNewWork() { defer self.currentMu.Unlock() self.makeCurrent() + current := self.current transactions := self.eth.TxPool().GetTransactions() sort.Sort(types.TxByNonce{transactions}) - accounts, _ := self.eth.AccountManager().Accounts() - // Keep track of transactions which return errors so they can be removed - var ( - remove = set.New() - tcount = 0 - ignoredTransactors = set.New() - lowGasTransactors = set.New() - ownedAccounts = accountAddressesSet(accounts) - lowGasTxs types.Transactions - ) - - for _, tx := range transactions { - // We can skip err. It has already been validated in the tx pool - from, _ := tx.From() - - // check if it falls within margin - if tx.GasPrice().Cmp(self.gasPrice) < 0 { - // ignore the transaction and transactor. We ignore the transactor - // because nonce will fail after ignoring this transaction so there's - // no point - lowGasTransactors.Add(from) - - glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) - } - - // Continue with the next transaction if the transaction sender is included in - // the low gas tx set. This will also remove the tx and all sequential transaction - // from this transactor - if lowGasTransactors.Has(from) { - // add tx to the low gas set. This will be removed at the end of the run - // owned accounts are ignored - if !ownedAccounts.Has(from) { - lowGasTxs = append(lowGasTxs, tx) - } - continue - } - - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if ignoredTransactors.Has(from) { - continue - } - - self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) - - err := self.commitTransaction(tx) - switch { - case core.IsNonceErr(err) || core.IsInvalidTxErr(err): - // Remove invalid transactions - from, _ := tx.From() - - self.chain.TxState().RemoveNonce(from, tx.Nonce()) - remove.Add(tx.Hash()) - - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } - case state.IsGasLimitErr(err): - from, _ := tx.From() - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - ignoredTransactors.Add(from) - - glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) - default: - tcount++ - } - } - self.eth.TxPool().RemoveTransactions(lowGasTxs) + // commit transactions for this run + self.commitTransactions(transactions) + self.eth.TxPool().RemoveTransactions(current.lowGasTxs) var ( uncles []*types.Header @@ -372,7 +323,7 @@ func (self *worker) commitNewWork() { // We only care about logging if we're actually mining if atomic.LoadInt32(&self.mining) == 1 { - glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", self.current.block.Number(), tcount, len(uncles)) + glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles)) } for _, hash := range badUncles { @@ -412,6 +363,71 @@ func (self *worker) commitUncle(uncle *types.Header) error { return nil } +func (self *worker) commitTransactions(transactions types.Transactions) { + current := self.current + + for _, tx := range transactions { + // We can skip err. It has already been validated in the tx pool + from, _ := tx.From() + + // check if it falls within margin + if tx.GasPrice().Cmp(self.gasPrice) < 0 { + // ignore the transaction and transactor. We ignore the transactor + // because nonce will fail after ignoring this transaction so there's + // no point + current.lowGasTransactors.Add(from) + + glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4]) + } + + // Continue with the next transaction if the transaction sender is included in + // the low gas tx set. This will also remove the tx and all sequential transaction + // from this transactor + if current.lowGasTransactors.Has(from) { + // add tx to the low gas set. This will be removed at the end of the run + // owned accounts are ignored + if !current.ownedAccounts.Has(from) { + current.lowGasTxs = append(current.lowGasTxs, tx) + } + continue + } + + // Move on to the next transaction when the transactor is in ignored transactions set + // This may occur when a transaction hits the gas limit. When a gas limit is hit and + // the transaction is processed (that could potentially be included in the block) it + // will throw a nonce error because the previous transaction hasn't been processed. + // Therefor we need to ignore any transaction after the ignored one. + if current.ignoredTransactors.Has(from) { + continue + } + + self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) + + err := self.commitTransaction(tx) + switch { + case core.IsNonceErr(err) || core.IsInvalidTxErr(err): + // Remove invalid transactions + from, _ := tx.From() + + self.chain.TxState().RemoveNonce(from, tx.Nonce()) + current.remove.Add(tx.Hash()) + + if glog.V(logger.Detail) { + glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) + } + case state.IsGasLimitErr(err): + from, _ := tx.From() + // ignore the transactor so no nonce errors will be thrown for this account + // next time the worker is run, they'll be picked up again. + current.ignoredTransactors.Add(from) + + glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + default: + current.tcount++ + } + } +} + func (self *worker) commitTransaction(tx *types.Transaction) error { snap := self.current.state.Copy() receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true) -- cgit