aboutsummaryrefslogtreecommitdiffstats
path: root/miner/worker.go
diff options
context:
space:
mode:
authorMiya Chen <miyatlchen@gmail.com>2017-08-18 18:58:36 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-08-18 18:58:36 +0800
commitbf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch)
treea8b86720edf085a6531e7042ef33f36a993540d5 /miner/worker.go
parenta4da8416eec6a00c358b6a612d21e7cdf859d588 (diff)
downloaddexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst
dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go68
1 files changed, 55 insertions, 13 deletions
diff --git a/miner/worker.go b/miner/worker.go
index dab192c24..24e03be60 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -41,6 +41,14 @@ import (
const (
resultQueueSize = 10
miningLogAtDepth = 5
+
+ // txChanSize is the size of channel listening to TxPreEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+ // chainHeadChanSize is the size of channel listening to ChainHeadEvent.
+ chainHeadChanSize = 10
+ // chainSideChanSize is the size of channel listening to ChainSideEvent.
+ chainSideChanSize = 10
)
// Agent can register themself with the worker
@@ -87,9 +95,14 @@ type worker struct {
mu sync.Mutex
// update loop
- mux *event.TypeMux
- events *event.TypeMuxSubscription
- wg sync.WaitGroup
+ mux *event.TypeMux
+ txCh chan core.TxPreEvent
+ txSub event.Subscription
+ chainHeadCh chan core.ChainHeadEvent
+ chainHeadSub event.Subscription
+ chainSideCh chan core.ChainSideEvent
+ chainSideSub event.Subscription
+ wg sync.WaitGroup
agents map[Agent]struct{}
recv chan *Result
@@ -123,6 +136,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
engine: engine,
eth: eth,
mux: mux,
+ txCh: make(chan core.TxPreEvent, txChanSize),
+ chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
+ chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
@@ -133,7 +149,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
fullValidation: false,
}
- worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
+ // Subscribe TxPreEvent for tx pool
+ worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
+ // Subscribe events for blockchain
+ worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
+ worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
go worker.update()
go worker.wait()
@@ -225,20 +245,28 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
- for event := range self.events.Chan() {
+ defer self.txSub.Unsubscribe()
+ defer self.chainHeadSub.Unsubscribe()
+ defer self.chainSideSub.Unsubscribe()
+
+ for {
// A real event arrived, process interesting content
- switch ev := event.Data.(type) {
- case core.ChainHeadEvent:
+ select {
+ // Handle ChainHeadEvent
+ case <-self.chainHeadCh:
self.commitNewWork()
- case core.ChainSideEvent:
+
+ // Handle ChainSideEvent
+ case ev := <-self.chainSideCh:
self.uncleMu.Lock()
self.possibleUncles[ev.Block.Hash()] = ev.Block
self.uncleMu.Unlock()
- case core.TxPreEvent:
+
+ // Handle TxPreEvent
+ case ev := <-self.txCh:
// Apply transaction to the pending state if we're not mining
if atomic.LoadInt32(&self.mining) == 0 {
self.currentMu.Lock()
-
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
txset := types.NewTransactionsByPriceAndNonce(txs)
@@ -246,6 +274,14 @@ func (self *worker) update() {
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.currentMu.Unlock()
}
+
+ // System stopped
+ case <-self.txSub.Err():
+ return
+ case <-self.chainHeadSub.Err():
+ return
+ case <-self.chainSideSub.Err():
+ return
}
}
}
@@ -298,12 +334,18 @@ func (self *worker) wait() {
// broadcast before waiting for validation
go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) {
self.mux.Post(core.NewMinedBlockEvent{Block: block})
- self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
+ var (
+ events []interface{}
+ coalescedLogs []*types.Log
+ )
+ events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
if stat == core.CanonStatTy {
- self.mux.Post(core.ChainHeadEvent{Block: block})
- self.mux.Post(logs)
+ events = append(events, core.ChainHeadEvent{Block: block})
+ coalescedLogs = logs
}
+ // post blockchain events
+ self.chain.PostChainEvents(events, coalescedLogs)
if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
log.Warn("Failed writing block receipts", "err", err)
}