aboutsummaryrefslogtreecommitdiffstats
path: root/miner
diff options
context:
space:
mode:
authorgary rong <garyrong0905@gmail.com>2018-08-23 21:02:57 +0800
committerPéter Szilágyi <peterke@gmail.com>2018-08-23 21:02:57 +0800
commit40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2 (patch)
tree70d5c37338def1e3246a3117ac84abca793c476a /miner
parentc3f7e3be3b60df3edd168e80aa89ee2992932b0d (diff)
downloaddexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.gz
dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.tar.zst
dexon-40a71f28cf1ada0bf6bdcdc2f3c6f31a8da134a2.zip
miner: fix state commit, track old work packages too (#17490)
* miner: commit state which is relative with sealing result * consensus, core, miner, mobile: introduce sealHash interface * miner: evict pending task with threshold * miner: go fmt
Diffstat (limited to 'miner')
-rw-r--r--miner/worker.go75
1 files changed, 52 insertions, 23 deletions
diff --git a/miner/worker.go b/miner/worker.go
index 8c3337ba4..18fb12e45 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -72,6 +72,9 @@ const (
// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0
+
+ // staleThreshold is the maximum distance of the acceptable stale block.
+ staleThreshold = 7
)
// environment is the worker's current environment and holds all of the current state information.
@@ -150,6 +153,9 @@ type worker struct {
coinbase common.Address
extra []byte
+ pendingMu sync.RWMutex
+ pendingTasks map[common.Hash]*task
+
snapshotMu sync.RWMutex // The lock used to protect the block snapshot and state snapshot
snapshotBlock *types.Block
snapshotState *state.StateDB
@@ -174,6 +180,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
chain: eth.BlockChain(),
possibleUncles: make(map[common.Hash]*types.Block),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
+ pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
@@ -317,13 +324,25 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}
recommit = time.Duration(int64(next))
}
+ // clearPending cleans the stale pending tasks.
+ clearPending := func(number uint64) {
+ w.pendingMu.Lock()
+ for h, t := range w.pendingTasks {
+ if t.block.NumberU64()+staleThreshold <= number {
+ delete(w.pendingTasks, h)
+ }
+ }
+ w.pendingMu.Unlock()
+ }
for {
select {
case <-w.startCh:
+ clearPending(w.chain.CurrentBlock().NumberU64())
commit(false, commitInterruptNewHead)
- case <-w.chainHeadCh:
+ case head := <-w.chainHeadCh:
+ clearPending(head.Block.NumberU64())
commit(false, commitInterruptNewHead)
case <-timer.C:
@@ -454,28 +473,37 @@ func (w *worker) mainLoop() {
// seal pushes a sealing task to consensus engine and submits the result.
func (w *worker) seal(t *task, stop <-chan struct{}) {
- var (
- err error
- res *task
- )
-
if w.skipSealHook != nil && w.skipSealHook(t) {
return
}
-
- if t.block, err = w.engine.Seal(w.chain, t.block, stop); t.block != nil {
- log.Info("Successfully sealed new block", "number", t.block.Number(), "hash", t.block.Hash(),
- "elapsed", common.PrettyDuration(time.Since(t.createdAt)))
- res = t
- } else {
- if err != nil {
- log.Warn("Block sealing failed", "err", err)
+ // The reason for caching task first is:
+ // A previous sealing action will be canceled by subsequent actions,
+ // however, remote miner may submit a result based on the cancelled task.
+ // So we should only submit the pending state corresponding to the seal result.
+ // TODO(rjl493456442) Replace the seal-wait logic structure
+ w.pendingMu.Lock()
+ w.pendingTasks[w.engine.SealHash(t.block.Header())] = t
+ w.pendingMu.Unlock()
+
+ if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil {
+ sealhash := w.engine.SealHash(block.Header())
+ w.pendingMu.RLock()
+ task, exist := w.pendingTasks[sealhash]
+ w.pendingMu.RUnlock()
+ if !exist {
+ log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash())
+ return
}
- res = nil
- }
- select {
- case w.resultCh <- res:
- case <-w.exitCh:
+ // Assemble sealing result
+ task.block = block
+ log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(),
+ "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
+ select {
+ case w.resultCh <- task:
+ case <-w.exitCh:
+ }
+ } else if err != nil {
+ log.Warn("Block sealing failed", "err", err)
}
}
@@ -501,12 +529,13 @@ func (w *worker) taskLoop() {
w.newTaskHook(task)
}
// Reject duplicate sealing work due to resubmitting.
- if task.block.HashNoNonce() == prev {
+ sealHash := w.engine.SealHash(task.block.Header())
+ if sealHash == prev {
continue
}
interrupt()
stopCh = make(chan struct{})
- prev = task.block.HashNoNonce()
+ prev = sealHash
go w.seal(task, stopCh)
case <-w.exitCh:
interrupt()
@@ -928,8 +957,8 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
}
feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
- log.Info("Commit new mining work", "number", block.Number(), "uncles", len(uncles), "txs", w.current.tcount,
- "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
+ log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
+ "uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
case <-w.exitCh:
log.Info("Worker has exited")