aboutsummaryrefslogtreecommitdiffstats
path: root/miner/worker.go
diff options
context:
space:
mode:
authorrjl493456442 <garyrong0905@gmail.com>2018-05-10 15:04:45 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-05-18 16:46:44 +0800
commita2e43d28d01ef9642c7f6992b78b86bd0696c847 (patch)
tree2f5d3444071125e84155321db6fd79d941cfee0b /miner/worker.go
parent6286c255f16a914b39ffd3389cba154a53e66a13 (diff)
downloadgo-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.gz
go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.tar.zst
go-tangerine-a2e43d28d01ef9642c7f6992b78b86bd0696c847.zip
all: collate new transaction events together
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go45
1 files changed, 27 insertions, 18 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 48b0b2765..3d086d6ae 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -42,7 +42,7 @@ const (
resultQueueSize = 10
miningLogAtDepth = 5
- // txChanSize is the size of channel listening to TxPreEvent.
+ // txChanSize is the size of channel listening to TxsPreEvent.
// The number is referenced from the size of tx pool.
txChanSize = 4096
// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
@@ -71,6 +71,7 @@ type Work struct {
family *set.Set // family set (used for checking uncle invalidity)
uncles *set.Set // uncle set
tcount int // tx count in cycle
+ gasPool *core.GasPool // available gas used to pack transaction.
Block *types.Block // the new block
@@ -95,8 +96,8 @@ type worker struct {
// update loop
mux *event.TypeMux
- txCh chan core.TxPreEvent
- txSub event.Subscription
+ txsCh chan core.TxsPreEvent
+ txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
@@ -137,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine: engine,
eth: eth,
mux: mux,
- txCh: make(chan core.TxPreEvent, txChanSize),
+ txsCh: make(chan core.TxsPreEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
@@ -149,8 +150,8 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
}
- // Subscribe TxPreEvent for tx pool
- worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
+ // Subscribe TxsPreEvent for tx pool
+ worker.txsSub = eth.TxPool().SubscribeTxPreEvent(worker.txsCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
@@ -241,7 +242,7 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
- defer self.txSub.Unsubscribe()
+ defer self.txsSub.Unsubscribe()
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
@@ -258,15 +259,21 @@ func (self *worker) update() {
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
- // Handle TxPreEvent
- case ev := <-self.txCh:
- // Apply transaction to the pending state if we're not mining
+ // Handle TxsPreEvent
+ case ev := <-self.txsCh:
+ // Apply transactions to the pending state if we're not mining.
+ //
+ // Note all transactions received may not be continuous with transactions
+ // already included in the current mining block. These transactions will
+ // be automatically eliminated.
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
- acc, _ := types.Sender(self.current.signer, ev.Tx)
- txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
+ txs := make(map[common.Address]types.Transactions)
+ for _, tx := range ev.Txs {
+ acc, _ := types.Sender(self.current.signer, tx)
+ txs[acc] = append(txs[acc], tx)
+ }
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
-
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.updateSnapshot()
self.currentMu.Unlock()
@@ -278,7 +285,7 @@ func (self *worker) update() {
}
// System stopped
- case <-self.txSub.Err():
+ case <-self.txsSub.Err():
return
case <-self.chainHeadSub.Err():
return
@@ -522,14 +529,16 @@ func (self *worker) updateSnapshot() {
}
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
- gp := new(core.GasPool).AddGas(env.header.GasLimit)
+ if env.gasPool == nil {
+ env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
+ }
var coalescedLogs []*types.Log
for {
// If we don't have enough gas for any further transactions then we're done
- if gp.Gas() < params.TxGas {
- log.Trace("Not enough gas for further transactions", "gp", gp)
+ if env.gasPool.Gas() < params.TxGas {
+ log.Trace("Not enough gas for further transactions", "gp", env.gasPool)
break
}
// Retrieve the next transaction and abort if all done
@@ -553,7 +562,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
// Start executing the transaction
env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
- err, logs := env.commitTransaction(tx, bc, coinbase, gp)
+ err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account