aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-10-31 18:21:55 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-19 20:54:27 +0800
commit10511980f4e0d5d5330b2fd619e6d805c65938fd (patch)
tree6ad951518926a50e11e0c9ae98f6fb4a664d38e1
parent26b7d6403b713717d46fef1fa2bed273722a4ce8 (diff)
downloaddexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.tar.gz
dexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.tar.zst
dexon-10511980f4e0d5d5330b2fd619e6d805c65938fd.zip
core: tx_pool: remove transactions on BlockConfirmed event
-rw-r--r--core/blockchain.go26
-rw-r--r--core/events.go2
-rw-r--r--core/tx_pool.go56
-rw-r--r--core/tx_pool_test.go10
-rw-r--r--dex/app.go3
5 files changed, 55 insertions, 42 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 7eb6f3606..1bbaf2bc9 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -104,14 +104,15 @@ type BlockChain struct {
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
- hc *HeaderChain
- rmLogsFeed event.Feed
- chainFeed event.Feed
- chainSideFeed event.Feed
- chainHeadFeed event.Feed
- logsFeed event.Feed
- scope event.SubscriptionScope
- genesisBlock *types.Block
+ hc *HeaderChain
+ rmLogsFeed event.Feed
+ chainFeed event.Feed
+ chainSideFeed event.Feed
+ chainHeadFeed event.Feed
+ blockConfirmedFeed event.Feed
+ logsFeed event.Feed
+ scope event.SubscriptionScope
+ genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
@@ -1604,6 +1605,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
// add into pending blocks
bc.addPendingBlock(newPendingBlock, receipts)
+ events = append(events, BlockConfirmedEvent{newPendingBlock})
// start insert available pending blocks into db
for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ {
@@ -1824,6 +1826,9 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
case ChainHeadEvent:
bc.chainHeadFeed.Send(ev)
+ case BlockConfirmedEvent:
+ bc.blockConfirmedFeed.Send(ev)
+
case ChainSideEvent:
bc.chainSideFeed.Send(ev)
}
@@ -2016,6 +2021,11 @@ func (bc *BlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Su
return bc.scope.Track(bc.chainHeadFeed.Subscribe(ch))
}
+// SubscribeBlockConfirmedEvent registers a subscription of ChainHeadEvent.
+func (bc *BlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription {
+ return bc.scope.Track(bc.blockConfirmedFeed.Subscribe(ch))
+}
+
// SubscribeChainSideEvent registers a subscription of ChainSideEvent.
func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Subscription {
return bc.scope.Track(bc.chainSideFeed.Subscribe(ch))
diff --git a/core/events.go b/core/events.go
index e76aa4784..1231daa37 100644
--- a/core/events.go
+++ b/core/events.go
@@ -47,6 +47,8 @@ type ChainSideEvent struct {
type ChainHeadEvent struct{ Block *types.Block }
+type BlockConfirmedEvent struct{ Block *types.Block }
+
type NewNotarySetEvent struct {
Round uint64
Pubkeys map[string]struct{} // pubkeys in hex format
diff --git a/core/tx_pool.go b/core/tx_pool.go
index db9a6b9f9..6a9fa45c3 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -36,8 +36,8 @@ import (
)
const (
- // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
- chainHeadChanSize = 10
+ // blockConfirmedChanSize is the size of channel listening to BlockConfirmedEvent.
+ blockConfirmedChanSize = 10
)
var (
@@ -118,7 +118,7 @@ type blockChain interface {
GetBlock(hash common.Hash, number uint64) *types.Block
StateAt(root common.Hash) (*state.StateDB, error)
- SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
+ SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription
}
// TxPoolConfig are the configuration parameters of the transaction pool.
@@ -183,16 +183,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
- config TxPoolConfig
- chainconfig *params.ChainConfig
- chain blockChain
- gasPrice *big.Int
- txFeed event.Feed
- scope event.SubscriptionScope
- chainHeadCh chan ChainHeadEvent
- chainHeadSub event.Subscription
- signer types.Signer
- mu sync.RWMutex
+ config TxPoolConfig
+ chainconfig *params.ChainConfig
+ chain blockChain
+ gasPrice *big.Int
+ txFeed event.Feed
+ scope event.SubscriptionScope
+ blockConfirmedCh chan BlockConfirmedEvent
+ blockConfirmedSub event.Subscription
+ signer types.Signer
+ mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
@@ -220,16 +220,16 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
// Create the transaction pool with its initial settings
pool := &TxPool{
- config: config,
- chainconfig: chainconfig,
- chain: chain,
- signer: types.NewEIP155Signer(chainconfig.ChainID),
- pending: make(map[common.Address]*txList),
- queue: make(map[common.Address]*txList),
- beats: make(map[common.Address]time.Time),
- all: newTxLookup(),
- chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
- gasPrice: new(big.Int).SetUint64(config.PriceLimit),
+ config: config,
+ chainconfig: chainconfig,
+ chain: chain,
+ signer: types.NewEIP155Signer(chainconfig.ChainID),
+ pending: make(map[common.Address]*txList),
+ queue: make(map[common.Address]*txList),
+ beats: make(map[common.Address]time.Time),
+ all: newTxLookup(),
+ blockConfirmedCh: make(chan BlockConfirmedEvent, blockConfirmedChanSize),
+ gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
@@ -251,7 +251,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
}
}
// Subscribe events from blockchain
- pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
+ pool.blockConfirmedSub = pool.chain.SubscribeBlockConfirmedEvent(pool.blockConfirmedCh)
// Start the event loop and return
pool.wg.Add(1)
@@ -284,8 +284,8 @@ func (pool *TxPool) loop() {
// Keep waiting for and reacting to the various events
for {
select {
- // Handle ChainHeadEvent
- case ev := <-pool.chainHeadCh:
+ // Handle BlockConfirmedEvent
+ case ev := <-pool.blockConfirmedCh:
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
@@ -297,7 +297,7 @@ func (pool *TxPool) loop() {
pool.mu.Unlock()
}
// Be unsubscribed due to system stopped
- case <-pool.chainHeadSub.Err():
+ case <-pool.blockConfirmedSub.Err():
return
// Handle stats reporting ticks
@@ -441,7 +441,7 @@ func (pool *TxPool) Stop() {
pool.scope.Close()
// Unsubscribe subscriptions registered from blockchain
- pool.chainHeadSub.Unsubscribe()
+ pool.blockConfirmedSub.Unsubscribe()
pool.wg.Wait()
if pool.journal != nil {
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index d63125ac9..3f0ab9df3 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -45,9 +45,9 @@ func init() {
}
type testBlockChain struct {
- statedb *state.StateDB
- gasLimit uint64
- chainHeadFeed *event.Feed
+ statedb *state.StateDB
+ gasLimit uint64
+ blockConfirmedFeed *event.Feed
}
func (bc *testBlockChain) CurrentBlock() *types.Block {
@@ -64,8 +64,8 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil
}
-func (bc *testBlockChain) SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription {
- return bc.chainHeadFeed.Subscribe(ch)
+func (bc *testBlockChain) SubscribeBlockConfirmedEvent(ch chan<- BlockConfirmedEvent) event.Subscription {
+ return bc.blockConfirmedFeed.Subscribe(ch)
}
func transaction(nonce uint64, gaslimit uint64, key *ecdsa.PrivateKey) *types.Transaction {
diff --git a/dex/app.go b/dex/app.go
index b325f44d0..3bc5053a4 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -67,7 +67,8 @@ type witnessData struct {
ReceiptHash common.Hash
}
-func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp {
+func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance,
+ chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp {
return &DexconApp{
txPool: txPool,
blockchain: blockchain,