From bf1e2631281e1e439533f2abcf1e99a7b2f9552a Mon Sep 17 00:00:00 2001 From: Miya Chen Date: Fri, 18 Aug 2017 18:58:36 +0800 Subject: core, light: send chain events using event.Feed (#14865) --- miner/worker.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 13 deletions(-) (limited to 'miner') diff --git a/miner/worker.go b/miner/worker.go index dab192c24..24e03be60 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -41,6 +41,14 @@ import ( const ( resultQueueSize = 10 miningLogAtDepth = 5 + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 + // chainHeadChanSize is the size of channel listening to ChainHeadEvent. + chainHeadChanSize = 10 + // chainSideChanSize is the size of channel listening to ChainSideEvent. + chainSideChanSize = 10 ) // Agent can register themself with the worker @@ -87,9 +95,14 @@ type worker struct { mu sync.Mutex // update loop - mux *event.TypeMux - events *event.TypeMuxSubscription - wg sync.WaitGroup + mux *event.TypeMux + txCh chan core.TxPreEvent + txSub event.Subscription + chainHeadCh chan core.ChainHeadEvent + chainHeadSub event.Subscription + chainSideCh chan core.ChainSideEvent + chainSideSub event.Subscription + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -123,6 +136,9 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com engine: engine, eth: eth, mux: mux, + txCh: make(chan core.TxPreEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -133,7 +149,11 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), fullValidation: false, } - worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) + // Subscribe TxPreEvent for tx pool + worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) + // Subscribe events for blockchain + worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) + worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) go worker.update() go worker.wait() @@ -225,20 +245,28 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - for event := range self.events.Chan() { + defer self.txSub.Unsubscribe() + defer self.chainHeadSub.Unsubscribe() + defer self.chainSideSub.Unsubscribe() + + for { // A real event arrived, process interesting content - switch ev := event.Data.(type) { - case core.ChainHeadEvent: + select { + // Handle ChainHeadEvent + case <-self.chainHeadCh: self.commitNewWork() - case core.ChainSideEvent: + + // Handle ChainSideEvent + case ev := <-self.chainSideCh: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() - case core.TxPreEvent: + + // Handle TxPreEvent + case ev := <-self.txCh: // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} txset := types.NewTransactionsByPriceAndNonce(txs) @@ -246,6 +274,14 @@ func (self *worker) update() { self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.currentMu.Unlock() } + + // System stopped + case <-self.txSub.Err(): + return + case <-self.chainHeadSub.Err(): + return + case <-self.chainSideSub.Err(): + return } } } @@ -298,12 +334,18 @@ func (self *worker) wait() { // broadcast before waiting for validation go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { self.mux.Post(core.NewMinedBlockEvent{Block: block}) - self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) + var ( + events []interface{} + coalescedLogs []*types.Log + ) + events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { - self.mux.Post(core.ChainHeadEvent{Block: block}) - self.mux.Post(logs) + events = append(events, core.ChainHeadEvent{Block: block}) + coalescedLogs = logs } + // post blockchain events + self.chain.PostChainEvents(events, coalescedLogs) if err := core.WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil { log.Warn("Failed writing block receipts", "err", err) } -- cgit