Diffstat (limited to 'core/chain_manager.go')
-rw-r--r--  core/chain_manager.go  214
1 file changed, 105 insertions(+), 109 deletions(-)
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 3e8ef6fd8..6897c453c 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -30,8 +30,9 @@ var (
)
const (
- blockCacheLimit = 10000
- maxFutureBlocks = 256
+ blockCacheLimit = 10000
+ maxFutureBlocks = 256
+ maxTimeFutureBlocks = 30
)
func CalcDifficulty(block, parent *types.Header) *big.Int {
@@ -55,10 +56,7 @@ func CalcTD(block, parent *types.Block) *big.Int {
if parent == nil {
return block.Difficulty()
}
-
- td := new(big.Int).Add(parent.Td, block.Header().Difficulty)
-
- return td
+ return new(big.Int).Add(parent.Td, block.Header().Difficulty)
}
func CalcGasLimit(parent *types.Block) *big.Int {
@@ -108,16 +106,23 @@ type ChainManager struct {
pow pow.PoW
}
-func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager {
+func NewChainManager(genesis *types.Block, blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) (*ChainManager, error) {
bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
- genesisBlock: GenesisBlock(stateDb),
+ genesisBlock: GenesisBlock(42, stateDb),
eventMux: mux,
quit: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
pow: pow,
}
+
+ // Check the genesis block given to the chain manager. If the genesis block
+ // mismatches block number 0, throw an error. If no block or the same block
+ // is found, continue.
+ if g := bc.GetBlockByNumber(0); g != nil && g.Hash() != genesis.Hash() {
+ return nil, fmt.Errorf("Genesis mismatch. Maybe different nonce (%d vs %d)? %x / %x", g.Nonce(), genesis.Nonce(), g.Hash().Bytes()[:4], genesis.Hash().Bytes()[:4])
+ }
+ bc.genesisBlock = genesis
bc.setLastState()
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
@@ -143,7 +148,7 @@ func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.T
go bc.update()
- return bc
+ return bc, nil
}
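
With the new constructor signature the genesis block is supplied by the caller, and a mismatch against block number 0 of the stored chain is reported as an error instead of going unnoticed. A minimal caller-side sketch, assuming the usual go-ethereum import paths and that the databases, PoW and event mux are set up elsewhere (the helper itself is hypothetical):

package example

import (
	"log"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/pow"
)

// mustNewChainManager treats any constructor failure, typically a genesis
// mismatch against an existing chain database, as fatal.
func mustNewChainManager(genesis *types.Block, blockDb, stateDb common.Database, p pow.PoW, mux *event.TypeMux) *core.ChainManager {
	chainman, err := core.NewChainManager(genesis, blockDb, stateDb, p, mux)
	if err != nil {
		log.Fatalf("chain manager: %v", err)
	}
	return chainman
}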
func (bc *ChainManager) SetHead(head *types.Block) {
@@ -170,11 +175,13 @@ func (self *ChainManager) Td() *big.Int {
self.mu.RLock()
defer self.mu.RUnlock()
- return self.td
+ return new(big.Int).Set(self.td)
}
func (self *ChainManager) GasLimit() *big.Int {
- // return self.currentGasLimit
+ self.mu.RLock()
+ defer self.mu.RUnlock()
+
return self.currentBlock.GasLimit()
}
@@ -196,7 +203,7 @@ func (self *ChainManager) Status() (td *big.Int, currentBlock common.Hash, genes
self.mu.RLock()
defer self.mu.RUnlock()
- return self.td, self.currentBlock.Hash(), self.genesisBlock.Hash()
+ return new(big.Int).Set(self.td), self.currentBlock.Hash(), self.genesisBlock.Hash()
}
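
Td and Status now return a copy of the total difficulty rather than the internal *big.Int, so a caller cannot mutate the chain manager's state from outside the lock. A standalone illustration of the copy-on-read pattern, using a stand-in type rather than ChainManager itself:

package example

import "math/big"

// accumulator mimics the getter pattern adopted above: internal state is
// handed out as an independent copy.
type accumulator struct{ td *big.Int }

// Td returns a copy of the total difficulty.
func (a *accumulator) Td() *big.Int { return new(big.Int).Set(a.td) }

func copySafety() {
	a := &accumulator{td: big.NewInt(100)}
	got := a.Td()
	got.Add(got, big.NewInt(1)) // mutates only the copy; a.td remains 100
}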
func (self *ChainManager) SetProcessor(proc types.BlockProcessor) {
@@ -214,19 +221,6 @@ func (self *ChainManager) TransState() *state.StateDB {
return self.transState
}
-func (self *ChainManager) TxState() *state.ManagedState {
- self.tsmu.RLock()
- defer self.tsmu.RUnlock()
-
- return self.txState
-}
-
-func (self *ChainManager) setTxState(statedb *state.StateDB) {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
- self.txState = state.ManageState(statedb)
-}
-
func (self *ChainManager) setTransState(statedb *state.StateDB) {
self.transState = statedb
}
@@ -353,13 +347,24 @@ func (bc *ChainManager) ResetWithGenesisBlock(gb *types.Block) {
// Export writes the active chain to the given writer.
func (self *ChainManager) Export(w io.Writer) error {
+ if err := self.ExportN(w, uint64(0), self.currentBlock.NumberU64()); err != nil {
+ return err
+ }
+ return nil
+}
+
+// ExportN writes a subset of the active chain to the given writer.
+func (self *ChainManager) ExportN(w io.Writer, first uint64, last uint64) error {
self.mu.RLock()
defer self.mu.RUnlock()
- glog.V(logger.Info).Infof("exporting %v blocks...\n", self.currentBlock.Header().Number)
- last := self.currentBlock.NumberU64()
+ if first > last {
+ return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
+ }
+
+ glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1)
- for nr := uint64(1); nr <= last; nr++ {
+ for nr := first; nr <= last; nr++ {
block := self.GetBlockByNumber(nr)
if block == nil {
return fmt.Errorf("export failed on #%d: not found", nr)
@@ -373,11 +378,13 @@ func (self *ChainManager) Export(w io.Writer) error {
return nil
}
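
ExportN extends Export to an arbitrary [first, last] block range. A possible use, writing a range of blocks to a file; the helper below is hypothetical and only sketches how the method might be called:

package example

import (
	"os"

	"github.com/ethereum/go-ethereum/core"
)

// exportRange writes blocks first..last of the active chain to path.
func exportRange(chainman *core.ChainManager, path string, first, last uint64) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	return chainman.ExportN(f, first, last)
}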
+// insert injects a block into the current block chain. Note, this function
+// assumes that the `mu` mutex is held!
func (bc *ChainManager) insert(block *types.Block) {
key := append(blockNumPre, block.Number().Bytes()...)
bc.blockDb.Put(key, block.Hash().Bytes())
-
bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+
bc.currentBlock = block
bc.lastBlockHash = block.Hash()
}
@@ -481,9 +488,10 @@ func (self *ChainManager) GetAncestors(block *types.Block, length int) (blocks [
return
}
+// setTotalDifficulty updates the TD of the chain manager. Note, this function
+// assumes that the `mu` mutex is held!
func (bc *ChainManager) setTotalDifficulty(td *big.Int) {
- //bc.blockDb.Put([]byte("LTD"), td.Bytes())
- bc.td = td
+ bc.td = new(big.Int).Set(td)
}
func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
@@ -522,13 +530,14 @@ type queueEvent struct {
}
func (self *ChainManager) procFutureBlocks() {
- blocks := make([]*types.Block, len(self.futureBlocks.blocks))
+ var blocks []*types.Block
self.futureBlocks.Each(func(i int, block *types.Block) {
- blocks[i] = block
+ blocks = append(blocks, block)
})
-
- types.BlockBy(types.Number).Sort(blocks)
- self.InsertChain(blocks)
+ if len(blocks) > 0 {
+ types.BlockBy(types.Number).Sort(blocks)
+ self.InsertChain(blocks)
+ }
}
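
procFutureBlocks now appends whatever the future-block cache yields and skips empty batches, instead of pre-sizing a slice that could be left with nil gaps if the callback indices do not line up. A self-contained illustration of the same collect-sort-insert pattern with simplified stand-in types:

package example

import "sort"

type block struct{ number uint64 }

// drainAndSort collects blocks from a sparse cache, sorts them by number and
// returns them only when there is something to insert.
func drainAndSort(cache map[int]*block) []*block {
	var blocks []*block
	for _, b := range cache { // iteration order is unspecified, like Each
		blocks = append(blocks, b)
	}
	if len(blocks) == 0 {
		return nil
	}
	sort.Slice(blocks, func(i, j int) bool { return blocks[i].number < blocks[j].number })
	return blocks
}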
// InsertChain will attempt to insert the given chain into the canonical chain or, otherwise, create a fork. If an error is returned
@@ -540,17 +549,35 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock()
defer self.chainmu.Unlock()
- // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
+ // A queued approach to delivering events. This is generally
+ // faster than direct delivery and requires much less mutex
+ // acquiring.
var (
queue = make([]interface{}, len(chain))
queueEvent = queueEvent{queue: queue}
stats struct{ queued, processed, ignored int }
tstart = time.Now()
+
+ nonceDone = make(chan nonceResult, len(chain))
+ nonceQuit = make(chan struct{})
+ nonceChecked = make([]bool, len(chain))
)
+ // Start the parallel nonce verifier.
+ go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
+ defer close(nonceQuit)
+
for i, block := range chain {
- if block == nil {
- continue
+ bstart := time.Now()
+ // Wait for block i's nonce to be verified before processing
+ // its state transition.
+ for !nonceChecked[i] {
+ r := <-nonceDone
+ nonceChecked[r.i] = true
+ if !r.valid {
+ block := chain[r.i]
+ return r.i, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
+ }
}
if BadHashes[block.Hash()] {
@@ -559,10 +586,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
- // create a nonce channel for parallisation of the nonce check
- nonceErrCh := make(chan error)
- go verifyBlockNonce(self.pow, block, nonceErrCh)
-
// Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@@ -571,15 +594,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block)
if err != nil {
- // empty the nonce channel
- <-nonceErrCh
-
if IsKnownBlockErr(err) {
stats.ignored++
continue
}
if err == BlockFutureErr {
+ // Allow blocks up to maxTimeFutureBlocks seconds into the future. If this
+ // limit is exceeded the chain is discarded and processed at a later time
+ // if given.
+ if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max {
+ return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
+ }
+
block.SetQueued(true)
self.futureBlocks.Push(block)
stats.queued++
@@ -597,16 +624,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
- // Wait and check nonce channel and make sure it checks out fine
- // otherwise return the error
- if err := <-nonceErrCh; err != nil {
- return i, err
- }
cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
- if block.Td.Cmp(self.td) > 0 {
+ if block.Td.Cmp(self.Td()) > 0 {
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
@@ -619,8 +641,10 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queueEvent.splitCount++
}
+ self.mu.Lock()
self.setTotalDifficulty(block.Td)
self.insert(block)
+ self.mu.Unlock()
jsonlogger.LogJson(&logger.EthChainNewHead{
BlockHash: block.Hash().Hex(),
@@ -636,11 +660,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queueEvent.canonicalCount++
if glog.V(logger.Debug) {
- glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...)\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
+ glog.Infof("[%v] inserted block #%d (%d TXs %d UNCs) (%x...). Took %v\n", time.Now().UnixNano(), block.Number(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
} else {
if glog.V(logger.Detail) {
- glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...)\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4])
+ glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
}
queue[i] = ChainSideEvent{block, logs}
@@ -728,9 +752,11 @@ func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
}
// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
+ self.mu.Lock()
for _, block := range newChain {
self.insert(block)
}
+ self.mu.Unlock()
return nil
}
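
Both call sites above follow the convention documented on insert and setTotalDifficulty: the helpers assume `mu` is already held, so batch callers take the lock once around the whole loop. A reduced sketch of that convention with stand-in types:

package example

import "sync"

type chain struct {
	mu    sync.Mutex
	heads []uint64
}

// insertLocked mirrors ChainManager.insert: it assumes c.mu is held.
func (c *chain) insertLocked(n uint64) { c.heads = append(c.heads, n) }

// insertBatch takes the lock once for the whole batch, like merge above.
func (c *chain) insertBatch(ns []uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, n := range ns {
		c.insertLocked(n)
	}
}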
@@ -744,7 +770,7 @@ out:
case ev := <-events.Chan():
switch ev := ev.(type) {
case queueEvent:
- for i, event := range ev.queue {
+ for _, event := range ev.queue {
switch event := event.(type) {
case ChainEvent:
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
@@ -753,12 +779,6 @@ out:
self.currentGasLimit = CalcGasLimit(event.Block)
self.eventMux.Post(ChainHeadEvent{event.Block})
}
- case ChainSplitEvent:
- // On chain splits we need to reset the transaction state. We can't be sure whether the actual
- // state of the accounts are still valid.
- if i == ev.splitCount {
- self.setTxState(state.New(event.Block.Root(), self.stateDb))
- }
}
self.eventMux.Post(event)
@@ -776,66 +796,42 @@ func blockErr(block *types.Block, err error) {
h := block.Header()
glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes())
glog.V(logger.Error).Infoln(err)
glog.V(logger.Debug).Infoln(block)
+}
+
+type nonceResult struct {
+ i int
+ valid bool
}
-// verifyNonces verifies nonces of the given blocks in parallel and returns
-// an error if one of the blocks nonce verifications failed.
+// verifyNonces verifies the nonces of the given blocks in parallel, sending
+// the result of each verification on the done channel until quit is closed.
-func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
+func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) {
// Spawn a few workers. They listen for block indices on the in
// channel and send results on done. The workers will exit in the
// background when in is closed.
var (
- in = make(chan *types.Block)
- done = make(chan error, runtime.GOMAXPROCS(0))
+ in = make(chan int)
+ nworkers = runtime.GOMAXPROCS(0)
)
defer close(in)
- for i := 0; i < cap(done); i++ {
- go verifyNonce(pow, in, done)
+ if len(blocks) < nworkers {
+ nworkers = len(blocks)
}
- // Feed blocks to the workers, aborting at the first invalid nonce.
- var (
- running, i int
- block *types.Block
- sendin = in
- )
- for i < len(blocks) || running > 0 {
- if i == len(blocks) {
- // Disable sending to in.
- sendin = nil
- } else {
- block = blocks[i]
- i++
- }
- select {
- case sendin <- block:
- running++
- case err := <-done:
- running--
- if err != nil {
- return err
+ for i := 0; i < nworkers; i++ {
+ go func() {
+ for i := range in {
+ done <- nonceResult{i: i, valid: pow.Verify(blocks[i])}
}
- }
+ }()
}
- return nil
-}
-
-// verifyNonce is a worker for the verifyNonces method. It will run until
-// in is closed.
-func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
- for block := range in {
- if !pow.Verify(block) {
- done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
- } else {
- done <- nil
+ // Feed block indices to the workers.
+ for i := range blocks {
+ select {
+ case in <- i:
+ continue
+ case <-quit:
+ return
}
}
}
-
-func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) {
- if !pow.Verify(block) {
- done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
- } else {
- done <- nil
- }
-}
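
Taken together, verifyNonces and the consuming loop in InsertChain form a small worker pool: block indices go out on in, per-block results come back on done, and quit aborts the feed early. A self-contained, simplified sketch of the same pattern, with a generic verify function standing in for pow.Verify:

package main

import (
	"fmt"
	"runtime"
)

// result mirrors nonceResult: which item was checked and whether it passed.
type result struct {
	i     int
	valid bool
}

// verifyAll feeds item indices to up to GOMAXPROCS workers and reports each
// verification on done, stopping early if quit is closed.
func verifyAll(n int, verify func(int) bool, quit <-chan struct{}, done chan<- result) {
	in := make(chan int)
	defer close(in) // workers exit once in is closed

	nworkers := runtime.GOMAXPROCS(0)
	if n < nworkers {
		nworkers = n
	}
	for w := 0; w < nworkers; w++ {
		go func() {
			for i := range in {
				done <- result{i: i, valid: verify(i)}
			}
		}()
	}
	for i := 0; i < n; i++ {
		select {
		case in <- i:
		case <-quit:
			return
		}
	}
}

func main() {
	items := []int{2, 3, 5, 8, 13}
	done := make(chan result, len(items)) // buffered so workers never block
	quit := make(chan struct{})
	defer close(quit)

	go verifyAll(len(items), func(i int) bool { return items[i]%2 == 0 }, quit, done)

	// Consume results in arrival order but act on them in index order, the
	// way InsertChain waits for block i before processing its state.
	checked := make([]bool, len(items))
	for i := range items {
		for !checked[i] {
			r := <-done
			checked[r.i] = true
			if !r.valid {
				fmt.Printf("item %d (%d) failed verification\n", r.i, items[r.i])
			}
		}
	}
}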