aboutsummaryrefslogtreecommitdiffstats
path: root/miner/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'miner/worker.go')
-rw-r--r--miner/worker.go116
1 files changed, 49 insertions, 67 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 1a881799d..ca68da6e9 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -73,7 +73,7 @@ const (
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0
- // staleThreshold is the maximum distance of the acceptable stale block.
+ // staleThreshold is the maximum depth of the acceptable stale block.
staleThreshold = 7
)
@@ -139,7 +139,7 @@ type worker struct {
// Channels
newWorkCh chan *newWorkReq
taskCh chan *task
- resultCh chan *task
+ resultCh chan *types.Block
startCh chan struct{}
exitCh chan struct{}
resubmitIntervalCh chan time.Duration
@@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
taskCh: make(chan *task),
- resultCh: make(chan *task, resultQueueSize),
+ resultCh: make(chan *types.Block, resultQueueSize),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
@@ -269,18 +269,10 @@ func (w *worker) isRunning() bool {
return atomic.LoadInt32(&w.running) == 1
}
-// close terminates all background threads maintained by the worker and cleans up buffered channels.
+// close terminates all background threads maintained by the worker.
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
close(w.exitCh)
- // Clean up buffered channels
- for empty := false; !empty; {
- select {
- case <-w.resultCh:
- default:
- empty = true
- }
- }
}
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
@@ -471,42 +463,6 @@ func (w *worker) mainLoop() {
}
}
-// seal pushes a sealing task to consensus engine and submits the result.
-func (w *worker) seal(t *task, stop <-chan struct{}) {
- if w.skipSealHook != nil && w.skipSealHook(t) {
- return
- }
- // The reason for caching task first is:
- // A previous sealing action will be canceled by subsequent actions,
- // however, remote miner may submit a result based on the cancelled task.
- // So we should only submit the pending state corresponding to the seal result.
- // TODO(rjl493456442) Replace the seal-wait logic structure
- w.pendingMu.Lock()
- w.pendingTasks[w.engine.SealHash(t.block.Header())] = t
- w.pendingMu.Unlock()
-
- if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil {
- sealhash := w.engine.SealHash(block.Header())
- w.pendingMu.RLock()
- task, exist := w.pendingTasks[sealhash]
- w.pendingMu.RUnlock()
- if !exist {
- log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash())
- return
- }
- // Assemble sealing result
- task.block = block
- log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(),
- "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
- select {
- case w.resultCh <- task:
- case <-w.exitCh:
- }
- } else if err != nil {
- log.Warn("Block sealing failed", "err", err)
- }
-}
-
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
@@ -533,10 +489,20 @@ func (w *worker) taskLoop() {
if sealHash == prev {
continue
}
+ // Interrupt previous sealing operation
interrupt()
- stopCh = make(chan struct{})
- prev = sealHash
- go w.seal(task, stopCh)
+ stopCh, prev = make(chan struct{}), sealHash
+
+ if w.skipSealHook != nil && w.skipSealHook(task) {
+ continue
+ }
+ w.pendingMu.Lock()
+ w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
+ w.pendingMu.Unlock()
+
+ if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
+ log.Warn("Block sealing failed", "err", err)
+ }
case <-w.exitCh:
interrupt()
return
@@ -549,38 +515,54 @@ func (w *worker) taskLoop() {
func (w *worker) resultLoop() {
for {
select {
- case result := <-w.resultCh:
+ case block := <-w.resultCh:
// Short circuit when receiving empty result.
- if result == nil {
+ if block == nil {
continue
}
// Short circuit when receiving duplicate result caused by resubmitting.
- block := result.block
if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
continue
}
- // Update the block hash in all logs since it is now available and not when the
- // receipt/log of individual transactions were created.
- for _, r := range result.receipts {
- for _, l := range r.Logs {
- l.BlockHash = block.Hash()
- }
+ var (
+ sealhash = w.engine.SealHash(block.Header())
+ hash = block.Hash()
+ )
+ w.pendingMu.RLock()
+ task, exist := w.pendingTasks[sealhash]
+ w.pendingMu.RUnlock()
+ if !exist {
+ log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
+ continue
}
- for _, log := range result.state.Logs() {
- log.BlockHash = block.Hash()
+ // Different block could share same sealhash, deep copy here to prevent write-write conflict.
+ var (
+ receipts = make([]*types.Receipt, len(task.receipts))
+ logs []*types.Log
+ )
+ for i, receipt := range task.receipts {
+ receipts[i] = new(types.Receipt)
+ *receipts[i] = *receipt
+ // Update the block hash in all logs since it is now available and not when the
+ // receipt/log of individual transactions were created.
+ for _, log := range receipt.Logs {
+ log.BlockHash = hash
+ }
+ logs = append(logs, receipt.Logs...)
}
// Commit block and state to database.
- stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state)
+ stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
+ log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
+ "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
+
// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block})
- var (
- events []interface{}
- logs = result.state.Logs()
- )
+
+ var events []interface{}
switch stat {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})