diff options
author | gary rong <garyrong0905@gmail.com> | 2018-08-28 21:59:05 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2018-08-28 21:59:05 +0800 |
commit | c1c003e4ff36c22d67662ca661fc78cde850d401 (patch) | |
tree | b8bea54350fb6894cfd63ebc87a164acc3fba7e6 | |
parent | 63352bf4247f05d8ef255ff8c63290225c3bc671 (diff) | |
download | go-tangerine-c1c003e4ff36c22d67662ca661fc78cde850d401.tar.gz go-tangerine-c1c003e4ff36c22d67662ca661fc78cde850d401.tar.zst go-tangerine-c1c003e4ff36c22d67662ca661fc78cde850d401.zip |
consensus, miner: stale block mining support (#17506)
* consensus, miner: stale block supporting
* consensus, miner: refactor seal signature
* cmd, consensus, eth: add miner noverify flag
* cmd, consensus, miner: polish
-rw-r--r-- | cmd/geth/main.go | 1 | ||||
-rw-r--r-- | cmd/geth/usage.go | 1 | ||||
-rw-r--r-- | cmd/utils/flags.go | 11 | ||||
-rw-r--r-- | consensus/clique/clique.go | 39 | ||||
-rw-r--r-- | consensus/consensus.go | 9 | ||||
-rw-r--r-- | consensus/ethash/algorithm_test.go | 2 | ||||
-rw-r--r-- | consensus/ethash/ethash.go | 33 | ||||
-rw-r--r-- | consensus/ethash/ethash_test.go | 40 | ||||
-rw-r--r-- | consensus/ethash/sealer.go | 134 | ||||
-rw-r--r-- | consensus/ethash/sealer_test.go | 94 | ||||
-rw-r--r-- | eth/api_tracer.go | 2 | ||||
-rw-r--r-- | eth/backend.go | 8 | ||||
-rw-r--r-- | eth/config.go | 1 | ||||
-rw-r--r-- | eth/gen_config.go | 7 | ||||
-rw-r--r-- | les/backend.go | 2 | ||||
-rw-r--r-- | miner/worker.go | 116 |
16 files changed, 317 insertions, 183 deletions
diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 7b8a8244e..0ed67d0d5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -108,6 +108,7 @@ var ( utils.MinerExtraDataFlag, utils.MinerLegacyExtraDataFlag, utils.MinerRecommitIntervalFlag, + utils.MinerNoVerfiyFlag, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 1e27d0ae8..2f8260e0f 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -192,6 +192,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.MinerEtherbaseFlag, utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, + utils.MinerNoVerfiyFlag, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b9a33ffe7..13430ad56 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -366,9 +366,13 @@ var ( } MinerRecommitIntervalFlag = cli.DurationFlag{ Name: "miner.recommit", - Usage: "Time interval to recreate the block being mined.", + Usage: "Time interval to recreate the block being mined", Value: eth.DefaultConfig.MinerRecommit, } + MinerNoVerfiyFlag = cli.BoolFlag{ + Name: "miner.noverify", + Usage: "Disable remote sealing verification", + } // Account settings UnlockedAccountFlag = cli.StringFlag{ Name: "unlock", @@ -1151,6 +1155,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(MinerRecommitIntervalFlag.Name) { cfg.MinerRecommit = ctx.Duration(MinerRecommitIntervalFlag.Name) } + if ctx.GlobalIsSet(MinerNoVerfiyFlag.Name) { + cfg.MinerNoverify = ctx.Bool(MinerNoVerfiyFlag.Name) + } if ctx.GlobalIsSet(VMEnableDebugFlag.Name) { // TODO(fjl): force-enable this in --dev mode cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name) @@ -1345,7 +1352,7 @@ func MakeChain(ctx *cli.Context, stack *node.Node) (chain *core.BlockChain, chai DatasetDir: stack.ResolvePath(eth.DefaultConfig.Ethash.DatasetDir), DatasetsInMem: eth.DefaultConfig.Ethash.DatasetsInMem, DatasetsOnDisk: eth.DefaultConfig.Ethash.DatasetsOnDisk, - }, nil) + }, nil, false) } } if gcmode := ctx.GlobalString(GCModeFlag.Name); gcmode != "full" && gcmode != "archive" { diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 3730c91f6..547290984 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -590,17 +590,17 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) { // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. -func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { +func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() // Sealing the genesis block is not supported number := header.Number.Uint64() if number == 0 { - return nil, errUnknownBlock + return errUnknownBlock } // For 0-period chains, refuse to seal empty blocks (no reward but would spin sealing) if c.config.Period == 0 && len(block.Transactions()) == 0 { - return nil, errWaitTransactions + return errWaitTransactions } // Don't hold the signer fields for the entire sealing procedure c.lock.RLock() @@ -610,10 +610,10 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch // Bail out if we're unauthorized to sign a block snap, err := c.snapshot(chain, number-1, header.ParentHash, nil) if err != nil { - return nil, err + return err } if _, authorized := snap.Signers[signer]; !authorized { - return nil, errUnauthorized + return errUnauthorized } // If we're amongst the recent signers, wait for the next block for seen, recent := range snap.Recents { @@ -621,8 +621,7 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch // Signer is among recents, only wait if the current block doesn't shift it out if limit := uint64(len(snap.Signers)/2 + 1); number < limit || seen > number-limit { log.Info("Signed recently, must wait for others") - <-stop - return nil, nil + return nil } } } @@ -635,21 +634,29 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, stop <-ch log.Trace("Out-of-turn signing requested", "wiggle", common.PrettyDuration(wiggle)) } - log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay)) - - select { - case <-stop: - return nil, nil - case <-time.After(delay): - } // Sign all the things! sighash, err := signFn(accounts.Account{Address: signer}, sigHash(header).Bytes()) if err != nil { - return nil, err + return err } copy(header.Extra[len(header.Extra)-extraSeal:], sighash) + // Wait until sealing is terminated or delay timeout. + log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay)) + go func() { + select { + case <-stop: + return + case <-time.After(delay): + } - return block.WithSeal(header), nil + select { + case results <- block.WithSeal(header): + default: + log.Warn("Sealing result is not read by miner", "sealhash", c.SealHash(header)) + } + }() + + return nil } // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty diff --git a/consensus/consensus.go b/consensus/consensus.go index 27799f13c..12ede7ff4 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -86,9 +86,12 @@ type Engine interface { Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) - // Seal generates a new block for the given input block with the local miner's - // seal place on top. - Seal(chain ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) + // Seal generates a new sealing request for the given input block and pushes + // the result into the given channel. + // + // Note, the method returns immediately and will send the result async. More + // than one result may also be returned depending on the consensus algorothm. + Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error // SealHash returns the hash of a block prior to it being sealed. SealHash(header *types.Header) common.Hash diff --git a/consensus/ethash/algorithm_test.go b/consensus/ethash/algorithm_test.go index db22cccd0..c58479e28 100644 --- a/consensus/ethash/algorithm_test.go +++ b/consensus/ethash/algorithm_test.go @@ -729,7 +729,7 @@ func TestConcurrentDiskCacheGeneration(t *testing.T) { go func(idx int) { defer pend.Done() - ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil) + ethash := New(Config{cachedir, 0, 1, "", 0, 0, ModeNormal}, nil, false) defer ethash.Close() if err := ethash.VerifySeal(nil, block.Header()); err != nil { t.Errorf("proc %d: block verification failed: %v", idx, err) diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index d98c3371c..b4819ca38 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -50,7 +50,7 @@ var ( two256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) // sharedEthash is a full instance that can be shared between multiple users. - sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil) + sharedEthash = New(Config{"", 3, 0, "", 1, 0, ModeNormal}, nil, false) // algorithmRevision is the data structure version used for file naming. algorithmRevision = 23 @@ -405,6 +405,12 @@ type Config struct { PowMode Mode } +// sealTask wraps a seal block with relative result channel for remote sealer thread. +type sealTask struct { + block *types.Block + results chan<- *types.Block +} + // mineResult wraps the pow solution parameters for the specified block. type mineResult struct { nonce types.BlockNonce @@ -444,12 +450,11 @@ type Ethash struct { hashrate metrics.Meter // Meter tracking the average hashrate // Remote sealer related fields - workCh chan *types.Block // Notification channel to push new work to remote sealer - resultCh chan *types.Block // Channel used by mining threads to return result - fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work - submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result - fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. - submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate + workCh chan *sealTask // Notification channel to push new work and relative result channel to remote sealer + fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work + submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result + fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer. + submitRateCh chan *hashrate // Channel used for remote sealer to submit their mining hashrate // The fields below are hooks for testing shared *Ethash // Shared PoW verifier to avoid cache regeneration @@ -464,7 +469,7 @@ type Ethash struct { // New creates a full sized ethash PoW scheme and starts a background thread for // remote mining, also optionally notifying a batch of remote services of new work // packages. -func New(config Config, notify []string) *Ethash { +func New(config Config, notify []string, noverify bool) *Ethash { if config.CachesInMem <= 0 { log.Warn("One ethash cache must always be in memory", "requested", config.CachesInMem) config.CachesInMem = 1 @@ -481,36 +486,34 @@ func New(config Config, notify []string) *Ethash { datasets: newlru("dataset", config.DatasetsInMem, newDataset), update: make(chan struct{}), hashrate: metrics.NewMeter(), - workCh: make(chan *types.Block), - resultCh: make(chan *types.Block), + workCh: make(chan *sealTask), fetchWorkCh: make(chan *sealWork), submitWorkCh: make(chan *mineResult), fetchRateCh: make(chan chan uint64), submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote(notify) + go ethash.remote(notify, noverify) return ethash } // NewTester creates a small sized ethash PoW scheme useful only for testing // purposes. -func NewTester(notify []string) *Ethash { +func NewTester(notify []string, noverify bool) *Ethash { ethash := &Ethash{ config: Config{PowMode: ModeTest}, caches: newlru("cache", 1, newCache), datasets: newlru("dataset", 1, newDataset), update: make(chan struct{}), hashrate: metrics.NewMeter(), - workCh: make(chan *types.Block), - resultCh: make(chan *types.Block), + workCh: make(chan *sealTask), fetchWorkCh: make(chan *sealWork), submitWorkCh: make(chan *mineResult), fetchRateCh: make(chan chan uint64), submitRateCh: make(chan *hashrate), exitCh: make(chan chan error), } - go ethash.remote(notify) + go ethash.remote(notify, noverify) return ethash } diff --git a/consensus/ethash/ethash_test.go b/consensus/ethash/ethash_test.go index b190d63d6..8eded2ca8 100644 --- a/consensus/ethash/ethash_test.go +++ b/consensus/ethash/ethash_test.go @@ -34,17 +34,23 @@ import ( func TestTestMode(t *testing.T) { header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} - ethash := NewTester(nil) + ethash := NewTester(nil, false) defer ethash.Close() - block, err := ethash.Seal(nil, types.NewBlockWithHeader(header), nil) + results := make(chan *types.Block) + err := ethash.Seal(nil, types.NewBlockWithHeader(header), results, nil) if err != nil { t.Fatalf("failed to seal block: %v", err) } - header.Nonce = types.EncodeNonce(block.Nonce()) - header.MixDigest = block.MixDigest() - if err := ethash.VerifySeal(nil, header); err != nil { - t.Fatalf("unexpected verification error: %v", err) + select { + case block := <-results: + header.Nonce = types.EncodeNonce(block.Nonce()) + header.MixDigest = block.MixDigest() + if err := ethash.VerifySeal(nil, header); err != nil { + t.Fatalf("unexpected verification error: %v", err) + } + case <-time.NewTimer(time.Second).C: + t.Error("sealing result timeout") } } @@ -56,7 +62,7 @@ func TestCacheFileEvict(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(tmpdir) - e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}, nil) + e := New(Config{CachesInMem: 3, CachesOnDisk: 10, CacheDir: tmpdir, PowMode: ModeTest}, nil, false) defer e.Close() workers := 8 @@ -85,7 +91,7 @@ func verifyTest(wg *sync.WaitGroup, e *Ethash, workerIndex, epochs int) { } func TestRemoteSealer(t *testing.T) { - ethash := NewTester(nil) + ethash := NewTester(nil, false) defer ethash.Close() api := &API{ethash} @@ -97,7 +103,8 @@ func TestRemoteSealer(t *testing.T) { sealhash := ethash.SealHash(header) // Push new work. - ethash.Seal(nil, block, nil) + results := make(chan *types.Block) + ethash.Seal(nil, block, results, nil) var ( work [3]string @@ -114,20 +121,11 @@ func TestRemoteSealer(t *testing.T) { header = &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(1000)} block = types.NewBlockWithHeader(header) sealhash = ethash.SealHash(header) - ethash.Seal(nil, block, nil) + ethash.Seal(nil, block, results, nil) if work, err = api.GetWork(); err != nil || work[0] != sealhash.Hex() { t.Error("expect to return the latest pushed work") } - // Push block with higher block number. - newHead := &types.Header{Number: big.NewInt(2), Difficulty: big.NewInt(100)} - newBlock := types.NewBlockWithHeader(newHead) - newSealhash := ethash.SealHash(newHead) - ethash.Seal(nil, newBlock, nil) - - if res := api.SubmitWork(types.BlockNonce{}, newSealhash, common.Hash{}); res { - t.Error("expect to return false when submit a stale solution") - } } func TestHashRate(t *testing.T) { @@ -136,7 +134,7 @@ func TestHashRate(t *testing.T) { expect uint64 ids = []common.Hash{common.HexToHash("a"), common.HexToHash("b"), common.HexToHash("c")} ) - ethash := NewTester(nil) + ethash := NewTester(nil, false) defer ethash.Close() if tot := ethash.Hashrate(); tot != 0 { @@ -156,7 +154,7 @@ func TestHashRate(t *testing.T) { } func TestClosedRemoteSealer(t *testing.T) { - ethash := NewTester(nil) + ethash := NewTester(nil, false) time.Sleep(1 * time.Second) // ensure exit channel is listening ethash.Close() diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index a458c60f6..06c98a781 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -35,6 +35,11 @@ import ( "github.com/ethereum/go-ethereum/log" ) +const ( + // staleThreshold is the maximum depth of the acceptable stale but valid ethash solution. + staleThreshold = 7 +) + var ( errNoMiningWork = errors.New("no mining work available yet") errInvalidSealResult = errors.New("invalid or stale proof-of-work solution") @@ -42,16 +47,21 @@ var ( // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. -func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop <-chan struct{}) (*types.Block, error) { +func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { // If we're running a fake PoW, simply return a 0 nonce immediately if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake { header := block.Header() header.Nonce, header.MixDigest = types.BlockNonce{}, common.Hash{} - return block.WithSeal(header), nil + select { + case results <- block.WithSeal(header): + default: + log.Warn("Sealing result is not read by miner", "mode", "fake", "sealhash", ethash.SealHash(block.Header())) + } + return nil } // If we're running a shared PoW, delegate sealing to it if ethash.shared != nil { - return ethash.shared.Seal(chain, block, stop) + return ethash.shared.Seal(chain, block, results, stop) } // Create a runner and the multiple search threads it directs abort := make(chan struct{}) @@ -62,7 +72,7 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) if err != nil { ethash.lock.Unlock() - return nil, err + return err } ethash.rand = rand.New(rand.NewSource(seed.Int64())) } @@ -75,34 +85,45 @@ func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, stop } // Push new work to remote sealer if ethash.workCh != nil { - ethash.workCh <- block + ethash.workCh <- &sealTask{block: block, results: results} } - var pend sync.WaitGroup + var ( + pend sync.WaitGroup + locals = make(chan *types.Block) + ) for i := 0; i < threads; i++ { pend.Add(1) go func(id int, nonce uint64) { defer pend.Done() - ethash.mine(block, id, nonce, abort, ethash.resultCh) + ethash.mine(block, id, nonce, abort, locals) }(i, uint64(ethash.rand.Int63())) } // Wait until sealing is terminated or a nonce is found - var result *types.Block - select { - case <-stop: - // Outside abort, stop all miner threads - close(abort) - case result = <-ethash.resultCh: - // One of the threads found a block, abort all others - close(abort) - case <-ethash.update: - // Thread count was changed on user request, restart - close(abort) + go func() { + var result *types.Block + select { + case <-stop: + // Outside abort, stop all miner threads + close(abort) + case result = <-locals: + // One of the threads found a block, abort all others + select { + case results <- result: + default: + log.Warn("Sealing result is not read by miner", "mode", "local", "sealhash", ethash.SealHash(block.Header())) + } + close(abort) + case <-ethash.update: + // Thread count was changed on user request, restart + close(abort) + if err := ethash.Seal(chain, block, results, stop); err != nil { + log.Error("Failed to restart sealing after update", "err", err) + } + } + // Wait for all miners to terminate and return the block pend.Wait() - return ethash.Seal(chain, block, stop) - } - // Wait for all miners to terminate and return the block - pend.Wait() - return result, nil + }() + return nil } // mine is the actual proof-of-work miner that searches for a nonce starting from @@ -165,11 +186,12 @@ search: } // remote is a standalone goroutine to handle remote mining related stuff. -func (ethash *Ethash) remote(notify []string) { +func (ethash *Ethash) remote(notify []string, noverify bool) { var ( works = make(map[common.Hash]*types.Block) rates = make(map[common.Hash]hashrate) + results chan<- *types.Block currentBlock *types.Block currentWork [3]string @@ -226,11 +248,15 @@ func (ethash *Ethash) remote(notify []string) { // submitWork verifies the submitted pow solution, returning // whether the solution was accepted or not (not can be both a bad pow as well as // any other error, like no pending work or stale mining result). - submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool { + submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, sealhash common.Hash) bool { + if currentBlock == nil { + log.Error("Pending work without block", "sealhash", sealhash) + return false + } // Make sure the work submitted is present - block := works[hash] + block := works[sealhash] if block == nil { - log.Info("Work submitted but none pending", "hash", hash) + log.Warn("Work submitted but none pending", "sealhash", sealhash, "curnumber", currentBlock.NumberU64()) return false } // Verify the correctness of submitted result. @@ -239,26 +265,36 @@ func (ethash *Ethash) remote(notify []string) { header.MixDigest = mixDigest start := time.Now() - if err := ethash.verifySeal(nil, header, true); err != nil { - log.Warn("Invalid proof-of-work submitted", "hash", hash, "elapsed", time.Since(start), "err", err) - return false + if !noverify { + if err := ethash.verifySeal(nil, header, true); err != nil { + log.Warn("Invalid proof-of-work submitted", "sealhash", sealhash, "elapsed", time.Since(start), "err", err) + return false + } } - // Make sure the result channel is created. - if ethash.resultCh == nil { + // Make sure the result channel is assigned. + if results == nil { log.Warn("Ethash result channel is empty, submitted mining result is rejected") return false } - log.Trace("Verified correct proof-of-work", "hash", hash, "elapsed", time.Since(start)) + log.Trace("Verified correct proof-of-work", "sealhash", sealhash, "elapsed", time.Since(start)) // Solutions seems to be valid, return to the miner and notify acceptance. - select { - case ethash.resultCh <- block.WithSeal(header): - delete(works, hash) - return true - default: - log.Info("Work submitted is stale", "hash", hash) - return false + solution := block.WithSeal(header) + + // The submitted solution is within the scope of acceptance. + if solution.NumberU64()+staleThreshold > currentBlock.NumberU64() { + select { + case results <- solution: + log.Debug("Work submitted is acceptable", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) + return true + default: + log.Warn("Sealing result is not read by miner", "mode", "remote", "sealhash", sealhash) + return false + } } + // The submitted block is too old to accept, drop it. + log.Warn("Work submitted is too old", "number", solution.NumberU64(), "sealhash", sealhash, "hash", solution.Hash()) + return false } ticker := time.NewTicker(5 * time.Second) @@ -266,14 +302,12 @@ func (ethash *Ethash) remote(notify []string) { for { select { - case block := <-ethash.workCh: - if currentBlock != nil && block.ParentHash() != currentBlock.ParentHash() { - // Start new round mining, throw out all previous work. - works = make(map[common.Hash]*types.Block) - } + case work := <-ethash.workCh: // Update current work with new received block. // Note same work can be past twice, happens when changing CPU threads. - makeWork(block) + results = work.results + + makeWork(work.block) // Notify and requested URLs of the new work availability notifyWork() @@ -315,6 +349,14 @@ func (ethash *Ethash) remote(notify []string) { delete(rates, id) } } + // Clear stale pending blocks + if currentBlock != nil { + for hash, block := range works { + if block.NumberU64()+staleThreshold <= currentBlock.NumberU64() { + delete(works, hash) + } + } + } case errc := <-ethash.exitCh: // Exit remote loop if ethash is closed and return relevant error. diff --git a/consensus/ethash/sealer_test.go b/consensus/ethash/sealer_test.go index d1b66f9cf..31d18b67c 100644 --- a/consensus/ethash/sealer_test.go +++ b/consensus/ethash/sealer_test.go @@ -41,14 +41,14 @@ func TestRemoteNotify(t *testing.T) { go server.Serve(listener) // Create the custom ethash engine - ethash := NewTester([]string{"http://" + listener.Addr().String()}) + ethash := NewTester([]string{"http://" + listener.Addr().String()}, false) defer ethash.Close() // Stream a work task and ensure the notification bubbles out header := &types.Header{Number: big.NewInt(1), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil) + ethash.Seal(nil, block, nil, nil) select { case work := <-sink: if want := ethash.SealHash(header).Hex(); work[0] != want { @@ -66,7 +66,7 @@ func TestRemoteNotify(t *testing.T) { } } -// Tests that pushing work packages fast to the miner doesn't cause any daa race +// Tests that pushing work packages fast to the miner doesn't cause any data race // issues in the notifications. func TestRemoteMultiNotify(t *testing.T) { // Start a simple webserver to capture notifications @@ -95,7 +95,7 @@ func TestRemoteMultiNotify(t *testing.T) { go server.Serve(listener) // Create the custom ethash engine - ethash := NewTester([]string{"http://" + listener.Addr().String()}) + ethash := NewTester([]string{"http://" + listener.Addr().String()}, false) defer ethash.Close() // Stream a lot of work task and ensure all the notifications bubble out @@ -103,7 +103,7 @@ func TestRemoteMultiNotify(t *testing.T) { header := &types.Header{Number: big.NewInt(int64(i)), Difficulty: big.NewInt(100)} block := types.NewBlockWithHeader(header) - ethash.Seal(nil, block, nil) + ethash.Seal(nil, block, nil, nil) } for i := 0; i < cap(sink); i++ { select { @@ -113,3 +113,87 @@ func TestRemoteMultiNotify(t *testing.T) { } } } + +// Tests whether stale solutions are correctly processed. +func TestStaleSubmission(t *testing.T) { + ethash := NewTester(nil, true) + defer ethash.Close() + api := &API{ethash} + + fakeNonce, fakeDigest := types.BlockNonce{0x01, 0x02, 0x03}, common.HexToHash("deadbeef") + + testcases := []struct { + headers []*types.Header + submitIndex int + submitRes bool + }{ + // Case1: submit solution for the latest mining package + { + []*types.Header{ + {ParentHash: common.BytesToHash([]byte{0xa}), Number: big.NewInt(1), Difficulty: big.NewInt(100000000)}, + }, + 0, + true, + }, + // Case2: submit solution for the previous package but have same parent. + { + []*types.Header{ + {ParentHash: common.BytesToHash([]byte{0xb}), Number: big.NewInt(2), Difficulty: big.NewInt(100000000)}, + {ParentHash: common.BytesToHash([]byte{0xb}), Number: big.NewInt(2), Difficulty: big.NewInt(100000001)}, + }, + 0, + true, + }, + // Case3: submit stale but acceptable solution + { + []*types.Header{ + {ParentHash: common.BytesToHash([]byte{0xc}), Number: big.NewInt(3), Difficulty: big.NewInt(100000000)}, + {ParentHash: common.BytesToHash([]byte{0xd}), Number: big.NewInt(9), Difficulty: big.NewInt(100000000)}, + }, + 0, + true, + }, + // Case4: submit very old solution + { + []*types.Header{ + {ParentHash: common.BytesToHash([]byte{0xe}), Number: big.NewInt(10), Difficulty: big.NewInt(100000000)}, + {ParentHash: common.BytesToHash([]byte{0xf}), Number: big.NewInt(17), Difficulty: big.NewInt(100000000)}, + }, + 0, + false, + }, + } + results := make(chan *types.Block, 16) + + for id, c := range testcases { + for _, h := range c.headers { + ethash.Seal(nil, types.NewBlockWithHeader(h), results, nil) + } + if res := api.SubmitWork(fakeNonce, ethash.SealHash(c.headers[c.submitIndex]), fakeDigest); res != c.submitRes { + t.Errorf("case %d submit result mismatch, want %t, get %t", id+1, c.submitRes, res) + } + if !c.submitRes { + continue + } + select { + case res := <-results: + if res.Header().Nonce != fakeNonce { + t.Errorf("case %d block nonce mismatch, want %s, get %s", id+1, fakeNonce, res.Header().Nonce) + } + if res.Header().MixDigest != fakeDigest { + t.Errorf("case %d block digest mismatch, want %s, get %s", id+1, fakeDigest, res.Header().MixDigest) + } + if res.Header().Difficulty.Uint64() != c.headers[c.submitIndex].Difficulty.Uint64() { + t.Errorf("case %d block difficulty mismatch, want %d, get %d", id+1, c.headers[c.submitIndex].Difficulty, res.Header().Difficulty) + } + if res.Header().Number.Uint64() != c.headers[c.submitIndex].Number.Uint64() { + t.Errorf("case %d block number mismatch, want %d, get %d", id+1, c.headers[c.submitIndex].Number.Uint64(), res.Header().Number.Uint64()) + } + if res.Header().ParentHash != c.headers[c.submitIndex].ParentHash { + t.Errorf("case %d block parent hash mismatch, want %s, get %s", id+1, c.headers[c.submitIndex].ParentHash.Hex(), res.Header().ParentHash.Hex()) + } + case <-time.NewTimer(time.Second).C: + t.Errorf("case %d fetch ethash result timeout", id+1) + } + } +} diff --git a/eth/api_tracer.go b/eth/api_tracer.go index 704a6cdba..0a8b9a994 100644 --- a/eth/api_tracer.go +++ b/eth/api_tracer.go @@ -294,7 +294,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl failed = err break } - // Reference the trie twice, once for us, once for the trancer + // Reference the trie twice, once for us, once for the tracer database.TrieDB().Reference(root, common.Hash{}) if number >= origin { database.TrieDB().Reference(root, common.Hash{}) diff --git a/eth/backend.go b/eth/backend.go index da7e0b2cd..3032e1a6d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -130,7 +130,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { chainConfig: chainConfig, eventMux: ctx.EventMux, accountManager: ctx.AccountManager, - engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, chainDb), + engine: CreateConsensusEngine(ctx, chainConfig, &config.Ethash, config.MinerNotify, config.MinerNoverify, chainDb), shutdownChan: make(chan bool), networkID: config.NetworkId, gasPrice: config.MinerGasPrice, @@ -216,7 +216,7 @@ func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Data } // CreateConsensusEngine creates the required type of consensus engine instance for an Ethereum service -func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, db ethdb.Database) consensus.Engine { +func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainConfig, config *ethash.Config, notify []string, noverify bool, db ethdb.Database) consensus.Engine { // If proof-of-authority is requested, set it up if chainConfig.Clique != nil { return clique.New(chainConfig.Clique, db) @@ -228,7 +228,7 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo return ethash.NewFaker() case ethash.ModeTest: log.Warn("Ethash used in test mode") - return ethash.NewTester(nil) + return ethash.NewTester(nil, noverify) case ethash.ModeShared: log.Warn("Ethash used in shared mode") return ethash.NewShared() @@ -240,7 +240,7 @@ func CreateConsensusEngine(ctx *node.ServiceContext, chainConfig *params.ChainCo DatasetDir: config.DatasetDir, DatasetsInMem: config.DatasetsInMem, DatasetsOnDisk: config.DatasetsOnDisk, - }, notify) + }, notify, noverify) engine.SetThreads(-1) // Disable CPU mining return engine } diff --git a/eth/config.go b/eth/config.go index 5b2eca585..517d7d8f3 100644 --- a/eth/config.go +++ b/eth/config.go @@ -101,6 +101,7 @@ type Config struct { MinerExtraData []byte `toml:",omitempty"` MinerGasPrice *big.Int MinerRecommit time.Duration + MinerNoverify bool // Ethash options Ethash ethash.Config diff --git a/eth/gen_config.go b/eth/gen_config.go index a72e35bcd..df4ffeb11 100644 --- a/eth/gen_config.go +++ b/eth/gen_config.go @@ -35,6 +35,7 @@ func (c Config) MarshalTOML() (interface{}, error) { MinerExtraData hexutil.Bytes `toml:",omitempty"` MinerGasPrice *big.Int MinerRecommit time.Duration + MinerNoverify bool Ethash ethash.Config TxPool core.TxPoolConfig GPO gasprice.Config @@ -58,6 +59,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.MinerExtraData = c.MinerExtraData enc.MinerGasPrice = c.MinerGasPrice enc.MinerRecommit = c.MinerRecommit + enc.MinerNoverify = c.MinerNoverify enc.Ethash = c.Ethash enc.TxPool = c.TxPool enc.GPO = c.GPO @@ -81,11 +83,11 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { TrieCache *int TrieTimeout *time.Duration Etherbase *common.Address `toml:",omitempty"` - MinerThreads *int `toml:",omitempty"` MinerNotify []string `toml:",omitempty"` MinerExtraData *hexutil.Bytes `toml:",omitempty"` MinerGasPrice *big.Int MinerRecommit *time.Duration + MinerNoverify *bool Ethash *ethash.Config TxPool *core.TxPoolConfig GPO *gasprice.Config @@ -144,6 +146,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.MinerRecommit != nil { c.MinerRecommit = *dec.MinerRecommit } + if dec.MinerNoverify != nil { + c.MinerNoverify = *dec.MinerNoverify + } if dec.Ethash != nil { c.Ethash = *dec.Ethash } diff --git a/les/backend.go b/les/backend.go index 75049da08..a3474a683 100644 --- a/les/backend.go +++ b/les/backend.go @@ -102,7 +102,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { peers: peers, reqDist: newRequestDistributor(peers, quitSync), accountManager: ctx.AccountManager, - engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, chainDb), + engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb), shutdownChan: make(chan bool), networkId: config.NetworkId, bloomRequests: make(chan chan *bloombits.Retrieval), diff --git a/miner/worker.go b/miner/worker.go index 1a881799d..ca68da6e9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -73,7 +73,7 @@ const ( // increasing upper limit or decreasing lower limit so that the limit can be reachable. intervalAdjustBias = 200 * 1000.0 * 1000.0 - // staleThreshold is the maximum distance of the acceptable stale block. + // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 7 ) @@ -139,7 +139,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq taskCh chan *task - resultCh chan *task + resultCh chan *types.Block startCh chan struct{} exitCh chan struct{} resubmitIntervalCh chan time.Duration @@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), taskCh: make(chan *task), - resultCh: make(chan *task, resultQueueSize), + resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), @@ -269,18 +269,10 @@ func (w *worker) isRunning() bool { return atomic.LoadInt32(&w.running) == 1 } -// close terminates all background threads maintained by the worker and cleans up buffered channels. +// close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { close(w.exitCh) - // Clean up buffered channels - for empty := false; !empty; { - select { - case <-w.resultCh: - default: - empty = true - } - } } // newWorkLoop is a standalone goroutine to submit new mining work upon received events. @@ -471,42 +463,6 @@ func (w *worker) mainLoop() { } } -// seal pushes a sealing task to consensus engine and submits the result. -func (w *worker) seal(t *task, stop <-chan struct{}) { - if w.skipSealHook != nil && w.skipSealHook(t) { - return - } - // The reason for caching task first is: - // A previous sealing action will be canceled by subsequent actions, - // however, remote miner may submit a result based on the cancelled task. - // So we should only submit the pending state corresponding to the seal result. - // TODO(rjl493456442) Replace the seal-wait logic structure - w.pendingMu.Lock() - w.pendingTasks[w.engine.SealHash(t.block.Header())] = t - w.pendingMu.Unlock() - - if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil { - sealhash := w.engine.SealHash(block.Header()) - w.pendingMu.RLock() - task, exist := w.pendingTasks[sealhash] - w.pendingMu.RUnlock() - if !exist { - log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash()) - return - } - // Assemble sealing result - task.block = block - log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(), - "elapsed", common.PrettyDuration(time.Since(task.createdAt))) - select { - case w.resultCh <- task: - case <-w.exitCh: - } - } else if err != nil { - log.Warn("Block sealing failed", "err", err) - } -} - // taskLoop is a standalone goroutine to fetch sealing task from the generator and // push them to consensus engine. func (w *worker) taskLoop() { @@ -533,10 +489,20 @@ func (w *worker) taskLoop() { if sealHash == prev { continue } + // Interrupt previous sealing operation interrupt() - stopCh = make(chan struct{}) - prev = sealHash - go w.seal(task, stopCh) + stopCh, prev = make(chan struct{}), sealHash + + if w.skipSealHook != nil && w.skipSealHook(task) { + continue + } + w.pendingMu.Lock() + w.pendingTasks[w.engine.SealHash(task.block.Header())] = task + w.pendingMu.Unlock() + + if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { + log.Warn("Block sealing failed", "err", err) + } case <-w.exitCh: interrupt() return @@ -549,38 +515,54 @@ func (w *worker) taskLoop() { func (w *worker) resultLoop() { for { select { - case result := <-w.resultCh: + case block := <-w.resultCh: // Short circuit when receiving empty result. - if result == nil { + if block == nil { continue } // Short circuit when receiving duplicate result caused by resubmitting. - block := result.block if w.chain.HasBlock(block.Hash(), block.NumberU64()) { continue } - // Update the block hash in all logs since it is now available and not when the - // receipt/log of individual transactions were created. - for _, r := range result.receipts { - for _, l := range r.Logs { - l.BlockHash = block.Hash() - } + var ( + sealhash = w.engine.SealHash(block.Header()) + hash = block.Hash() + ) + w.pendingMu.RLock() + task, exist := w.pendingTasks[sealhash] + w.pendingMu.RUnlock() + if !exist { + log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash) + continue } - for _, log := range result.state.Logs() { - log.BlockHash = block.Hash() + // Different block could share same sealhash, deep copy here to prevent write-write conflict. + var ( + receipts = make([]*types.Receipt, len(task.receipts)) + logs []*types.Log + ) + for i, receipt := range task.receipts { + receipts[i] = new(types.Receipt) + *receipts[i] = *receipt + // Update the block hash in all logs since it is now available and not when the + // receipt/log of individual transactions were created. + for _, log := range receipt.Logs { + log.BlockHash = hash + } + logs = append(logs, receipt.Logs...) } // Commit block and state to database. - stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state) + stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) if err != nil { log.Error("Failed writing block to chain", "err", err) continue } + log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash, + "elapsed", common.PrettyDuration(time.Since(task.createdAt))) + // Broadcast the block and announce chain insertion event w.mux.Post(core.NewMinedBlockEvent{Block: block}) - var ( - events []interface{} - logs = result.state.Logs() - ) + + var events []interface{} switch stat { case core.CanonStatTy: events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) |