diff options
Diffstat (limited to 'core/blockchain.go')
-rw-r--r-- | core/blockchain.go | 375 |
1 files changed, 275 insertions, 100 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 325753c7a..4ae0e4f4e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -42,6 +42,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" "github.com/hashicorp/golang-lru" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) var ( @@ -56,11 +57,20 @@ const ( maxFutureBlocks = 256 maxTimeFutureBlocks = 30 badBlockLimit = 10 + triesInMemory = 128 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. BlockChainVersion = 3 ) +// CacheConfig contains the configuration values for the trie caching/pruning +// that's resident in a blockchain. +type CacheConfig struct { + Disabled bool // Whether to disable trie write caching (archive node) + TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk + TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk +} + // BlockChain represents the canonical chain given a database with a genesis // block. The Blockchain manages chain imports, reverts, chain reorganisations. // @@ -76,10 +86,14 @@ const ( // included in the canonical one where as GetBlockByNumber always represents the // canonical chain. type BlockChain struct { - config *params.ChainConfig // chain & network configuration + chainConfig *params.ChainConfig // Chain & network configuration + cacheConfig *CacheConfig // Cache configuration for pruning + + db ethdb.Database // Low level persistent database to store final content in + triegc *prque.Prque // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping hc *HeaderChain - chainDb ethdb.Database rmLogsFeed event.Feed chainFeed event.Feed chainSideFeed event.Feed @@ -119,7 +133,13 @@ type BlockChain struct { // NewBlockChain returns a fully initialised block chain using information // available in the database. It initialises the default Ethereum Validator and // Processor. -func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { +func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *params.ChainConfig, engine consensus.Engine, vmConfig vm.Config) (*BlockChain, error) { + if cacheConfig == nil { + cacheConfig = &CacheConfig{ + TrieNodeLimit: 256 * 1024 * 1024, + TrieTimeLimit: 5 * time.Minute, + } + } bodyCache, _ := lru.New(bodyCacheLimit) bodyRLPCache, _ := lru.New(bodyCacheLimit) blockCache, _ := lru.New(blockCacheLimit) @@ -127,9 +147,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - config: config, - chainDb: chainDb, - stateCache: state.NewDatabase(chainDb), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(), + stateCache: state.NewDatabase(db), quit: make(chan struct{}), bodyCache: bodyCache, bodyRLPCache: bodyRLPCache, @@ -139,11 +161,11 @@ func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, engine co vmConfig: vmConfig, badBlocks: badBlocks, } - bc.SetValidator(NewBlockValidator(config, bc, engine)) - bc.SetProcessor(NewStateProcessor(config, bc, engine)) + bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) + bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) var err error - bc.hc, err = NewHeaderChain(chainDb, config, engine, bc.getProcInterrupt) + bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.getProcInterrupt) if err != nil { return nil, err } @@ -180,7 +202,7 @@ func (bc *BlockChain) getProcInterrupt() bool { // assumes that the chain manager mutex is held. func (bc *BlockChain) loadLastState() error { // Restore the last known head block - head := GetHeadBlockHash(bc.chainDb) + head := GetHeadBlockHash(bc.db) if head == (common.Hash{}) { // Corrupt or empty database, init from scratch log.Warn("Empty database, resetting chain") @@ -196,15 +218,17 @@ func (bc *BlockChain) loadLastState() error { // Make sure the state associated with the block is available if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { // Dangling block without a state associated, init from scratch - log.Warn("Head state missing, resetting chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) - return bc.Reset() + log.Warn("Head state missing, repairing chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) + if err := bc.repair(¤tBlock); err != nil { + return err + } } // Everything seems to be fine, set as the head block bc.currentBlock = currentBlock // Restore the last known head header currentHeader := bc.currentBlock.Header() - if head := GetHeadHeaderHash(bc.chainDb); head != (common.Hash{}) { + if head := GetHeadHeaderHash(bc.db); head != (common.Hash{}) { if header := bc.GetHeaderByHash(head); header != nil { currentHeader = header } @@ -213,7 +237,7 @@ func (bc *BlockChain) loadLastState() error { // Restore the last known head fast block bc.currentFastBlock = bc.currentBlock - if head := GetHeadFastBlockHash(bc.chainDb); head != (common.Hash{}) { + if head := GetHeadFastBlockHash(bc.db); head != (common.Hash{}) { if block := bc.GetBlockByHash(head); block != nil { bc.currentFastBlock = block } @@ -243,7 +267,7 @@ func (bc *BlockChain) SetHead(head uint64) error { // Rewind the header chain, deleting all block bodies until then delFn := func(hash common.Hash, num uint64) { - DeleteBody(bc.chainDb, hash, num) + DeleteBody(bc.db, hash, num) } bc.hc.SetHead(head, delFn) currentHeader := bc.hc.CurrentHeader() @@ -275,10 +299,10 @@ func (bc *BlockChain) SetHead(head uint64) error { if bc.currentFastBlock == nil { bc.currentFastBlock = bc.genesisBlock } - if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil { + if err := WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()); err != nil { log.Crit("Failed to reset head full block", "err", err) } - if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()); err != nil { log.Crit("Failed to reset head fast block", "err", err) } return bc.loadLastState() @@ -292,7 +316,7 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if block == nil { return fmt.Errorf("non existent block [%x…]", hash[:4]) } - if _, err := trie.NewSecure(block.Root(), bc.chainDb, 0); err != nil { + if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB(), 0); err != nil { return err } // If all checks out, manually set the head block @@ -305,21 +329,13 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { } // GasLimit returns the gas limit of the current HEAD block. -func (bc *BlockChain) GasLimit() *big.Int { +func (bc *BlockChain) GasLimit() uint64 { bc.mu.RLock() defer bc.mu.RUnlock() return bc.currentBlock.GasLimit() } -// LastBlockHash return the hash of the HEAD block. -func (bc *BlockChain) LastBlockHash() common.Hash { - bc.mu.RLock() - defer bc.mu.RUnlock() - - return bc.currentBlock.Hash() -} - // CurrentBlock retrieves the current head block of the canonical chain. The // block is retrieved from the blockchain's internal cache. func (bc *BlockChain) CurrentBlock() *types.Block { @@ -338,15 +354,6 @@ func (bc *BlockChain) CurrentFastBlock() *types.Block { return bc.currentFastBlock } -// Status returns status information about the current chain such as the HEAD Td, -// the HEAD hash and the hash of the genesis block. -func (bc *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) { - bc.mu.RLock() - defer bc.mu.RUnlock() - - return bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64()), bc.currentBlock.Hash(), bc.genesisBlock.Hash() -} - // SetProcessor sets the processor required for making state modifications. func (bc *BlockChain) SetProcessor(processor Processor) { bc.procmu.Lock() @@ -404,7 +411,7 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { log.Crit("Failed to write genesis block TD", "err", err) } - if err := WriteBlock(bc.chainDb, genesis); err != nil { + if err := WriteBlock(bc.db, genesis); err != nil { log.Crit("Failed to write genesis block", "err", err) } bc.genesisBlock = genesis @@ -417,6 +424,24 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { return nil } +// repair tries to repair the current blockchain by rolling back the current block +// until one with associated state is found. This is needed to fix incomplete db +// writes caused either by crashes/power outages, or simply non-committed tries. +// +// This method only rolls back the current block. The current header and current +// fast block are left intact. +func (bc *BlockChain) repair(head **types.Block) error { + for { + // Abort if we've rewound to a head block that does have associated state + if _, err := state.New((*head).Root(), bc.stateCache); err == nil { + log.Info("Rewound blockchain to past state", "number", (*head).Number(), "hash", (*head).Hash()) + return nil + } + // Otherwise rewind one block and recheck state availability there + (*head) = bc.GetBlock((*head).ParentHash(), (*head).NumberU64()-1) + } +} + // Export writes the active chain to the given writer. func (bc *BlockChain) Export(w io.Writer) error { return bc.ExportN(w, uint64(0), bc.currentBlock.NumberU64()) @@ -454,22 +479,22 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { // Note, this function assumes that the `mu` mutex is held! func (bc *BlockChain) insert(block *types.Block) { // If the block is on a side chain or an unknown one, force other heads onto it too - updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash() + updateHeads := GetCanonicalHash(bc.db, block.NumberU64()) != block.Hash() // Add the block to the canonical chain number scheme and mark as the head - if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil { + if err := WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()); err != nil { log.Crit("Failed to insert block number", "err", err) } - if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil { + if err := WriteHeadBlockHash(bc.db, block.Hash()); err != nil { log.Crit("Failed to insert head block hash", "err", err) } bc.currentBlock = block - // If the block is better than out head or is on a different chain, force update heads + // If the block is better than our head or is on a different chain, force update heads if updateHeads { bc.hc.SetCurrentHeader(block.Header()) - if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, block.Hash()); err != nil { log.Crit("Failed to insert head fast block hash", "err", err) } bc.currentFastBlock = block @@ -489,7 +514,7 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body { body := cached.(*types.Body) return body } - body := GetBody(bc.chainDb, hash, bc.hc.GetBlockNumber(hash)) + body := GetBody(bc.db, hash, bc.hc.GetBlockNumber(hash)) if body == nil { return nil } @@ -505,7 +530,7 @@ func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue { if cached, ok := bc.bodyRLPCache.Get(hash); ok { return cached.(rlp.RawValue) } - body := GetBodyRLP(bc.chainDb, hash, bc.hc.GetBlockNumber(hash)) + body := GetBodyRLP(bc.db, hash, bc.hc.GetBlockNumber(hash)) if len(body) == 0 { return nil } @@ -519,21 +544,25 @@ func (bc *BlockChain) HasBlock(hash common.Hash, number uint64) bool { if bc.blockCache.Contains(hash) { return true } - ok, _ := bc.chainDb.Has(blockBodyKey(hash, number)) + ok, _ := bc.db.Has(blockBodyKey(hash, number)) return ok } +// HasState checks if state trie is fully present in the database or not. +func (bc *BlockChain) HasState(hash common.Hash) bool { + _, err := bc.stateCache.OpenTrie(hash) + return err == nil +} + // HasBlockAndState checks if a block and associated state trie is fully present // in the database or not, caching it if present. -func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool { +func (bc *BlockChain) HasBlockAndState(hash common.Hash, number uint64) bool { // Check first that the block itself is known - block := bc.GetBlockByHash(hash) + block := bc.GetBlock(hash, number) if block == nil { return false } - // Ensure the associated state is also present - _, err := bc.stateCache.OpenTrie(block.Root()) - return err == nil + return bc.HasState(block.Root()) } // GetBlock retrieves a block from the database by hash and number, @@ -543,7 +572,7 @@ func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block { if block, ok := bc.blockCache.Get(hash); ok { return block.(*types.Block) } - block := GetBlock(bc.chainDb, hash, number) + block := GetBlock(bc.db, hash, number) if block == nil { return nil } @@ -560,13 +589,18 @@ func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block { // GetBlockByNumber retrieves a block from the database by number, caching it // (associated with its hash) if found. func (bc *BlockChain) GetBlockByNumber(number uint64) *types.Block { - hash := GetCanonicalHash(bc.chainDb, number) + hash := GetCanonicalHash(bc.db, number) if hash == (common.Hash{}) { return nil } return bc.GetBlock(hash, number) } +// GetReceiptsByHash retrieves the receipts for all transactions in a given block. +func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + return GetBlockReceipts(bc.db, hash, GetBlockNumber(bc.db, hash)) +} + // GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors. // [deprecated by eth/62] func (bc *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) { @@ -594,6 +628,12 @@ func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types. return uncles } +// TrieNode retrieves a blob of data associated with a trie node (or code hash) +// either from ephemeral in-memory cache, or from persistent storage. +func (bc *BlockChain) TrieNode(hash common.Hash) ([]byte, error) { + return bc.stateCache.TrieDB().Node(hash) +} + // Stop stops the blockchain service. If any imports are currently in progress // it will abort them using the procInterrupt. func (bc *BlockChain) Stop() { @@ -606,6 +646,33 @@ func (bc *BlockChain) Stop() { atomic.StoreInt32(&bc.procInterrupt, 1) bc.wg.Wait() + + // Ensure the state of a recent block is also stored to disk before exiting. + // It is fine if this state does not exist (fast start/stop cycle), but it is + // advisable to leave an N block gap from the head so 1) a restart loads up + // the last N blocks as sync assistance to remote nodes; 2) a restart during + // a (small) reorg doesn't require deep reprocesses; 3) chain "repair" from + // missing states are constantly tested. + // + // This may be tuned a bit on mainnet if its too annoying to reprocess the last + // N blocks. + if !bc.cacheConfig.Disabled { + triedb := bc.stateCache.TrieDB() + if number := bc.CurrentBlock().NumberU64(); number >= triesInMemory { + recent := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - triesInMemory + 1) + + log.Info("Writing cached state to disk", "block", recent.Number(), "hash", recent.Hash(), "root", recent.Root()) + if err := triedb.Commit(recent.Root(), true); err != nil { + log.Error("Failed to commit recent state trie", "err", err) + } + } + for !bc.triegc.Empty() { + triedb.Dereference(bc.triegc.PopItem().(common.Hash), common.Hash{}) + } + if size := triedb.Size(); size != 0 { + log.Error("Dangling trie nodes after full cleanup") + } + } log.Info("Blockchain manager stopped") } @@ -650,11 +717,11 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { } if bc.currentFastBlock.Hash() == hash { bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash(), bc.currentFastBlock.NumberU64()-1) - WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()) + WriteHeadFastBlockHash(bc.db, bc.currentFastBlock.Hash()) } if bc.currentBlock.Hash() == hash { bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash(), bc.currentBlock.NumberU64()-1) - WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()) + WriteHeadBlockHash(bc.db, bc.currentBlock.Hash()) } } } @@ -677,9 +744,9 @@ func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts ty } // The used gas can be calculated based on previous receipts if j == 0 { - receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed) + receipts[j].GasUsed = receipts[j].CumulativeGasUsed } else { - receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed) + receipts[j].GasUsed = receipts[j].CumulativeGasUsed - receipts[j-1].CumulativeGasUsed } // The derived log fields can simply be set from the block and transaction for k := 0; k < len(receipts[j].Logs); k++ { @@ -713,7 +780,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ stats = struct{ processed, ignored int32 }{} start = time.Now() bytes = 0 - batch = bc.chainDb.NewBatch() + batch = bc.db.NewBatch() ) for i, block := range blockChain { receipts := receiptChain[i] @@ -731,7 +798,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ continue } // Compute all the non-consensus fields of the receipts - SetReceiptsData(bc.config, block, receipts) + SetReceiptsData(bc.chainConfig, block, receipts) // Write all the data out into the database if err := WriteBody(batch, block.Hash(), block.NumberU64(), block.Body()); err != nil { return i, fmt.Errorf("failed to write block body: %v", err) @@ -749,7 +816,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ return 0, err } bytes += batch.ValueSize() - batch = bc.chainDb.NewBatch() + batch.Reset() } } if batch.ValueSize() > 0 { @@ -764,7 +831,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case if bc.GetTd(bc.currentFastBlock.Hash(), bc.currentFastBlock.NumberU64()).Cmp(td) < 0 { - if err := WriteHeadFastBlockHash(bc.chainDb, head.Hash()); err != nil { + if err := WriteHeadFastBlockHash(bc.db, head.Hash()); err != nil { log.Crit("Failed to update head fast block hash", "err", err) } bc.currentFastBlock = head @@ -775,15 +842,33 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ log.Info("Imported new block receipts", "count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)), - "bytes", bytes, "number", head.Number(), "hash", head.Hash(), + "size", common.StorageSize(bytes), "ignored", stats.ignored) return 0, nil } -// WriteBlock writes the block to the chain. -func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { +var lastWrite uint64 + +// WriteBlockWithoutState writes only the block and its metadata to the database, +// but does not write any state. This is used to construct competing side forks +// up to the point where they exceed the canonical total difficulty. +func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { + bc.wg.Add(1) + defer bc.wg.Done() + + if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { + return err + } + if err := WriteBlock(bc.db, block); err != nil { + return err + } + return nil +} + +// WriteBlockWithState writes the block and all associated state to the database. +func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -804,17 +889,69 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R return NonStatTy, err } // Write other block data using a batch. - batch := bc.chainDb.NewBatch() + batch := bc.db.NewBatch() if err := WriteBlock(batch, block); err != nil { return NonStatTy, err } - if _, err := state.CommitTo(batch, bc.config.IsEIP158(block.Number())); err != nil { + root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) + if err != nil { return NonStatTy, err } + triedb := bc.stateCache.TrieDB() + + // If we're running an archive node, always flush + if bc.cacheConfig.Disabled { + if err := triedb.Commit(root, false); err != nil { + return NonStatTy, err + } + } else { + // Full but not archive node, do proper garbage collection + triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive + bc.triegc.Push(root, -float32(block.NumberU64())) + + if current := block.NumberU64(); current > triesInMemory { + // Find the next state trie we need to commit + header := bc.GetHeaderByNumber(current - triesInMemory) + chosen := header.Number.Uint64() + + // Only write to disk if we exceeded our memory allowance *and* also have at + // least a given number of tries gapped. + var ( + size = triedb.Size() + limit = common.StorageSize(bc.cacheConfig.TrieNodeLimit) * 1024 * 1024 + ) + if size > limit || bc.gcproc > bc.cacheConfig.TrieTimeLimit { + // If we're exceeding limits but haven't reached a large enough memory gap, + // warn the user that the system is becoming unstable. + if chosen < lastWrite+triesInMemory { + switch { + case size >= 2*limit: + log.Warn("State memory usage too high, committing", "size", size, "limit", limit, "optimum", float64(chosen-lastWrite)/triesInMemory) + case bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit: + log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", bc.cacheConfig.TrieTimeLimit, "optimum", float64(chosen-lastWrite)/triesInMemory) + } + } + // If optimum or critical limits reached, write to disk + if chosen >= lastWrite+triesInMemory || size >= 2*limit || bc.gcproc >= 2*bc.cacheConfig.TrieTimeLimit { + triedb.Commit(header.Root, true) + lastWrite = chosen + bc.gcproc = 0 + } + } + // Garbage collect anything below our required write retention + for !bc.triegc.Empty() { + root, number := bc.triegc.Pop() + if uint64(-number) > chosen { + bc.triegc.Push(root, number) + break + } + triedb.Dereference(root.(common.Hash), common.Hash{}) + } + } + } if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil { return NonStatTy, err } - // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -835,7 +972,7 @@ func (bc *BlockChain) WriteBlockAndState(block *types.Block, receipts []*types.R return NonStatTy, err } // Write hash preimages - if err := WritePreimages(bc.chainDb, block.NumberU64(), state.Preimages()); err != nil { + if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil { return NonStatTy, err } status = CanonStatTy @@ -927,31 +1064,64 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty if err == nil { err = bc.Validator().ValidateBody(block) } - if err != nil { - if err == ErrKnownBlock { + switch { + case err == ErrKnownBlock: + // Block and state both already known. However if the current block is below + // this number we did a rollback and we should reimport it nonetheless. + if bc.CurrentBlock().NumberU64() >= block.NumberU64() { stats.ignored++ continue } - if err == consensus.ErrFutureBlock { - // Allow up to MaxFuture second in the future blocks. If this limit - // is exceeded the chain is discarded and processed at a later time - // if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) + case err == consensus.ErrFutureBlock: + // Allow up to MaxFuture second in the future blocks. If this limit is exceeded + // the chain is discarded and processed at a later time if given. + max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) + if block.Time().Cmp(max) > 0 { + return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) + } + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue + + case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): + bc.futureBlocks.Add(block.Hash(), block) + stats.queued++ + continue + + case err == consensus.ErrPrunedAncestor: + // Block competing with the canonical chain, store in the db, but don't process + // until the competitor TD goes above the canonical TD + localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64()) + externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) + if localTd.Cmp(externTd) > 0 { + if err = bc.WriteBlockWithoutState(block, externTd); err != nil { + return i, events, coalescedLogs, err } - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ continue } + // Competitor chain beat canonical, gather all blocks from the common ancestor + var winner []*types.Block - if err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()) { - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue + parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + for !bc.HasState(parent.Root()) { + winner = append(winner, parent) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + } + for j := 0; j < len(winner)/2; j++ { + winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] + } + // Import all the pruned blocks to make the state available + bc.chainmu.Unlock() + _, evs, logs, err := bc.insertChain(winner) + bc.chainmu.Lock() + events, coalescedLogs = evs, logs + + if err != nil { + return i, events, coalescedLogs, err } + case err != nil: bc.reportBlock(block, nil, err) return i, events, coalescedLogs, err } @@ -979,8 +1149,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty bc.reportBlock(block, receipts, err) return i, events, coalescedLogs, err } + proctime := time.Since(bstart) + // Write the block to the chain and get the status. - status, err := bc.WriteBlockAndState(block, receipts, state) + status, err := bc.WriteBlockWithState(block, receipts, state) if err != nil { return i, events, coalescedLogs, err } @@ -994,6 +1166,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty events = append(events, ChainEvent{block, block.Hash(), logs}) lastCanon = block + // Only count canonical blocks for GC processing time + bc.gcproc += proctime + case SideStatTy: log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) @@ -1002,11 +1177,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty events = append(events, ChainSideEvent{block}) } stats.processed++ - stats.usedGas += usedGas.Uint64() - stats.report(chain, i) + stats.usedGas += usedGas + stats.report(chain, i, bc.stateCache.TrieDB().Size()) } // Append a single chain head event if we've progressed the chain - if lastCanon != nil && bc.LastBlockHash() == lastCanon.Hash() { + if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } return 0, events, coalescedLogs, nil @@ -1026,7 +1201,7 @@ const statsReportLimit = 8 * time.Second // report prints statistics if some number of blocks have been processed // or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int) { +func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { // Fetch the timings for the batch var ( now = mclock.Now() @@ -1041,7 +1216,7 @@ func (st *insertStats) report(chain []*types.Block, index int) { context := []interface{}{ "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), - "number", end.Number(), "hash", end.Hash(), + "number", end.Number(), "hash", end.Hash(), "cache", cache, } if st.queued > 0 { context = append(context, []interface{}{"queued", st.queued}...) @@ -1077,7 +1252,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { // These logs are later announced as deleted. collectLogs = func(h common.Hash) { // Coalesce logs and set 'Removed'. - receipts := GetBlockReceipts(bc.chainDb, h, bc.hc.GetBlockNumber(h)) + receipts := GetBlockReceipts(bc.db, h, bc.hc.GetBlockNumber(h)) for _, receipt := range receipts { for _, log := range receipt.Logs { del := *log @@ -1140,24 +1315,23 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } else { log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash()) } + // Insert the new chain, taking care of the proper incremental order var addedTxs types.Transactions - // insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly - for _, block := range newChain { + for i := len(newChain) - 1; i >= 0; i-- { // insert the block in the canonical way, re-writing history - bc.insert(block) + bc.insert(newChain[i]) // write lookup entries for hash based transaction/receipt searches - if err := WriteTxLookupEntries(bc.chainDb, block); err != nil { + if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil { return err } - addedTxs = append(addedTxs, block.Transactions()...) + addedTxs = append(addedTxs, newChain[i].Transactions()...) } - // calculate the difference between deleted and added transactions diff := types.TxDifference(deletedTxs, addedTxs) // When transactions get deleted from the database that means the // receipts that were created in the fork must also be deleted for _, tx := range diff { - DeleteTxLookupEntry(bc.chainDb, tx.Hash()) + DeleteTxLookupEntry(bc.db, tx.Hash()) } if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) @@ -1196,10 +1370,11 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { } func (bc *BlockChain) update() { - futureTimer := time.Tick(5 * time.Second) + futureTimer := time.NewTicker(5 * time.Second) + defer futureTimer.Stop() for { select { - case <-futureTimer: + case <-futureTimer.C: bc.procFutureBlocks() case <-bc.quit: return @@ -1248,7 +1423,7 @@ Hash: 0x%x Error: %v ############################## -`, bc.config, block.Number(), block.Hash(), receiptString, err)) +`, bc.chainConfig, block.Number(), block.Hash(), receiptString, err)) } // InsertHeaderChain attempts to insert the given header chain in to the local @@ -1355,7 +1530,7 @@ func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header { } // Config retrieves the blockchain's chain configuration. -func (bc *BlockChain) Config() *params.ChainConfig { return bc.config } +func (bc *BlockChain) Config() *params.ChainConfig { return bc.chainConfig } // Engine retrieves the blockchain's consensus engine. func (bc *BlockChain) Engine() consensus.Engine { return bc.engine } |