diff options
Diffstat (limited to 'miner/worker.go')
-rw-r--r-- | miner/worker.go | 116 |
1 files changed, 49 insertions, 67 deletions
diff --git a/miner/worker.go b/miner/worker.go index 1a881799d..ca68da6e9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -73,7 +73,7 @@ const ( // increasing upper limit or decreasing lower limit so that the limit can be reachable. intervalAdjustBias = 200 * 1000.0 * 1000.0 - // staleThreshold is the maximum distance of the acceptable stale block. + // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 7 ) @@ -139,7 +139,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq taskCh chan *task - resultCh chan *task + resultCh chan *types.Block startCh chan struct{} exitCh chan struct{} resubmitIntervalCh chan time.Duration @@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), taskCh: make(chan *task), - resultCh: make(chan *task, resultQueueSize), + resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), @@ -269,18 +269,10 @@ func (w *worker) isRunning() bool { return atomic.LoadInt32(&w.running) == 1 } -// close terminates all background threads maintained by the worker and cleans up buffered channels. +// close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { close(w.exitCh) - // Clean up buffered channels - for empty := false; !empty; { - select { - case <-w.resultCh: - default: - empty = true - } - } } // newWorkLoop is a standalone goroutine to submit new mining work upon received events. @@ -471,42 +463,6 @@ func (w *worker) mainLoop() { } } -// seal pushes a sealing task to consensus engine and submits the result. -func (w *worker) seal(t *task, stop <-chan struct{}) { - if w.skipSealHook != nil && w.skipSealHook(t) { - return - } - // The reason for caching task first is: - // A previous sealing action will be canceled by subsequent actions, - // however, remote miner may submit a result based on the cancelled task. - // So we should only submit the pending state corresponding to the seal result. - // TODO(rjl493456442) Replace the seal-wait logic structure - w.pendingMu.Lock() - w.pendingTasks[w.engine.SealHash(t.block.Header())] = t - w.pendingMu.Unlock() - - if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil { - sealhash := w.engine.SealHash(block.Header()) - w.pendingMu.RLock() - task, exist := w.pendingTasks[sealhash] - w.pendingMu.RUnlock() - if !exist { - log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash()) - return - } - // Assemble sealing result - task.block = block - log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(), - "elapsed", common.PrettyDuration(time.Since(task.createdAt))) - select { - case w.resultCh <- task: - case <-w.exitCh: - } - } else if err != nil { - log.Warn("Block sealing failed", "err", err) - } -} - // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { @@ -533,10 +489,20 @@ func (w *worker) taskLoop() { if sealHash == prev { continue } + // Interrupt previous sealing operation interrupt() - stopCh = make(chan struct{}) - prev = sealHash - go w.seal(task, stopCh) + stopCh, prev = make(chan struct{}), sealHash + + if w.skipSealHook != nil && w.skipSealHook(task) { + continue + } + w.pendingMu.Lock() + w.pendingTasks[w.engine.SealHash(task.block.Header())] = task + w.pendingMu.Unlock() + + if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { + log.Warn("Block sealing failed", "err", err) + } case <-w.exitCh: interrupt() return @@ -549,38 +515,54 @@ func (w *worker) taskLoop() { func (w *worker) resultLoop() { for { select { - case result := <-w.resultCh: + case block := <-w.resultCh: // Short circuit when receiving empty result. - if result == nil { + if block == nil { continue } // Short circuit when receiving duplicate result caused by resubmitting. - block := result.block if w.chain.HasBlock(block.Hash(), block.NumberU64()) { continue } - // Update the block hash in all logs since it is now available and not when the - // receipt/log of individual transactions were created. - for _, r := range result.receipts { - for _, l := range r.Logs { - l.BlockHash = block.Hash() - } + var ( + sealhash = w.engine.SealHash(block.Header()) + hash = block.Hash() + ) + w.pendingMu.RLock() + task, exist := w.pendingTasks[sealhash] + w.pendingMu.RUnlock() + if !exist { + log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash) + continue } - for _, log := range result.state.Logs() { - log.BlockHash = block.Hash() + // Different block could share same sealhash, deep copy here to prevent write-write conflict. + var ( + receipts = make([]*types.Receipt, len(task.receipts)) + logs []*types.Log + ) + for i, receipt := range task.receipts { + receipts[i] = new(types.Receipt) + *receipts[i] = *receipt + // Update the block hash in all logs since it is now available and not when the + // receipt/log of individual transactions were created. + for _, log := range receipt.Logs { + log.BlockHash = hash + } + logs = append(logs, receipt.Logs...) } // Commit block and state to database. - stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state) + stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } + log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, + "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: block}) - var ( - events []interface{} - logs = result.state.Logs() - ) + + var events []interface{} switch stat { case core.CanonStatTy: events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) |