diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 622c8ce9d..54bad9eae 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,6 +36,9 @@ import ( ) const ( + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. blockConfirmedChanSize = 10 ) @@ -118,6 +121,7 @@ type blockChain interface { GetBlock(hash common.Hash, number uint64) *types.Block StateAt(root common.Hash) (*state.StateDB, error) + SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } @@ -209,6 +213,8 @@ type TxPool struct { gasPrice *big.Int txFeed event.Feed scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription blockConfirmedCh chan BlockConfirmedEvent blockConfirmedSub event.Subscription signer types.Signer @@ -229,12 +235,13 @@ type TxPool struct { wg sync.WaitGroup // for shutdown sync - homestead bool + homestead bool + isBlockProposer bool } // NewTxPool creates a new transaction pool to gather, sort and filter inbound // transactions from the network. -func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { +func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain, isBlockProposer bool) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() @@ -248,8 +255,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), + isBlockProposer: isBlockProposer, } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -272,6 +281,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } // Subscribe events from blockchain pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) // Start the event loop and return pool.wg.Add(1) @@ -304,8 +314,29 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { + // Handle ChainHeadEvent + case ev := <-pool.chainHeadCh: + if pool.isBlockProposer { + break + } + if ev.Block != nil { + pool.mu.Lock() + if pool.chainconfig.IsHomestead(ev.Block.Number()) { + pool.homestead = true + } + pool.reset(head.Header(), ev.Block.Header()) + head = ev.Block + + pool.mu.Unlock() + } + // Be unsubscribed due to system stopped + case <-pool.chainHeadSub.Err(): + return // Handle BlockConfirmedEvent case ev := <-pool.blockConfirmedCh: + if !pool.isBlockProposer { + break + } if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { |