diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-10-31 18:21:55 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2018-12-19 20:54:27 +0800 |
commit | 10511980f4e0d5d5330b2fd619e6d805c65938fd (patch) | |
tree | 6ad951518926a50e11e0c9ae98f6fb4a664d38e1 | |
parent | 26b7d6403b713717d46fef1fa2bed273722a4ce8 (diff) | |
download | dexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.tar.gz dexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.tar.zst dexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.zip |
core: tx_pool: remove transactions on BlockConfirmed event
-rw-r--r-- | core/blockchain.go | 26 | ||||
-rw-r--r-- | core/events.go | 2 | ||||
-rw-r--r-- | core/tx_pool.go | 56 | ||||
-rw-r--r-- | core/tx_pool_test.go | 10 | ||||
-rw-r--r-- | dex/app.go | 3 |
5 files changed, 55 insertions, 42 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 7eb6f3606..1bbaf2bc9 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -104,14 +104,15 @@ type BlockChain struct { triegc *prque.Prque // Priority queue mapping block numbers to tries to gc gcproc time.Duration // Accumulates canonical block processing for trie dumping - hc *HeaderChain - rmLogsFeed event.Feed - chainFeed event.Feed - chainSideFeed event.Feed - chainHeadFeed event.Feed - logsFeed event.Feed - scope event.SubscriptionScope - genesisBlock *types.Block + hc *HeaderChain + rmLogsFeed event.Feed + chainFeed event.Feed + chainSideFeed event.Feed + chainHeadFeed event.Feed + blockConfirmedFeed event.Feed + logsFeed event.Feed + scope event.SubscriptionScope + genesisBlock *types.Block mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock @@ -1604,6 +1605,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes // add into pending blocks bc.addPendingBlock(newPendingBlock, receipts) + events = append(events, BlockConfirmedEvent{newPendingBlock}) // start insert available pending blocks into db for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { @@ -1824,6 +1826,9 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { case ChainHeadEvent: bc.chainHeadFeed.Send(ev) + case BlockConfirmedEvent: + bc.blockConfirmedFeed.Send(ev) + case ChainSideEvent: bc.chainSideFeed.Send(ev) } @@ -2016,6 +2021,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch)) } +// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent. +func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { + return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch)) +} + // SubscribeChainSideEvent registers a subscription of ChainSideEvent. func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription { return bc.scope.Track(bc.chainSideFeed.Subscribe(ch)) diff --git a/core/events.go b/core/events.go index e76aa4784..1231daa37 100644 --- a/core/events.go +++ b/core/events.go @@ -47,6 +47,8 @@ type ChainSideEvent struct { type ChainHeadEvent struct{ Block *types.Block } +type BlockConfirmedEvent struct{ Block *types.Block } + type NewNotarySetEvent struct { Round uint64 Pubkeys map[string]struct{} // pubkeys in hex format diff --git a/core/tx_pool.go b/core/tx_pool.go index db9a6b9f9..6a9fa45c3 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,8 +36,8 @@ import ( ) const ( - // chainHeadChanSize is the size of channel listening to ChainHeadEvent. - chainHeadChanSize = 10 + // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent. + blockConfirmedChanSize = 10 ) var ( @@ -118,7 +118,7 @@ type blockChain interface { GetBlock(hash common.Hash, number uint64) *types.Block StateAt(root common.Hash) (*state.StateDB, error) - SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription + SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription } // TxPoolConfig are the configuration parameters of the transaction pool. @@ -183,16 +183,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + scope event.SubscriptionScope + blockConfirmedCh chan BlockConfirmedEvent + blockConfirmedSub event.Subscription + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -220,16 +220,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -251,7 +251,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block } } // Subscribe events from blockchain - pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) + pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh) // Start the event loop and return pool.wg.Add(1) @@ -284,8 +284,8 @@ func (pool *TxPool) loop() { // Keep waiting for and reacting to the various events for { select { - // Handle ChainHeadEvent - case ev := <-pool.chainHeadCh: + // Handle BlockConfirmedEvent + case ev := <-pool.blockConfirmedCh: if ev.Block != nil { pool.mu.Lock() if pool.chainconfig.IsHomestead(ev.Block.Number()) { @@ -297,7 +297,7 @@ func (pool *TxPool) loop() { pool.mu.Unlock() } // Be unsubscribed due to system stopped - case <-pool.chainHeadSub.Err(): + case <-pool.blockConfirmedSub.Err(): return // Handle stats reporting ticks @@ -441,7 +441,7 @@ func (pool *TxPool) Stop() { pool.scope.Close() // Unsubscribe subscriptions registered from blockchain - pool.chainHeadSub.Unsubscribe() + pool.blockConfirmedSub.Unsubscribe() pool.wg.Wait() if pool.journal != nil { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index d63125ac9..3f0ab9df3 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -45,9 +45,9 @@ func init() { } type testBlockChain struct { - statedb *state.StateDB - gasLimit uint64 - chainHeadFeed *event.Feed + statedb *state.StateDB + gasLimit uint64 + blockConfirmedFeed *event.Feed } func (bc *testBlockChain) CurrentBlock() *types.Block { @@ -64,8 +64,8 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } -func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription { - return bc.chainHeadFeed.Subscribe(ch) +func (bc *testBlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription { + return bc.blockConfirmedFeed.Subscribe(ch) } func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction { diff --git a/dex/app.go b/dex/app.go index b325f44d0..3bc5053a4 100644 --- a/dex/app.go +++ b/dex/app.go @@ -67,7 +67,8 @@ type witnessData struct { ReceiptHash common.Hash } -func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp { +func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, + chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp { return &DexconApp{ txPool: txPool, blockchain: blockchain, |