aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--core/chain_manager.go116
1 files changed, 49 insertions, 67 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 088ca8d5b..927055103 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -522,13 +522,14 @@ type queueEvent struct {
}
func (self *ChainManager) procFutureBlocks() {
- blocks := []*types.Block{}
+ var blocks []*types.Block
self.futureBlocks.Each(func(i int, block *types.Block) {
blocks = append(blocks, block)
})
-
- types.BlockBy(types.Number).Sort(blocks)
- self.InsertChain(blocks)
+ if len(blocks) > 0 {
+ types.BlockBy(types.Number).Sort(blocks)
+ self.InsertChain(blocks)
+ }
}
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
@@ -540,17 +541,34 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock()
defer self.chainmu.Unlock()
- // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
+ // A queued approach to delivering events. This is generally
+ // faster than direct delivery and requires much less mutex
+ // acquiring.
var (
queue = make([]interface{}, len(chain))
queueEvent = queueEvent{queue: queue}
stats struct{ queued, processed, ignored int }
tstart = time.Now()
+
+ nonceDone = make(chan nonceResult, len(chain))
+ nonceQuit = make(chan struct{})
+ nonceChecked = make([]bool, len(chain))
)
+ // Start the parallel nonce verifier.
+ go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
+ defer close(nonceQuit)
+
for i, block := range chain {
- if block == nil {
- continue
+ // Wait for block i's nonce to be verified before processing
+ // its state transition.
+ for nonceChecked[i] {
+ r := <-nonceDone
+ nonceChecked[r.i] = true
+ if !r.valid {
+ block := chain[i]
+ return i, ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
+ }
}
if BadHashes[block.Hash()] {
@@ -559,10 +577,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
- // create a nonce channel for parallisation of the nonce check
- nonceErrCh := make(chan error)
- go verifyBlockNonce(self.pow, block, nonceErrCh)
-
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@@ -571,9 +585,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block)
if err != nil {
- // empty the nonce channel
- <-nonceErrCh
-
if IsKnownBlockErr(err) {
stats.ignored++
continue
@@ -597,11 +608,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
- // Wait and check nonce channel and make sure it checks out fine
- // otherwise return the error
- if err := <-nonceErrCh; err != nil {
- return i, err
- }
cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
@@ -776,66 +782,42 @@ func blockErr(block *types.Block, err error) {
h := block.Header()
glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes())
glog.V(logger.Error).Infoln(err)
- glog.V(logger.Debug).Infoln(block)
+ glog.V(logger.Debug).Infoln(verifyNonces)
}
-// verifyNonces verifies nonces of the given blocks in parallel and returns
+type nonceResult struct {
+ i int
+ valid bool
+}
+
+// block verifies nonces of the given blocks in parallel and returns
// an error if one of the blocks nonce verifications failed.
-func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
+func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) {
// Spawn a few workers. They listen for blocks on the in channel
// and send results on done. The workers will exit in the
// background when in is closed.
var (
- in = make(chan *types.Block)
- done = make(chan error, runtime.GOMAXPROCS(0))
+ in = make(chan int)
+ nworkers = runtime.GOMAXPROCS(0)
)
defer close(in)
- for i := 0; i < cap(done); i++ {
- go verifyNonce(pow, in, done)
+ if len(blocks) < nworkers {
+ nworkers = len(blocks)
}
- // Feed blocks to the workers, aborting at the first invalid nonce.
- var (
- running, i int
- block *types.Block
- sendin = in
- )
- for i < len(blocks) || running > 0 {
- if i == len(blocks) {
- // Disable sending to in.
- sendin = nil
- } else {
- block = blocks[i]
- i++
- }
- select {
- case sendin <- block:
- running++
- case err := <-done:
- running--
- if err != nil {
- return err
+ for i := 0; i < nworkers; i++ {
+ go func() {
+ for i := range in {
+ done <- nonceResult{i: i, valid: pow.Verify(blocks[i])}
}
- }
+ }()
}
- return nil
-}
-
-// verifyNonce is a worker for the verifyNonces method. It will run until
-// in is closed.
-func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
- for block := range in {
- if !pow.Verify(block) {
- done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
- } else {
- done <- nil
+ // Feed block indices to the workers.
+ for i := range blocks {
+ select {
+ case in <- i:
+ continue
+ case <-quit:
+ return
}
}
}
-
-func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) {
- if !pow.Verify(block) {
- done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
- } else {
- done <- nil
- }
-}