diff options
-rw-r--r-- | core/chain_manager.go | 116 |
1 files changed, 49 insertions, 67 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 088ca8d5b..927055103 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -522,13 +522,14 @@ type queueEvent struct { } func (self *ChainManager) procFutureBlocks() { - blocks := []*types.Block{} + var blocks []*types.Block self.futureBlocks.Each(func(i int, block *types.Block) { blocks = append(blocks, block) }) - - types.BlockBy(types.Number).Sort(blocks) - self.InsertChain(blocks) + if len(blocks) > 0 { + types.BlockBy(types.Number).Sort(blocks) + self.InsertChain(blocks) + } } // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned @@ -540,17 +541,34 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + // A queued approach to delivering events. This is generally + // faster than direct delivery and requires much less mutex + // acquiring. var ( queue = make([]interface{}, len(chain)) queueEvent = queueEvent{queue: queue} stats struct{ queued, processed, ignored int } tstart = time.Now() + + nonceDone = make(chan nonceResult, len(chain)) + nonceQuit = make(chan struct{}) + nonceChecked = make([]bool, len(chain)) ) + // Start the parallel nonce verifier. + go verifyNonces(self.pow, chain, nonceQuit, nonceDone) + defer close(nonceQuit) + for i, block := range chain { - if block == nil { - continue + // Wait for block i's nonce to be verified before processing + // its state transition. + for nonceChecked[i] { + r := <-nonceDone + nonceChecked[r.i] = true + if !r.valid { + block := chain[i] + return i, ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) + } } if BadHashes[block.Hash()] { @@ -559,10 +577,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // create a nonce channel for parallisation of the nonce check - nonceErrCh := make(chan error) - go verifyBlockNonce(self.pow, block, nonceErrCh) - // Setting block.Td regardless of error (known for example) prevents errors down the line // in the protocol handler block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) @@ -571,9 +585,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // all others will fail too (unless a known block is returned). logs, err := self.processor.Process(block) if err != nil { - // empty the nonce channel - <-nonceErrCh - if IsKnownBlockErr(err) { stats.ignored++ continue @@ -597,11 +608,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { return i, err } - // Wait and check nonce channel and make sure it checks out fine - // otherwise return the error - if err := <-nonceErrCh; err != nil { - return i, err - } cblock := self.currentBlock // Compare the TD of the last known block in the canonical chain to make sure it's greater. @@ -776,66 +782,42 @@ func blockErr(block *types.Block, err error) { h := block.Header() glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes()) glog.V(logger.Error).Infoln(err) - glog.V(logger.Debug).Infoln(block) + glog.V(logger.Debug).Infoln(verifyNonces) } -// verifyNonces verifies nonces of the given blocks in parallel and returns +type nonceResult struct { + i int + valid bool +} + +// block verifies nonces of the given blocks in parallel and returns // an error if one of the blocks nonce verifications failed. -func verifyNonces(pow pow.PoW, blocks []*types.Block) error { +func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) { // Spawn a few workers. They listen for blocks on the in channel // and send results on done. The workers will exit in the // background when in is closed. var ( - in = make(chan *types.Block) - done = make(chan error, runtime.GOMAXPROCS(0)) + in = make(chan int) + nworkers = runtime.GOMAXPROCS(0) ) defer close(in) - for i := 0; i < cap(done); i++ { - go verifyNonce(pow, in, done) + if len(blocks) < nworkers { + nworkers = len(blocks) } - // Feed blocks to the workers, aborting at the first invalid nonce. - var ( - running, i int - block *types.Block - sendin = in - ) - for i < len(blocks) || running > 0 { - if i == len(blocks) { - // Disable sending to in. - sendin = nil - } else { - block = blocks[i] - i++ - } - select { - case sendin <- block: - running++ - case err := <-done: - running-- - if err != nil { - return err + for i := 0; i < nworkers; i++ { + go func() { + for i := range in { + done <- nonceResult{i: i, valid: pow.Verify(blocks[i])} } - } + }() } - return nil -} - -// verifyNonce is a worker for the verifyNonces method. It will run until -// in is closed. -func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { - for block := range in { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil + // Feed block indices to the workers. + for i := range blocks { + select { + case in <- i: + continue + case <-quit: + return } } } - -func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) { - if !pow.Verify(block) { - done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce) - } else { - done <- nil - } -} |