aboutsummaryrefslogtreecommitdiffstats
path: root/core/blockchain.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2018-02-06 00:40:32 +0800
committerFelix Lange <fjl@users.noreply.github.com>2018-02-06 00:40:32 +0800
commit55599ee95d4151a2502465e0afc7c47bd1acba77 (patch)
tree4165e73ae852db4f025a5ed57f0bc499e87cb8b9 /core/blockchain.go
parent59336283c0dbeb1d0a74ff7a8b717b2b3bb0cf40 (diff)
downloaddexon-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.gz
dexon-55599ee95d4151a2502465e0afc7c47bd1acba77.tar.zst
dexon-55599ee95d4151a2502465e0afc7c47bd1acba77.zip
core, trie: intermediate mempool between trie and database (#15857)
This commit reduces database I/O by not writing every state trie to disk.
Diffstat (limited to 'core/blockchain.go')
-rw-r--r--core/blockchain.go334
1 files changed, 263 insertions, 71 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index d5e139e31..8d141fddb 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -42,6 +42,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@@ -56,11 +57,20 @@ const (
maxFutureBlocks = 256
maxTimeFutureBlocks = 30
badBlockLimit = 10
+ triesInMemory = 128
// BlockChainVersion ensures that an incompatible database forces a resync from scratch.
BlockChainVersion = 3
)
+// CacheConfig contains the configuration values for the trie caching/pruning
+// that's resident in a blockchain.
+type CacheConfig struct {
+ Disabled bool // Whether to disable trie write caching (archive node)
+ TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk
+ TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
+}
+
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
@@ -76,10 +86,14 @@ const (
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
type BlockChain struct {
- config *params.ChainConfig // chain & network configuration
+ chainConfig *params.ChainConfig // Chain & network configuration
+ cacheConfig *CacheConfig // Cache configuration for pruning
+
+ db ethdb.Database // Low level persistent database to store final content in
+ triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
+ gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
- chainDb ethdb.Database
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
@@ -119,7 +133,13 @@ type BlockChain struct {
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialises the default Ethereum Validator and
// Processor.
-func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
+func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) {
+ if cacheConfig == nil {
+ cacheConfig = &CacheConfig{
+ TrieNodeLimit: 256 * 1024 * 1024,
+ TrieTimeLimit: 5 * time.Minute,
+ }
+ }
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
@@ -127,9 +147,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
badBlocks, _ := lru.New(badBlockLimit)
bc := &BlockChain{
- config: config,
- chainDb: chainDb,
- stateCache: state.NewDatabase(chainDb),
+ chainConfig: chainConfig,
+ cacheConfig: cacheConfig,
+ db: db,
+ triegc: prque.New(),
+ stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
@@ -139,11 +161,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co
vmConfig: vmConfig,
badBlocks: badBlocks,
}
- bc.SetValidator(NewBlockValidator(config, bc, engine))
- bc.SetProcessor(NewStateProcessor(config, bc, engine))
+ bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
+ bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
var err error
- bc.hc, err = NewHeaderChain(chainDb, config, engine, bc.getProcInterrupt)
+ bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt)
if err != nil {
return nil, err
}
@@ -180,7 +202,7 @@ func (bc *BlockChain) getProcInterrupt() bool {
// assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState() error {
// Restore the last known head block
- head := GetHeadBlockHash(bc.chainDb)
+ head := GetHeadBlockHash(bc.db)
if head == (common.Hash{}) {
// Corrupt or empty database, init from scratch
log.Warn("Empty database, resetting chain")
@@ -196,15 +218,17 @@ func (bc *BlockChain) loadLastState() error {
// Make sure the state associated with the block is available
if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil {
// Dangling block without a state associated, init from scratch
- log.Warn("Head state missing, resetting chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
- return bc.Reset()
+ log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash())
+ if err := bc.repair(&currentBlock); err != nil {
+ return err
+ }
}
// Everything seems to be fine, set as the head block
bc.currentBlock = currentBlock
// Restore the last known head header
currentHeader := bc.currentBlock.Header()
- if head := GetHeadHeaderHash(bc.chainDb); head != (common.Hash{}) {
+ if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) {
if header := bc.GetHeaderByHash(head); header != nil {
currentHeader = header
}
@@ -213,7 +237,7 @@ func (bc *BlockChain) loadLastState() error {
// Restore the last known head fast block
bc.currentFastBlock = bc.currentBlock
- if head := GetHeadFastBlockHash(bc.chainDb); head != (common.Hash{}) {
+ if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) {
if block := bc.GetBlockByHash(head); block != nil {
bc.currentFastBlock = block
}
@@ -243,7 +267,7 @@ func (bc *BlockChain) SetHead(head uint64) error {
// Rewind the header chain, deleting all block bodies until then
delFn := func(hash common.Hash, num uint64) {
- DeleteBody(bc.chainDb, hash, num)
+ DeleteBody(bc.db, hash, num)
}
bc.hc.SetHead(head, delFn)
currentHeader := bc.hc.CurrentHeader()
@@ -275,10 +299,10 @@ func (bc *BlockChain) SetHead(head uint64) error {
if bc.currentFastBlock == nil {
bc.currentFastBlock = bc.genesisBlock
}
- if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
+ if err := WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()); err != nil {
log.Crit("Failed to reset head full block", "err", err)
}
- if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
+ if err := WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()); err != nil {
log.Crit("Failed to reset head fast block", "err", err)
}
return bc.loadLastState()
@@ -292,7 +316,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
if block == nil {
return fmt.Errorf("non existent block [%x…]", hash[:4])
}
- if _, err := trie.NewSecure(block.Root(), bc.chainDb, 0); err != nil {
+ if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB(), 0); err != nil {
return err
}
// If all checks out, manually set the head block
@@ -387,7 +411,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
log.Crit("Failed to write genesis block TD", "err", err)
}
- if err := WriteBlock(bc.chainDb, genesis); err != nil {
+ if err := WriteBlock(bc.db, genesis); err != nil {
log.Crit("Failed to write genesis block", "err", err)
}
bc.genesisBlock = genesis
@@ -400,6 +424,24 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
return nil
}
+// repair tries to repair the current blockchain by rolling back the current block
+// until one with associated state is found. This is needed to fix incomplete db
+// writes caused either by crashes/power outages, or simply non-committed tries.
+//
+// This method only rolls back the current block. The current header and current
+// fast block are left intact.
+func (bc *BlockChain) repair(head **types.Block) error {
+ for {
+ // Abort if we've rewound to a head block that does have associated state
+ if _, err := state.New((*head).Root(), bc.stateCache); err == nil {
+ log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash())
+ return nil
+ }
+ // Otherwise rewind one block and recheck state availability there
+ (*head) = bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1)
+ }
+}
+
// Export writes the active chain to the given writer.
func (bc *BlockChain) Export(w io.Writer) error {
return bc.ExportN(w, uint64(0), bc.currentBlock.NumberU64())
@@ -437,13 +479,13 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
// If the block is on a side chain or an unknown one, force other heads onto it too
- updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash()
+ updateHeads := GetCanonicalHash(bc.db, block.NumberU64()) != block.Hash()
// Add the block to the canonical chain number scheme and mark as the head
- if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
+ if err := WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()); err != nil {
log.Crit("Failed to insert block number", "err", err)
}
- if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
+ if err := WriteHeadBlockHash(bc.db, block.Hash()); err != nil {
log.Crit("Failed to insert head block hash", "err", err)
}
bc.currentBlock = block
@@ -452,7 +494,7 @@ func (bc *BlockChain) insert(block *types.Block) {
if updateHeads {
bc.hc.SetCurrentHeader(block.Header())
- if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
+ if err := WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil {
log.Crit("Failed to insert head fast block hash", "err", err)
}
bc.currentFastBlock = block
@@ -472,7 +514,7 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
body := cached.(*types.Body)
return body
}
- body := GetBody(bc.chainDb, hash, bc.hc.GetBlockNumber(hash))
+ body := GetBody(bc.db, hash, bc.hc.GetBlockNumber(hash))
if body == nil {
return nil
}
@@ -488,7 +530,7 @@ func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
if cached, ok := bc.bodyRLPCache.Get(hash); ok {
return cached.(rlp.RawValue)
}
- body := GetBodyRLP(bc.chainDb, hash, bc.hc.GetBlockNumber(hash))
+ body := GetBodyRLP(bc.db, hash, bc.hc.GetBlockNumber(hash))
if len(body) == 0 {
return nil
}
@@ -502,21 +544,25 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool {
if bc.blockCache.Contains(hash) {
return true
}
- ok, _ := bc.chainDb.Has(blockBodyKey(hash, number))
+ ok, _ := bc.db.Has(blockBodyKey(hash, number))
return ok
}
+// HasState checks if state trie is fully present in the database or not.
+func (bc *BlockChain) HasState(hash common.Hash) bool {
+ _, err := bc.stateCache.OpenTrie(hash)
+ return err == nil
+}
+
// HasBlockAndState checks if a block and associated state trie is fully present
// in the database or not, caching it if present.
-func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool {
+func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool {
// Check first that the block itself is known
- block := bc.GetBlockByHash(hash)
+ block := bc.GetBlock(hash, number)
if block == nil {
return false
}
- // Ensure the associated state is also present
- _, err := bc.stateCache.OpenTrie(block.Root())
- return err == nil
+ return bc.HasState(block.Root())
}
// GetBlock retrieves a block from the database by hash and number,
@@ -526,7 +572,7 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
if block, ok := bc.blockCache.Get(hash); ok {
return block.(*types.Block)
}
- block := GetBlock(bc.chainDb, hash, number)
+ block := GetBlock(bc.db, hash, number)
if block == nil {
return nil
}
@@ -543,13 +589,18 @@ func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
// GetBlockByNumber retrieves a block from the database by number, caching it
// (associated with its hash) if found.
func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block {
- hash := GetCanonicalHash(bc.chainDb, number)
+ hash := GetCanonicalHash(bc.db, number)
if hash == (common.Hash{}) {
return nil
}
return bc.GetBlock(hash, number)
}
+// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
+func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
+ return GetBlockReceipts(bc.db, hash, GetBlockNumber(bc.db, hash))
+}
+
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
// [deprecated by eth/62]
func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
@@ -577,6 +628,12 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.
return uncles
}
+// TrieNode retrieves a blob of data associated with a trie node (or code hash)
+// either from ephemeral in-memory cache, or from persistent storage.
+func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) {
+ return bc.stateCache.TrieDB().Node(hash)
+}
+
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
func (bc *BlockChain) Stop() {
@@ -589,6 +646,33 @@ func (bc *BlockChain) Stop() {
atomic.StoreInt32(&bc.procInterrupt, 1)
bc.wg.Wait()
+
+ // Ensure the state of a recent block is also stored to disk before exiting.
+ // It is fine if this state does not exist (fast start/stop cycle), but it is
+ // advisable to leave an N block gap from the head so 1) a restart loads up
+ // the last N blocks as sync assistance to remote nodes; 2) a restart during
+ // a (small) reorg doesn't require deep reprocesses; 3) chain "repair" from
+ // missing states are constantly tested.
+ //
+ // This may be tuned a bit on mainnet if its too annoying to reprocess the last
+ // N blocks.
+ if !bc.cacheConfig.Disabled {
+ triedb := bc.stateCache.TrieDB()
+ if number := bc.CurrentBlock().NumberU64(); number >= triesInMemory {
+ recent := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - triesInMemory + 1)
+
+ log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root())
+ if err := triedb.Commit(recent.Root(), true); err != nil {
+ log.Error("Failed to commit recent state trie", "err", err)
+ }
+ }
+ for !bc.triegc.Empty() {
+ triedb.Dereference(bc.triegc.PopItem().(common.Hash), common.Hash{})
+ }
+ if size := triedb.Size(); size != 0 {
+ log.Error("Dangling trie nodes after full cleanup")
+ }
+ }
log.Info("Blockchain manager stopped")
}
@@ -633,11 +717,11 @@ func (bc *BlockChain) Rollback(chain []common.Hash) {
}
if bc.currentFastBlock.Hash() == hash {
bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash(), bc.currentFastBlock.NumberU64()-1)
- WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash())
+ WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash())
}
if bc.currentBlock.Hash() == hash {
bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash(), bc.currentBlock.NumberU64()-1)
- WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash())
+ WriteHeadBlockHash(bc.db, bc.currentBlock.Hash())
}
}
}
@@ -696,7 +780,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
stats = struct{ processed, ignored int32 }{}
start = time.Now()
bytes = 0
- batch = bc.chainDb.NewBatch()
+ batch = bc.db.NewBatch()
)
for i, block := range blockChain {
receipts := receiptChain[i]
@@ -714,7 +798,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
continue
}
// Compute all the non-consensus fields of the receipts
- SetReceiptsData(bc.config, block, receipts)
+ SetReceiptsData(bc.chainConfig, block, receipts)
// Write all the data out into the database
if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil {
return i, fmt.Errorf("failed to write block body: %v", err)
@@ -747,7 +831,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
if bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64()).Cmp(td) < 0 {
- if err := WriteHeadFastBlockHash(bc.chainDb, head.Hash()); err != nil {
+ if err := WriteHeadFastBlockHash(bc.db, head.Hash()); err != nil {
log.Crit("Failed to update head fast block hash", "err", err)
}
bc.currentFastBlock = head
@@ -758,15 +842,33 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
log.Info("Imported new block receipts",
"count", stats.processed,
"elapsed", common.PrettyDuration(time.Since(start)),
- "bytes", bytes,
"number", head.Number(),
"hash", head.Hash(),
+ "size", common.StorageSize(bytes),
"ignored", stats.ignored)
return 0, nil
}
-// WriteBlock writes the block to the chain.
-func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
+var lastWrite uint64
+
+// WriteBlockWithoutState writes only the block and its metadata to the database,
+// but does not write any state. This is used to construct competing side forks
+// up to the point where they exceed the canonical total difficulty.
+func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) {
+ bc.wg.Add(1)
+ defer bc.wg.Done()
+
+ if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil {
+ return err
+ }
+ if err := WriteBlock(bc.db, block); err != nil {
+ return err
+ }
+ return nil
+}
+
+// WriteBlockWithState writes the block and all associated state to the database.
+func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()
@@ -787,17 +889,73 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
return NonStatTy, err
}
// Write other block data using a batch.
- batch := bc.chainDb.NewBatch()
+ batch := bc.db.NewBatch()
if err := WriteBlock(batch, block); err != nil {
return NonStatTy, err
}
- if _, err := state.CommitTo(batch, bc.config.IsEIP158(block.Number())); err != nil {
+ root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
+ if err != nil {
return NonStatTy, err
}
+ triedb := bc.stateCache.TrieDB()
+
+ // If we're running an archive node, always flush
+ if bc.cacheConfig.Disabled {
+ if err := triedb.Commit(root, false); err != nil {
+ return NonStatTy, err
+ }
+ } else {
+ // Full but not archive node, do proper garbage collection
+ triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
+ bc.triegc.Push(root, -float32(block.NumberU64()))
+
+ if current := block.NumberU64(); current > triesInMemory {
+ // Find the next state trie we need to commit
+ header := bc.GetHeaderByNumber(current - triesInMemory)
+ chosen := header.Number.Uint64()
+
+ // Only write to disk if we exceeded our memory allowance *and* also have at
+ // least a given number of tries gapped.
+ var (
+ size = triedb.Size()
+ limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024
+ )
+ if size > limit || bc.gcproc > bc.cacheConfig.TrieTimeLimit {
+ // If we're exceeding limits but haven't reached a large enough memory gap,
+ // warn the user that the system is becoming unstable.
+ if chosen < lastWrite+triesInMemory {
+ switch {
+ case size >= 2*limit:
+ log.Error("Trie memory critical, forcing to disk", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory)
+ case bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit:
+ log.Error("Trie timing critical, forcing to disk", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
+ case size > limit:
+ log.Warn("Trie memory at dangerous levels", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory)
+ case bc.gcproc > bc.cacheConfig.TrieTimeLimit:
+ log.Warn("Trie timing at dangerous levels", "time", bc.gcproc, "limit", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory)
+ }
+ }
+ // If optimum or critical limits reached, write to disk
+ if chosen >= lastWrite+triesInMemory || size >= 2*limit || bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit {
+ triedb.Commit(header.Root, true)
+ lastWrite = chosen
+ bc.gcproc = 0
+ }
+ }
+ // Garbage collect anything below our required write retention
+ for !bc.triegc.Empty() {
+ root, number := bc.triegc.Pop()
+ if uint64(-number) > chosen {
+ bc.triegc.Push(root, number)
+ break
+ }
+ triedb.Dereference(root.(common.Hash), common.Hash{})
+ }
+ }
+ }
if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil {
return NonStatTy, err
}
-
// If the total difficulty is higher than our known, add it to the canonical chain
// Second clause in the if statement reduces the vulnerability to selfish mining.
// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
@@ -818,7 +976,7 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R
return NonStatTy, err
}
// Write hash preimages
- if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil {
+ if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil {
return NonStatTy, err
}
status = CanonStatTy
@@ -910,31 +1068,60 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
if err == nil {
err = bc.Validator().ValidateBody(block)
}
- if err != nil {
- if err == ErrKnownBlock {
- stats.ignored++
- continue
+ switch {
+ case err == ErrKnownBlock:
+ stats.ignored++
+ continue
+
+ case err == consensus.ErrFutureBlock:
+ // Allow up to MaxFuture second in the future blocks. If this limit is exceeded
+ // the chain is discarded and processed at a later time if given.
+ max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
+ if block.Time().Cmp(max) > 0 {
+ return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
}
+ bc.futureBlocks.Add(block.Hash(), block)
+ stats.queued++
+ continue
+
+ case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
+ bc.futureBlocks.Add(block.Hash(), block)
+ stats.queued++
+ continue
- if err == consensus.ErrFutureBlock {
- // Allow up to MaxFuture second in the future blocks. If this limit
- // is exceeded the chain is discarded and processed at a later time
- // if given.
- max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
- if block.Time().Cmp(max) > 0 {
- return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
+ case err == consensus.ErrPrunedAncestor:
+ // Block competing with the canonical chain, store in the db, but don't process
+ // until the competitor TD goes above the canonical TD
+ localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64())
+ externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
+ if localTd.Cmp(externTd) > 0 {
+ if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
+ return i, events, coalescedLogs, err
}
- bc.futureBlocks.Add(block.Hash(), block)
- stats.queued++
continue
}
+ // Competitor chain beat canonical, gather all blocks from the common ancestor
+ var winner []*types.Block
- if err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()) {
- bc.futureBlocks.Add(block.Hash(), block)
- stats.queued++
- continue
+ parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
+ for !bc.HasState(parent.Root()) {
+ winner = append(winner, parent)
+ parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
+ }
+ for j := 0; j < len(winner)/2; j++ {
+ winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
+ }
+ // Import all the pruned blocks to make the state available
+ bc.chainmu.Unlock()
+ _, evs, logs, err := bc.insertChain(winner)
+ bc.chainmu.Lock()
+ events, coalescedLogs = evs, logs
+
+ if err != nil {
+ return i, events, coalescedLogs, err
}
+ case err != nil:
bc.reportBlock(block, nil, err)
return i, events, coalescedLogs, err
}
@@ -962,8 +1149,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
bc.reportBlock(block, receipts, err)
return i, events, coalescedLogs, err
}
+ proctime := time.Since(bstart)
+
// Write the block to the chain and get the status.
- status, err := bc.WriteBlockAndState(block, receipts, state)
+ status, err := bc.WriteBlockWithState(block, receipts, state)
if err != nil {
return i, events, coalescedLogs, err
}
@@ -977,6 +1166,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
events = append(events, ChainEvent{block, block.Hash(), logs})
lastCanon = block
+ // Only count canonical blocks for GC processing time
+ bc.gcproc += proctime
+
case SideStatTy:
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
@@ -986,7 +1178,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
}
stats.processed++
stats.usedGas += usedGas
- stats.report(chain, i)
+ stats.report(chain, i, bc.stateCache.TrieDB().Size())
}
// Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
@@ -1009,7 +1201,7 @@ const statsReportLimit = 8 * time.Second
// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
-func (st *insertStats) report(chain []*types.Block, index int) {
+func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) {
// Fetch the timings for the batch
var (
now = mclock.Now()
@@ -1024,7 +1216,7 @@ func (st *insertStats) report(chain []*types.Block, index int) {
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
- "number", end.Number(), "hash", end.Hash(),
+ "number", end.Number(), "hash", end.Hash(), "cache", cache,
}
if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
@@ -1060,7 +1252,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// These logs are later announced as deleted.
collectLogs = func(h common.Hash) {
// Coalesce logs and set 'Removed'.
- receipts := GetBlockReceipts(bc.chainDb, h, bc.hc.GetBlockNumber(h))
+ receipts := GetBlockReceipts(bc.db, h, bc.hc.GetBlockNumber(h))
for _, receipt := range receipts {
for _, log := range receipt.Logs {
del := *log
@@ -1129,7 +1321,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// insert the block in the canonical way, re-writing history
bc.insert(newChain[i])
// write lookup entries for hash based transaction/receipt searches
- if err := WriteTxLookupEntries(bc.chainDb, newChain[i]); err != nil {
+ if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil {
return err
}
addedTxs = append(addedTxs, newChain[i].Transactions()...)
@@ -1139,7 +1331,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
for _, tx := range diff {
- DeleteTxLookupEntry(bc.chainDb, tx.Hash())
+ DeleteTxLookupEntry(bc.db, tx.Hash())
}
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
@@ -1231,7 +1423,7 @@ Hash: 0x%x
Error: %v
##############################
-`, bc.config, block.Number(), block.Hash(), receiptString, err))
+`, bc.chainConfig, block.Number(), block.Hash(), receiptString, err))
}
// InsertHeaderChain attempts to insert the given header chain in to the local
@@ -1338,7 +1530,7 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
}
// Config retrieves the blockchain's chain configuration.
-func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
+func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig }
// Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }