aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-05-19 02:13:53 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-05-19 02:13:53 +0800
commitbd0c0a633bebe8893c4b06a50740047e4376107c (patch)
tree0b0632fa78f80206eb1fd96dd2b3d3e6b43ac1aa /core
parentd6adadc5e37f3d01cac25b5371e1c42b7036735a (diff)
parent40717465bc8c0e2feb0f1e48ddc721c7292ba992 (diff)
downloaddexon-bd0c0a633bebe8893c4b06a50740047e4376107c.tar.gz
dexon-bd0c0a633bebe8893c4b06a50740047e4376107c.tar.zst
dexon-bd0c0a633bebe8893c4b06a50740047e4376107c.zip
Merge pull request #1022 from obscuren/parallel_nonce_checks
Parallelise nonce checks
Diffstat (limited to 'core')
-rw-r--r--core/block_processor.go14
-rw-r--r--core/block_processor_test.go6
-rw-r--r--core/chain_makers.go2
-rw-r--r--core/chain_manager.go91
-rw-r--r--core/chain_manager_test.go15
5 files changed, 107 insertions, 21 deletions
diff --git a/core/block_processor.go b/core/block_processor.go
index 0652d217f..20e6722a4 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -188,7 +188,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state := state.New(parent.Root(), sm.db)
// Block validation
- if err = sm.ValidateHeader(block.Header(), parent.Header()); err != nil {
+ if err = sm.ValidateHeader(block.Header(), parent.Header(), false); err != nil {
return
}
@@ -268,7 +268,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
// Validates the current block. Returns an error if the block was invalid,
// an uncle or anything that isn't on the current block chain.
// Validation validates easy over difficult (dagger takes longer time = difficult)
-func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
+func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header, checkPow bool) error {
if big.NewInt(int64(len(block.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
return fmt.Errorf("Block extra data too long (%d)", len(block.Extra))
}
@@ -299,9 +299,11 @@ func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
return BlockEqualTSErr //ValidationError("Block timestamp equal or less than previous block (%v - %v)", block.Time, parent.Time)
}
- // Verify the nonce of the block. Return an error if it's not valid
- if !sm.Pow.Verify(types.NewBlockWithHeader(block)) {
- return ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
+ if checkPow {
+ // Verify the nonce of the block. Return an error if it's not valid
+ if !sm.Pow.Verify(types.NewBlockWithHeader(block)) {
+ return ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
+ }
}
return nil
@@ -375,7 +377,7 @@ func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *ty
return UncleError("uncle[%d](%x)'s parent unknown (%x)", i, hash[:4], uncle.ParentHash[0:4])
}
- if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash]); err != nil {
+ if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash], true); err != nil {
return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, hash[:4], err))
}
}
diff --git a/core/block_processor_test.go b/core/block_processor_test.go
index 02524a4c1..e0aa5fb4c 100644
--- a/core/block_processor_test.go
+++ b/core/block_processor_test.go
@@ -14,7 +14,7 @@ func proc() (*BlockProcessor, *ChainManager) {
db, _ := ethdb.NewMemDatabase()
var mux event.TypeMux
- chainMan := NewChainManager(db, db, &mux)
+ chainMan := NewChainManager(db, db, thePow(), &mux)
return NewBlockProcessor(db, db, ezp.New(), nil, chainMan, &mux), chainMan
}
@@ -24,13 +24,13 @@ func TestNumber(t *testing.T) {
block1.Header().Number = big.NewInt(3)
block1.Header().Time--
- err := bp.ValidateHeader(block1.Header(), chain.Genesis().Header())
+ err := bp.ValidateHeader(block1.Header(), chain.Genesis().Header(), false)
if err != BlockNumberErr {
t.Errorf("expected block number error %v", err)
}
block1 = chain.NewBlock(common.Address{})
- err = bp.ValidateHeader(block1.Header(), chain.Genesis().Header())
+ err = bp.ValidateHeader(block1.Header(), chain.Genesis().Header(), false)
if err == BlockNumberErr {
t.Errorf("didn't expect block number error")
}
diff --git a/core/chain_makers.go b/core/chain_makers.go
index acf7b39cc..44f17cc33 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -109,7 +109,7 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat
// Effectively a fork factory
func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager {
genesis := GenesisBlock(db)
- bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux}
+ bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux, pow: FakePow{}}
bc.txState = state.ManageState(state.New(genesis.Root(), db))
bc.futureBlocks = NewBlockCache(1000)
if block == nil {
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 62e518ca0..7dff7dffd 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math/big"
+ "runtime"
"sync"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -100,9 +102,11 @@ type ChainManager struct {
quit chan struct{}
wg sync.WaitGroup
+
+ pow pow.PoW
}
-func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
+func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
@@ -110,6 +114,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
eventMux: mux,
quit: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
+ pow: pow,
}
bc.setLastState()
@@ -343,7 +348,7 @@ func (self *ChainManager) Export(w io.Writer) error {
last := self.currentBlock.NumberU64()
- for nr := uint64(0); nr <= last; nr++ {
+ for nr := uint64(1); nr <= last; nr++ {
block := self.GetBlockByNumber(nr)
if block == nil {
return fmt.Errorf("export failed on #%d: not found", nr)
@@ -529,10 +534,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
stats struct{ queued, processed, ignored int }
tstart = time.Now()
)
+
+ // check the nonce in parallel to the block processing
+ // this speeds catching up significantly
+ nonceErrCh := make(chan error)
+ go func() {
+ nonceErrCh <- verifyNonces(self.pow, chain)
+ }()
+
for i, block := range chain {
if block == nil {
continue
}
+
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@@ -562,11 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
continue
}
- h := block.Header()
-
- glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
- glog.V(logger.Error).Infoln(err)
- glog.V(logger.Debug).Infoln(block)
+ blockErr(block, err)
return i, err
}
@@ -620,6 +630,13 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
}
+ // check and wait for the nonce error channel and
+ // make sure no nonce error was thrown in the process
+ err := <-nonceErrCh
+ if err != nil {
+ return 0, err
+ }
+
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
tend := time.Since(tstart)
start, end := chain[0], chain[len(chain)-1]
@@ -718,3 +735,63 @@ out:
}
}
}
+
+func blockErr(block *types.Block, err error) {
+ h := block.Header()
+ glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
+ glog.V(logger.Error).Infoln(err)
+ glog.V(logger.Debug).Infoln(block)
+}
+
+// verifyNonces verifies nonces of the given blocks in parallel and returns
+// an error if one of the blocks nonce verifications failed.
+func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
+ // Spawn a few workers. They listen for blocks on the in channel
+ // and send results on done. The workers will exit in the
+ // background when in is closed.
+ var (
+ in = make(chan *types.Block)
+ done = make(chan error, runtime.GOMAXPROCS(0))
+ )
+ defer close(in)
+ for i := 0; i < cap(done); i++ {
+ go verifyNonce(pow, in, done)
+ }
+ // Feed blocks to the workers, aborting at the first invalid nonce.
+ var (
+ running, i int
+ block *types.Block
+ sendin = in
+ )
+ for i < len(blocks) || running > 0 {
+ if i == len(blocks) {
+ // Disable sending to in.
+ sendin = nil
+ } else {
+ block = blocks[i]
+ i++
+ }
+ select {
+ case sendin <- block:
+ running++
+ case err := <-done:
+ running--
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+// verifyNonce is a worker for the verifyNonces method. It will run until
+// in is closed.
+func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
+ for block := range in {
+ if !pow.Verify(block) {
+ done <- ValidationError("Block(#%v) nonce is invalid (= %x)", block.Number(), block.Nonce)
+ } else {
+ done <- nil
+ }
+ }
+}
diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go
index b5155e223..56bef24f3 100644
--- a/core/chain_manager_test.go
+++ b/core/chain_manager_test.go
@@ -9,11 +9,13 @@ import (
"strconv"
"testing"
+ "github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -21,6 +23,11 @@ func init() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
+func thePow() pow.PoW {
+ pow, _ := ethash.NewForTesting()
+ return pow
+}
+
// Test fork of length N starting from block i
func testFork(t *testing.T, bman *BlockProcessor, i, N int, f func(td1, td2 *big.Int)) {
// switch databases to process the new chain
@@ -259,7 +266,7 @@ func TestChainInsertions(t *testing.T) {
}
var eventMux event.TypeMux
- chainMan := NewChainManager(db, db, &eventMux)
+ chainMan := NewChainManager(db, db, thePow(), &eventMux)
txPool := NewTxPool(&eventMux, chainMan.State, func() *big.Int { return big.NewInt(100000000) })
blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan)
@@ -305,7 +312,7 @@ func TestChainMultipleInsertions(t *testing.T) {
}
}
var eventMux event.TypeMux
- chainMan := NewChainManager(db, db, &eventMux)
+ chainMan := NewChainManager(db, db, thePow(), &eventMux)
txPool := NewTxPool(&eventMux, chainMan.State, func() *big.Int { return big.NewInt(100000000) })
blockMan := NewBlockProcessor(db, db, nil, txPool, chainMan, &eventMux)
chainMan.SetProcessor(blockMan)
@@ -334,7 +341,7 @@ func TestGetAncestors(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
var eventMux event.TypeMux
- chainMan := NewChainManager(db, db, &eventMux)
+ chainMan := NewChainManager(db, db, thePow(), &eventMux)
chain, err := loadChain("valid1", t)
if err != nil {
fmt.Println(err)
@@ -372,7 +379,7 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block
func chm(genesis *types.Block, db common.Database) *ChainManager {
var eventMux event.TypeMux
- bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux}
+ bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}}
bc.cache = NewBlockCache(100)
bc.futureBlocks = NewBlockCache(100)
bc.processor = bproc{}