diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-13 17:04:25 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-21 21:49:55 +0800 |
commit | 5b0ee8ec304663898073b7a4c659e1def23716df (patch) | |
tree | 8f2f49a8d26dc1c29e1d360fb787ab420d90a2ae /core/blockchain.go | |
parent | aa0538db0b5de2bb2c609d629b65d083649f9171 (diff) | |
download | go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.gz go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.zst go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.zip |
core, eth, trie: fix data races and merge/review issues
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 71 |
1 files changed, 42 insertions, 29 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 490552ea0..f14ff363c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -18,11 +18,13 @@ package core import ( + crand "crypto/rand" "errors" "fmt" "io" + "math" "math/big" - "math/rand" + mrand "math/rand" "runtime" "sync" "sync/atomic" @@ -89,7 +91,8 @@ type BlockChain struct { procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup - pow pow.PoW + pow pow.PoW + rand *mrand.Rand } func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) { @@ -112,6 +115,12 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl futureBlocks: futureBlocks, pow: pow, } + // Seed a fast but crypto originating random generator + seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) + if err != nil { + return nil, err + } + bc.rand = mrand.New(mrand.NewSource(seed.Int64())) bc.genesisBlock = bc.GetBlockByNumber(0) if bc.genesisBlock == nil { @@ -178,21 +187,21 @@ func (self *BlockChain) loadLastState() error { fastTd := self.GetTd(self.currentFastBlock.Hash()) glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd) - glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd) glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd) + glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd) return nil } -// SetHead rewind the local chain to a new head entity. In the case of headers, -// everything above the new head will be deleted and the new one set. In the case -// of blocks though, the head may be further rewound if block bodies are missing -// (non-archive nodes after a fast sync). +// SetHead rewinds the local chain to a new head. In the case of headers, everything +// above the new head will be deleted and the new one set. In the case of blocks +// though, the head may be further rewound if block bodies are missing (non-archive +// nodes after a fast sync). func (bc *BlockChain) SetHead(head uint64) { bc.mu.Lock() defer bc.mu.Unlock() - // Figure out the highest known canonical assignment + // Figure out the highest known canonical headers and/or blocks height := uint64(0) if bc.currentHeader != nil { if hh := bc.currentHeader.Number.Uint64(); hh > height { @@ -266,7 +275,7 @@ func (bc *BlockChain) SetHead(head uint64) { // FastSyncCommitHead sets the current head block to the one defined by the hash // irrelevant what the chain contents were prior. func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { - // Make sure that both the block as well at it's state trie exists + // Make sure that both the block as well at its state trie exists block := self.GetBlock(hash) if block == nil { return fmt.Errorf("non existent block [%x…]", hash[:4]) @@ -298,7 +307,7 @@ func (self *BlockChain) LastBlockHash() common.Hash { } // CurrentHeader retrieves the current head header of the canonical chain. The -// header is retrieved from the chain manager's internal cache. +// header is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentHeader() *types.Header { self.mu.RLock() defer self.mu.RUnlock() @@ -307,7 +316,7 @@ func (self *BlockChain) CurrentHeader() *types.Header { } // CurrentBlock retrieves the current head block of the canonical chain. The -// block is retrieved from the chain manager's internal cache. +// block is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentBlock() *types.Block { self.mu.RLock() defer self.mu.RUnlock() @@ -316,7 +325,7 @@ func (self *BlockChain) CurrentBlock() *types.Block { } // CurrentFastBlock retrieves the current fast-sync head block of the canonical -// chain. The block is retrieved from the chain manager's internal cache. +// chain. The block is retrieved from the blockchain's internal cache. func (self *BlockChain) CurrentFastBlock() *types.Block { self.mu.RLock() defer self.mu.RUnlock() @@ -353,7 +362,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { bc.mu.Lock() defer bc.mu.Unlock() - // Prepare the genesis block and reinitialize the chain + // Prepare the genesis block and reinitialise the chain if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil { glog.Fatalf("failed to write genesis block TD: %v", err) } @@ -403,7 +412,7 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { // insert injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block to prevent them -// from diverging on a different header chain. +// from pointing to a possibly old canonical chain (i.e. side chain by now). // // Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) insert(block *types.Block) { @@ -625,10 +634,10 @@ const ( // writeHeader writes a header into the local chain, given that its parent is // already known. If the total difficulty of the newly inserted header becomes -// greater than the old known TD, the canonical chain is re-routed. +// greater than the current known TD, the canonical chain is re-routed. // // Note: This method is not concurrent-safe with inserting blocks simultaneously -// into the chain, as side effects caused by reorganizations cannot be emulated +// into the chain, as side effects caused by reorganisations cannot be emulated // without the real blocks. Hence, writing headers directly should only be done // in two scenarios: pure-header mode of operation (light clients), or properly // separated header/block phases (non-archive clients). @@ -678,10 +687,9 @@ func (self *BlockChain) writeHeader(header *types.Header) error { return nil } -// InsertHeaderChain will attempt to insert the given header chain in to the -// local chain, possibly creating a fork. If an error is returned, it will -// return the index number of the failing header as well an error describing -// what went wrong. +// InsertHeaderChain attempts to insert the given header chain in to the local +// chain, possibly creating a reorg. If an error is returned, it will return the +// index number of the failing header as well an error describing what went wrong. // // The verify parameter can be used to fine tune whether nonce verification // should be done or not. The reason behind the optional check is because some @@ -702,7 +710,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) // Generate the list of headers that should be POW verified verify := make([]bool, len(chain)) for i := 0; i < len(verify)/checkFreq; i++ { - index := i*checkFreq + rand.Intn(checkFreq) + index := i*checkFreq + self.rand.Intn(checkFreq) if index >= len(verify) { index = len(verify) - 1 } @@ -766,10 +774,6 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) pending.Wait() // If anything failed, report - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - return 0, nil - } if failed > 0 { for i, err := range errs { if err != nil { @@ -807,6 +811,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (self *BlockChain) Rollback(chain []common.Hash) { + self.mu.Lock() + defer self.mu.Unlock() + for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -905,6 +912,12 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain glog.Fatal(errs[index]) return } + if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil { + errs[index] = fmt.Errorf("failed to write log blooms: %v", err) + atomic.AddInt32(&failed, 1) + glog.Fatal(errs[index]) + return + } atomic.AddInt32(&stats.processed, 1) } } @@ -920,10 +933,6 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain pending.Wait() // If anything failed, report - if atomic.LoadInt32(&self.procInterrupt) == 1 { - glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") - return 0, nil - } if failed > 0 { for i, err := range errs { if err != nil { @@ -931,6 +940,10 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain } } } + if atomic.LoadInt32(&self.procInterrupt) == 1 { + glog.V(logger.Debug).Infoln("premature abort during receipt chain processing") + return 0, nil + } // Update the head fast sync block if better self.mu.Lock() head := blockChain[len(errs)-1] |