aboutsummaryrefslogtreecommitdiffstats
path: root/core/blockchain.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-10-13 17:04:25 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-10-21 21:49:55 +0800
commit5b0ee8ec304663898073b7a4c659e1def23716df (patch)
tree8f2f49a8d26dc1c29e1d360fb787ab420d90a2ae /core/blockchain.go
parentaa0538db0b5de2bb2c609d629b65d083649f9171 (diff)
downloadgo-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.gz
go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.zst
go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.zip
core, eth, trie: fix data races and merge/review issues
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--core/blockchain.go71
1 files changed, 42 insertions, 29 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 490552ea0..f14ff363c 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -18,11 +18,13 @@
package core
import (
+ crand "crypto/rand"
"errors"
"fmt"
"io"
+ "math"
"math/big"
- "math/rand"
+ mrand "math/rand"
"runtime"
"sync"
"sync/atomic"
@@ -89,7 +91,8 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup
- pow pow.PoW
+ pow pow.PoW
+ rand *mrand.Rand
}
func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
@@ -112,6 +115,12 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
futureBlocks: futureBlocks,
pow: pow,
}
+ // Seed a fast but crypto originating random generator
+ seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
+ if err != nil {
+ return nil, err
+ }
+ bc.rand = mrand.New(mrand.NewSource(seed.Int64()))
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
@@ -178,21 +187,21 @@ func (self *BlockChain) loadLastState() error {
fastTd := self.GetTd(self.currentFastBlock.Hash())
glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd)
- glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
+ glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
return nil
}
-// SetHead rewind the local chain to a new head entity. In the case of headers,
-// everything above the new head will be deleted and the new one set. In the case
-// of blocks though, the head may be further rewound if block bodies are missing
-// (non-archive nodes after a fast sync).
+// SetHead rewinds the local chain to a new head. In the case of headers, everything
+// above the new head will be deleted and the new one set. In the case of blocks
+// though, the head may be further rewound if block bodies are missing (non-archive
+// nodes after a fast sync).
func (bc *BlockChain) SetHead(head uint64) {
bc.mu.Lock()
defer bc.mu.Unlock()
- // Figure out the highest known canonical assignment
+ // Figure out the highest known canonical headers and/or blocks
height := uint64(0)
if bc.currentHeader != nil {
if hh := bc.currentHeader.Number.Uint64(); hh > height {
@@ -266,7 +275,7 @@ func (bc *BlockChain) SetHead(head uint64) {
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
- // Make sure that both the block as well at it's state trie exists
+ // Make sure that both the block as well at its state trie exists
block := self.GetBlock(hash)
if block == nil {
return fmt.Errorf("non existent block [%x…]", hash[:4])
@@ -298,7 +307,7 @@ func (self *BlockChain) LastBlockHash() common.Hash {
}
// CurrentHeader retrieves the current head header of the canonical chain. The
-// header is retrieved from the chain manager's internal cache.
+// header is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentHeader() *types.Header {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -307,7 +316,7 @@ func (self *BlockChain) CurrentHeader() *types.Header {
}
// CurrentBlock retrieves the current head block of the canonical chain. The
-// block is retrieved from the chain manager's internal cache.
+// block is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentBlock() *types.Block {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -316,7 +325,7 @@ func (self *BlockChain) CurrentBlock() *types.Block {
}
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
-// chain. The block is retrieved from the chain manager's internal cache.
+// chain. The block is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentFastBlock() *types.Block {
self.mu.RLock()
defer self.mu.RUnlock()
@@ -353,7 +362,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
bc.mu.Lock()
defer bc.mu.Unlock()
- // Prepare the genesis block and reinitialize the chain
+ // Prepare the genesis block and reinitialise the chain
if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil {
glog.Fatalf("failed to write genesis block TD: %v", err)
}
@@ -403,7 +412,7 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block to prevent them
-// from diverging on a different header chain.
+// from pointing to a possibly old canonical chain (i.e. side chain by now).
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
@@ -625,10 +634,10 @@ const (
// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
-// greater than the old known TD, the canonical chain is re-routed.
+// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
-// into the chain, as side effects caused by reorganizations cannot be emulated
+// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
@@ -678,10 +687,9 @@ func (self *BlockChain) writeHeader(header *types.Header) error {
return nil
}
-// InsertHeaderChain will attempt to insert the given header chain in to the
-// local chain, possibly creating a fork. If an error is returned, it will
-// return the index number of the failing header as well an error describing
-// what went wrong.
+// InsertHeaderChain attempts to insert the given header chain in to the local
+// chain, possibly creating a reorg. If an error is returned, it will return the
+// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
@@ -702,7 +710,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
// Generate the list of headers that should be POW verified
verify := make([]bool, len(chain))
for i := 0; i < len(verify)/checkFreq; i++ {
- index := i*checkFreq + rand.Intn(checkFreq)
+ index := i*checkFreq + self.rand.Intn(checkFreq)
if index >= len(verify) {
index = len(verify) - 1
}
@@ -766,10 +774,6 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
pending.Wait()
// If anything failed, report
- if atomic.LoadInt32(&self.procInterrupt) == 1 {
- glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
- return 0, nil
- }
if failed > 0 {
for i, err := range errs {
if err != nil {
@@ -807,6 +811,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
@@ -905,6 +912,12 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
glog.Fatal(errs[index])
return
}
+ if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
+ errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
+ atomic.AddInt32(&failed, 1)
+ glog.Fatal(errs[index])
+ return
+ }
atomic.AddInt32(&stats.processed, 1)
}
}
@@ -920,10 +933,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
pending.Wait()
// If anything failed, report
- if atomic.LoadInt32(&self.procInterrupt) == 1 {
- glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
- return 0, nil
- }
if failed > 0 {
for i, err := range errs {
if err != nil {
@@ -931,6 +940,10 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain
}
}
}
+ if atomic.LoadInt32(&self.procInterrupt) == 1 {
+ glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
+ return 0, nil
+ }
// Update the head fast sync block if better
self.mu.Lock()
head := blockChain[len(errs)-1]