From 59d1677e97c4220b7fc25c38f831fb1dc807f805 Mon Sep 17 00:00:00 2001 From: Bojie Wu Date: Tue, 9 Oct 2018 13:28:45 +0800 Subject: app: correct process pending block logic --- core/blockchain.go | 237 +++++++++++++++++++++++++---------------------------- 1 file changed, 111 insertions(+), 126 deletions(-) (limited to 'core/blockchain.go') diff --git a/core/blockchain.go b/core/blockchain.go index 5393171f6..43e528a2d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -33,6 +33,7 @@ import ( lru "github.com/hashicorp/golang-lru" "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/common/math" "github.com/dexon-foundation/dexon/common/mclock" "github.com/dexon-foundation/dexon/common/prque" "github.com/dexon-foundation/dexon/consensus" @@ -1506,28 +1507,13 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } -func (bc *BlockChain) InsertPendingBlocks(chain types.Blocks) (int, error) { - n, events, logs, err := bc.insertPendingBlocks(chain) +func (bc *BlockChain) ProcessPendingBlock(block *types.Block) (int, error) { + n, events, logs, err := bc.processPendingBlock(block) bc.PostChainEvents(events, logs) return n, err } -func (bc *BlockChain) insertPendingBlocks(chain types.Blocks) (int, []interface{}, []*types.Log, error) { - // Sanity check that we have something meaningful to import - if len(chain) == 0 { - return 0, nil, nil, nil - } - // Do a sanity check that the provided chain is actually ordered and linked - for i := 1; i < len(chain); i++ { - if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { - // Chain broke ancestry, log a message (programming error) and skip insertion - log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), - "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) - - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), - chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) - } - } +func (bc *BlockChain) processPendingBlock(block *types.Block) (int, []interface{}, []*types.Log, error) { // Pre-checks passed, start the full block imports bc.wg.Add(1) defer bc.wg.Done() @@ -1540,147 +1526,146 @@ func (bc *BlockChain) insertPendingBlocks(chain types.Blocks) (int, []interface{ // acquiring. var ( stats = insertStats{startTime: mclock.Now()} - events = make([]interface{}, 0, len(chain)) + events = make([]interface{}, 0, 2) lastCanon *types.Block coalescedLogs []*types.Log ) // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, block.Number()), []*types.Block{block}) - // Iterate over the blocks and insert when the verifier permits - for i, block := range chain { - if atomic.LoadInt32(&bc.procInterrupt) == 1 { - log.Debug("Premature abort during blocks processing") - break - } - bstart := time.Now() + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + return 0, nil, nil, fmt.Errorf("interrupt") + } + bstart := time.Now() - currentBlock := bc.CurrentBlock() - if block.Header().WitnessHeight > currentBlock.NumberU64() && block.Header().WitnessHeight != 0 { - if bc.pendingBlocks[block.Header().WitnessHeight].block.Root() != block.Header().WitnessRoot { - return i, nil, nil, fmt.Errorf("invalid witness root %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.Root().String(), block.Header().WitnessRoot.String()) - } + currentBlock := bc.CurrentBlock() + if block.Header().WitnessHeight > currentBlock.NumberU64() && block.Header().WitnessHeight != 0 { + if bc.pendingBlocks[block.Header().WitnessHeight].block.Root() != block.Header().WitnessRoot { + return 0, nil, nil, fmt.Errorf("invalid witness root %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.Root().String(), block.Header().WitnessRoot.String()) + } - if bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash() != block.Header().WitnessReceiptHash { - return i, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash().String(), block.Header().WitnessReceiptHash.String()) - } + if bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash() != block.Header().WitnessReceiptHash { + return 0, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash().String(), block.Header().WitnessReceiptHash.String()) } + } - var parentBlock *types.Block - var pendingState *state.StateDB - var err error - parent, exist := bc.pendingBlocks[block.NumberU64()-1] - if !exist { - parentBlock = currentBlock - if parentBlock.NumberU64() != block.NumberU64()-1 { - return i, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) - } - } else { - parentBlock = parent.block + var parentBlock *types.Block + var pendingState *state.StateDB + var err error + parent, exist := bc.pendingBlocks[block.NumberU64()-1] + if !exist { + parentBlock = currentBlock + if parentBlock.NumberU64() != block.NumberU64()-1 { + return 0, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } - block.RawHeader().ParentHash = parentBlock.Hash() - pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + } else { + parentBlock = parent.block + } + block.RawHeader().ParentHash = parentBlock.Hash() + pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + if err != nil { + return 0, nil, nil, err + } + + var ( + receipts types.Receipts + usedGas = new(uint64) + header = block.Header() + gp = new(GasPool).AddGas(math.MaxUint64) + ) + // Iterate over and process the individual transactions + for i, tx := range block.Transactions() { + pendingState.Prepare(tx.Hash(), block.Hash(), i) + receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) if err != nil { - return i, events, coalescedLogs, err + return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) } + receipts = append(receipts, receipt) + log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) + } + // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) + header.GasUsed = *usedGas + newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) + if err != nil { + return 0, nil, nil, fmt.Errorf("finalize error: %v", err) + } - var ( - receipts types.Receipts - usedGas = new(uint64) - header = block.Header() - allLogs []*types.Log - gp = new(GasPool).AddGas(block.GasLimit()) - ) - // Iterate over and process the individual transactions - for i, tx := range block.Transactions() { - pendingState.Prepare(tx.Hash(), block.Hash(), i) - receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) - if err != nil { - return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) - } - receipts = append(receipts, receipt) - allLogs = append(allLogs, receipt.Logs...) - log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) - } - // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - header.GasUsed = *usedGas - newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) - if err != nil { - return i, events, coalescedLogs, fmt.Errorf("finalize error: %v", err) + // Validate the state using the default validator + err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) + if err != nil { + bc.reportBlock(block, receipts, err) + return 0, nil, nil, fmt.Errorf("valiadte state error: %v", err) + } + proctime := time.Since(bstart) + + // commit state to refresh stateCache + root, err := pendingState.Commit(true) + if err != nil { + return 0, nil, nil, fmt.Errorf("pendingState commit error: %v", err) + } + log.Info("Commit pending root", "hash", root) + + // add into pending blocks + bc.pendingBlocks[block.NumberU64()] = struct { + block *types.Block + receipts types.Receipts + }{block: newPendingBlock, receipts: receipts} + + // start insert available pending blocks into db + for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= block.Header().WitnessHeight; pendingHeight++ { + pendingIns, exist := bc.pendingBlocks[pendingHeight] + if !exist { + log.Error("Block has already inserted", "height", pendingHeight) + continue } - // Validate the state using the default validator - err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) + s, err := state.New(pendingIns.block.Root(), bc.stateCache) if err != nil { - bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, fmt.Errorf("valiadte state error: %v", err) + return 0, events, coalescedLogs, err } - proctime := time.Since(bstart) - // commit state to refresh stateCache - root, err := pendingState.Commit(true) + // Write the block to the chain and get the status. + log.Debug("Insert pending block", "height", pendingHeight) + status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s) if err != nil { - return i, nil, nil, fmt.Errorf("pendingState commit error: %v", err) + return 0, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) } - log.Info("Commit pending root", "hash", root) - // add into pending blocks - bc.pendingBlocks[block.NumberU64()] = struct { - block *types.Block - receipts types.Receipts - }{block: newPendingBlock, receipts: receipts} - - // start insert available pending blocks into db - for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= block.Header().WitnessHeight; pendingHeight++ { - confirmedBlock, exist := bc.pendingBlocks[pendingHeight] - if !exist { - log.Error("Block has already inserted", "height", pendingHeight) - continue - } - - s, err := state.New(confirmedBlock.block.Root(), bc.stateCache) - if err != nil { - return i, events, coalescedLogs, err - } + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", pendingIns.block.Number(), "hash", pendingIns.block.Hash(), "uncles", len(pendingIns.block.Uncles()), + "txs", len(pendingIns.block.Transactions()), "gas", pendingIns.block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) - // Write the block to the chain and get the status. - log.Debug("Insert pending block", "height", pendingHeight) - status, err := bc.WriteBlockWithState(confirmedBlock.block, confirmedBlock.receipts, s) - if err != nil { - return i, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + var allLogs []*types.Log + for _, r := range pendingIns.receipts { + allLogs = append(allLogs, r.Logs...) } + coalescedLogs = append(coalescedLogs, allLogs...) + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainEvent{pendingIns.block, pendingIns.block.Hash(), allLogs}) + lastCanon = pendingIns.block - switch status { - case CanonStatTy: - log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), - "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) - - coalescedLogs = append(coalescedLogs, allLogs...) - blockInsertTimer.UpdateSince(bstart) - events = append(events, ChainEvent{confirmedBlock.block, confirmedBlock.block.Hash(), allLogs}) - lastCanon = confirmedBlock.block - - // Only count canonical blocks for GC processing time - bc.gcproc += proctime - - case SideStatTy: - return i, nil, nil, fmt.Errorf("insert pending block and fork found") - } + // Only count canonical blocks for GC processing time + bc.gcproc += proctime - delete(bc.pendingBlocks, pendingHeight) + case SideStatTy: + return 0, nil, nil, fmt.Errorf("insert pending block and fork found") + } + delete(bc.pendingBlocks, pendingHeight) - stats.processed++ - stats.usedGas += *usedGas + stats.processed++ + stats.usedGas += pendingIns.block.GasUsed() - cache, _ := bc.stateCache.TrieDB().Size() - stats.report(chain, i, cache) - } + cache, _ := bc.stateCache.TrieDB().Size() + stats.report([]*types.Block{pendingIns.block}, 0, cache) } // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { events = append(events, ChainHeadEvent{lastCanon}) } + return 0, events, coalescedLogs, nil } -- cgit