diff options
author | Felix Lange <fjl@twurst.com> | 2015-05-30 00:07:23 +0800 |
---|---|---|
committer | Felix Lange <fjl@twurst.com> | 2015-06-01 18:47:13 +0800 |
commit | e7e2cbfc018f5e0353d26b94ff785a206c9a16c4 (patch) | |
tree | 6484b834f571a91a2911b43c1c2562077084e29b /core/chain_manager.go | |
parent | 5b14fdb94b61af9f17f53593766a706c9780a7be (diff) | |
download | dexon-e7e2cbfc018f5e0353d26b94ff785a206c9a16c4.tar.gz dexon-e7e2cbfc018f5e0353d26b94ff785a206c9a16c4.tar.zst dexon-e7e2cbfc018f5e0353d26b94ff785a206c9a16c4.zip |
core: re-add parallel nonce checks
In this incancation, the processor waits until the nonce
has been verified before handling the block.
Diffstat (limited to 'core/chain_manager.go')
-rw-r--r-- | core/chain_manager.go | 107 |
1 files changed, 44 insertions, 63 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 088ca8d5b..a785e1854 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -540,17 +540,34 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + // A queued approach to delivering events. This is generally + // faster than direct delivery and requires much less mutex + // acquiring. var ( queue = make([]interface{}, len(chain)) queueEvent = queueEvent{queue: queue} stats struct{ queued, processed, ignored int } tstart = time.Now() + + nonceDone = make(chan nonceResult, len(chain)) + nonceQuit = make(chan struct{}) + nonceChecked = make([]bool, len(chain)) ) + // Start the parallel nonce verifier. + go verifyNonces(self.pow, chain, nonceQuit, nonceDone) + defer close(nonceQuit) + for i, block := range chain { - if block == nil { - continue + // Wait for block i's nonce to be verified before processing + // its state transition. + for nonceChecked[i] { + r := <-nonceDone + nonceChecked[r.i] = true + if !r.valid { + block := chain[i] + return i, ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) + } } if BadHashes[block.Hash()] { @@ -559,10 +576,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // create a nonce channel for parallisation of the nonce check - nonceErrCh := make(chan error) - go verifyBlockNonce(self.pow, block, nonceErrCh) - // Setting block.Td regardless of error (known for example) prevents errors down the line // in the protocol handler block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) @@ -571,9 +584,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // all others will fail too (unless a known block is returned). logs, err := self.processor.Process(block) if err != nil { - // empty the nonce channel - <-nonceErrCh - if IsKnownBlockErr(err) { stats.ignored++ continue @@ -597,11 +607,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // Wait and check nonce channel and make sure it checks out fine - // otherwise return the error - if err := <-nonceErrCh; err != nil { - return i, err - } cblock := self.currentBlock // Compare the TD of the last known block in the canonical chain to make sure it's greater. @@ -776,66 +781,42 @@ func blockErr(block *types.Block, err error) { h := block.Header() glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes()) glog.V(logger.Error).Infoln(err) - glog.V(logger.Debug).Infoln(block) + glog.V(logger.Debug).Infoln(verifyNonces) +} + +type nonceResult struct { + i int + valid bool } -// verifyNonces verifies nonces of the given blocks in parallel and returns +// block verifies nonces of the given blocks in parallel and returns // an error if one of the blocks nonce verifications failed. -func verifyNonces(pow pow.PoW, blocks []*types.Block) error { +func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) { // Spawn a few workers. They listen for blocks on the in channel // and send results on done. The workers will exit in the // background when in is closed. var ( - in = make(chan *types.Block) - done = make(chan error, runtime.GOMAXPROCS(0)) + in = make(chan int) + nworkers = runtime.GOMAXPROCS(0) ) defer close(in) - for i := 0; i < cap(done); i++ { - go verifyNonce(pow, in, done) + if len(blocks) < nworkers { + nworkers = len(blocks) } - // Feed blocks to the workers, aborting at the first invalid nonce. - var ( - running, i int - block *types.Block - sendin = in - ) - for i < len(blocks) || running > 0 { - if i == len(blocks) { - // Disable sending to in. - sendin = nil - } else { - block = blocks[i] - i++ - } - select { - case sendin <- block: - running++ - case err := <-done: - running-- - if err != nil { - return err + for i := 0; i < nworkers; i++ { + go func() { + for i := range in { + done <- nonceResult{i: i, valid: pow.Verify(blocks[i])} } - } + }() } - return nil -} - -// verifyNonce is a worker for the verifyNonces method. It will run until -// in is closed. -func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { - for block := range in { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil + // Feed block indices to the workers. + for i := range blocks { + select { + case in <- i: + continue + case <-quit: + return } } } - -func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil - } -} |