aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-05-17 07:42:30 +0800
committerobscuren <geffobscura@gmail.com>2015-05-18 19:59:22 +0800
commitc67424ecc8a75d7c0bc942227a4c4e5c5628d7bc (patch)
treec234714a27c5bab93d56236601524cf7664df5e8
parent443d0248436f653bc2d56653dd52b8abbe408f60 (diff)
downloadgo-tangerine-c67424ecc8a75d7c0bc942227a4c4e5c5628d7bc.tar.gz
go-tangerine-c67424ecc8a75d7c0bc942227a4c4e5c5628d7bc.tar.zst
go-tangerine-c67424ecc8a75d7c0bc942227a4c4e5c5628d7bc.zip
core: parallelise nonce checking when processing blocks
ChainManager now uses a parallel approach to block processing where all nonces are checked seperatly from the block processing process. This speeds up the process by about 3 times on my i7
-rw-r--r--cmd/utils/flags.go2
-rw-r--r--core/block_processor.go14
-rw-r--r--core/chain_manager.go89
-rw-r--r--eth/backend.go4
4 files changed, 94 insertions, 15 deletions
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 6ec4fdc55..f646e4fcc 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -336,8 +336,8 @@ func GetChain(ctx *cli.Context) (*core.ChainManager, common.Database, common.Dat
}
eventMux := new(event.TypeMux)
- chainManager := core.NewChainManager(blockDb, stateDb, eventMux)
pow := ethash.New()
+ chainManager := core.NewChainManager(blockDb, stateDb, pow, eventMux)
txPool := core.NewTxPool(eventMux, chainManager.State, chainManager.GasLimit)
blockProcessor := core.NewBlockProcessor(stateDb, extraDb, pow, txPool, chainManager, eventMux)
chainManager.SetProcessor(blockProcessor)
diff --git a/core/block_processor.go b/core/block_processor.go
index cae618b39..a021086c0 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -189,7 +189,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state := state.New(parent.Root(), sm.db)
// Block validation
- if err = sm.ValidateHeader(block.Header(), parent.Header()); err != nil {
+ if err = sm.ValidateHeader(block.Header(), parent.Header(), false); err != nil {
return
}
@@ -269,7 +269,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
// Validates the current block. Returns an error if the block was invalid,
// an uncle or anything that isn't on the current block chain.
// Validation validates easy over difficult (dagger takes longer time = difficult)
-func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
+func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header, checkPow bool) error {
if big.NewInt(int64(len(block.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
return fmt.Errorf("Block extra data too long (%d)", len(block.Extra))
}
@@ -300,9 +300,11 @@ func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
return BlockEqualTSErr //ValidationError("Block timestamp equal or less than previous block (%v - %v)", block.Time, parent.Time)
}
- // Verify the nonce of the block. Return an error if it's not valid
- if !sm.Pow.Verify(types.NewBlockWithHeader(block)) {
- return ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
+ if checkPow {
+ // Verify the nonce of the block. Return an error if it's not valid
+ if !sm.Pow.Verify(types.NewBlockWithHeader(block)) {
+ return ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
+ }
}
return nil
@@ -358,7 +360,7 @@ func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *ty
return UncleError("uncle[%d](%x)'s parent unknown (%x)", i, hash[:4], uncle.ParentHash[0:4])
}
- if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash]); err != nil {
+ if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash], true); err != nil {
return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, hash[:4], err))
}
}
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 62e518ca0..355e203c7 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math/big"
+ "runtime"
"sync"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -100,9 +102,11 @@ type ChainManager struct {
quit chan struct{}
wg sync.WaitGroup
+
+ pow pow.PoW
}
-func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
+func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
@@ -110,6 +114,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
eventMux: mux,
quit: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
+ pow: pow,
}
bc.setLastState()
@@ -529,10 +534,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
stats struct{ queued, processed, ignored int }
tstart = time.Now()
)
+
+ // check the nonce in parallel to the block processing
+ // this speeds catching up significantly
+ nonceErrCh := make(chan error)
+ go func() {
+ nonceErrCh <- verifyNonces(self.pow, chain)
+ }()
+
for i, block := range chain {
if block == nil {
continue
}
+
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@@ -562,11 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
continue
}
- h := block.Header()
-
- glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
- glog.V(logger.Error).Infoln(err)
- glog.V(logger.Debug).Infoln(block)
+ blockErr(block, err)
return i, err
}
@@ -620,6 +630,13 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
}
+ // check and wait for the nonce error channel and
+ // make sure no nonce error was thrown in the process
+ err := <-nonceErrCh
+ if err != nil {
+ return 0, err
+ }
+
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
tend := time.Since(tstart)
start, end := chain[0], chain[len(chain)-1]
@@ -718,3 +735,63 @@ out:
}
}
}
+
+func blockErr(block *types.Block, err error) {
+ h := block.Header()
+ glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
+ glog.V(logger.Error).Infoln(err)
+ glog.V(logger.Debug).Infoln(block)
+}
+
+// verifyNonces verifies nonces of the given blocks in parallel and returns
+// an error if one of the blocks nonce verifications failed.
+func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
+ // Spawn a few workers. They listen for blocks on the in channel
+ // and send results on done. The workers will exit in the
+ // background when in is closed.
+ var (
+ in = make(chan *types.Block)
+ done = make(chan error, runtime.GOMAXPROCS(0))
+ )
+ defer close(in)
+ for i := 0; i < cap(done); i++ {
+ go verifyNonce(pow, in, done)
+ }
+ // Feed blocks to the workers, aborting at the first invalid nonce.
+ var (
+ running, i int
+ block *types.Block
+ sendin = in
+ )
+ for i < len(blocks) || running > 0 {
+ if i == len(blocks) {
+ // Disable sending to in.
+ sendin = nil
+ } else {
+ block = blocks[i]
+ i++
+ }
+ select {
+ case sendin <- block:
+ running++
+ case err := <-done:
+ running--
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// verifyNonce is a worker for the verifyNonces method. It will run until
+// in is closed.
+func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
+ for block := range in {
+ if !pow.Verify(block) {
+ done <- ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
+ } else {
+ done <- nil
+ }
+ }
+}
diff --git a/eth/backend.go b/eth/backend.go
index a7107f8d8..519a4c410 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -266,9 +266,9 @@ func New(config *Config) (*Ethereum, error) {
MinerThreads: config.MinerThreads,
}
- eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
- eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.pow = ethash.New()
+ eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux())
+ eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
eth.chainManager.SetProcessor(eth.blockProcessor)