diff options
author | rjl493456442 <garyrong0905@gmail.com> | 2018-05-10 15:04:45 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-05-18 16:46:44 +0800 |
commit | a2e43d28d01ef9642c7f6992b78b86bd0696c847 (patch) | |
tree | 2f5d3444071125e84155321db6fd79d941cfee0b /miner/worker.go | |
parent | 6286c255f16a914b39ffd3389cba154a53e66a13 (diff) | |
download | go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.gz go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.zst go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.zip |
all: collate new transaction events together
Diffstat (limited to 'miner/worker.go')
-rw-r--r-- | miner/worker.go | 45 |
1 files changed, 27 insertions, 18 deletions
diff --git a/miner/worker.go b/miner/worker.go index 48b0b2765..3d086d6ae 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -42,7 +42,7 @@ const ( resultQueueSize = 10 miningLogAtDepth = 5 - // txChanSize is the size of channel listening to TxPreEvent. + // txChanSize is the size of channel listening to TxsPreEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. @@ -71,6 +71,7 @@ type Work struct { family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set tcount int // tx count in cycle + gasPool *core.GasPool // available gas used to pack transaction. Block *types.Block // the new block @@ -95,8 +96,8 @@ type worker struct { // update loop mux *event.TypeMux - txCh chan core.TxPreEvent - txSub event.Subscription + txsCh chan core.TxsPreEvent + txsSub event.Subscription chainHeadCh chan core.ChainHeadEvent chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent @@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, - txCh: make(chan core.TxPreEvent, txChanSize), + txsCh: make(chan core.TxsPreEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), @@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), } - // Subscribe TxPreEvent for tx pool - worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe TxsPreEvent for tx pool + worker.txsSub = eth.TxPool().SubscribeTxPreEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - defer self.txSub.Unsubscribe() + defer self.txsSub.Unsubscribe() defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() @@ -258,15 +259,21 @@ func (self *worker) update() { self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - // Handle TxPreEvent - case ev := <-self.txCh: - // Apply transaction to the pending state if we're not mining + // Handle TxsPreEvent + case ev := <-self.txsCh: + // Apply transactions to the pending state if we're not mining. + // + // Note all transactions received may not be continuous with transactions + // already included in the current mining block. These transactions will + // be automatically eliminated. if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) - txs := map[common.Address]types.Transactions{acc: {ev.Tx}} + txs := make(map[common.Address]types.Transactions) + for _, tx := range ev.Txs { + acc, _ := types.Sender(self.current.signer, tx) + txs[acc] = append(txs[acc], tx) + } txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.updateSnapshot() self.currentMu.Unlock() @@ -278,7 +285,7 @@ func (self *worker) update() { } // System stopped - case <-self.txSub.Err(): + case <-self.txsSub.Err(): return case <-self.chainHeadSub.Err(): return @@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() { } func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { - gp := new(core.GasPool).AddGas(env.header.GasLimit) + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) + } var coalescedLogs []*types.Log for { // If we don't have enough gas for any further transactions then we're done - if gp.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "gp", gp) + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "gp", env.gasPool) break } // Retrieve the next transaction and abort if all done @@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB // Start executing the transaction env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) - err, logs := env.commitTransaction(tx, bc, coinbase, gp) + err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool) switch err { case core.ErrGasLimitReached: // Pop the current out-of-gas transaction without shifting in the next from the account |