From 13506b4e47662dcce303f8845a12048556a22aa3 Mon Sep 17 00:00:00 2001 From: Bojie Wu Date: Tue, 9 Oct 2018 13:28:45 +0800 Subject: app: implement new insert blocks logic --- core/blockchain.go | 195 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 193 insertions(+), 2 deletions(-) (limited to 'core/blockchain.go') diff --git a/core/blockchain.go b/core/blockchain.go index 9775f9e16..87e586d20 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -28,6 +28,8 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/golang-lru" + coreCommon "github.com/dexon-foundation/dexon-consensus-core/common" coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" @@ -47,7 +49,6 @@ import ( "github.com/dexon-foundation/dexon/params" "github.com/dexon-foundation/dexon/rlp" "github.com/dexon-foundation/dexon/trie" - "github.com/hashicorp/golang-lru" ) var ( @@ -144,6 +145,11 @@ type BlockChain struct { confirmedBlockMu sync.Mutex confirmedBlocks map[coreCommon.Hash]*coreTypes.Block chainConfirmedBlocks map[uint32][]*coreTypes.Block + + pendingBlocks map[uint64]struct { + block *types.Block + receipts types.Receipts + } } // NewBlockChain returns a fully initialised block chain using information @@ -183,8 +189,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlocks: badBlocks, confirmedBlocks: make(map[coreCommon.Hash]*coreTypes.Block), chainConfirmedBlocks: make(map[uint32][]*coreTypes.Block), + pendingBlocks: make(map[uint64]struct { + block *types.Block + receipts types.Receipts + }), } - bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) + bc.SetValidator(NewDexonBlockValidator(chainConfig, bc, engine)) bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) var err error @@ -1449,6 +1459,187 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } +func (bc *BlockChain) InsertPendingBlock(chain types.Blocks) (int, error) { + n, events, logs, err := bc.insertPendingBlocks(chain) + bc.PostChainEvents(events, logs) + return n, err +} + +func (bc *BlockChain) insertPendingBlocks(chain types.Blocks) (int, []interface{}, []*types.Log, error) { + // Sanity check that we have something meaningful to import + if len(chain) == 0 { + return 0, nil, nil, nil + } + // Do a sanity check that the provided chain is actually ordered and linked + for i := 1; i < len(chain); i++ { + if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { + // Chain broke ancestry, log a message (programming error) and skip insertion + log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), + "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) + + return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(), + chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) + } + } + // Pre-checks passed, start the full block imports + bc.wg.Add(1) + defer bc.wg.Done() + + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + + // A queued approach to delivering events. This is generally + // faster than direct delivery and requires much less mutex + // acquiring. + var ( + stats = insertStats{startTime: mclock.Now()} + events = make([]interface{}, 0, len(chain)) + lastCanon *types.Block + coalescedLogs []*types.Log + ) + + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) + senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + + // Iterate over the blocks and insert when the verifier permits + for i, block := range chain { + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + break + } + bstart := time.Now() + + currentBlock := bc.CurrentBlock() + if block.Header().WitnessHeight > currentBlock.NumberU64() && block.Header().WitnessHeight != 0 { + if bc.pendingBlocks[block.Header().WitnessHeight].block.Root() != block.Header().WitnessRoot { + return i, nil, nil, fmt.Errorf("invalid witness root %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.Root().String(), block.Header().WitnessRoot.String()) + } + + if bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash() != block.Header().WitnessReceiptHash { + return i, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", bc.pendingBlocks[block.Header().WitnessHeight].block.ReceiptHash().String(), block.Header().WitnessReceiptHash.String()) + } + } + + var parentBlock *types.Block + var pendingState *state.StateDB + var err error + parent, exist := bc.pendingBlocks[block.NumberU64()-1] + if !exist { + parentBlock = currentBlock + if parentBlock.NumberU64() != block.NumberU64()-1 { + return i, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) + } + } else { + parentBlock = parent.block + } + block.RawHeader().ParentHash = parentBlock.Hash() + pendingState, err = state.New(parentBlock.Root(), bc.stateCache) + if err != nil { + return i, events, coalescedLogs, err + } + + var ( + receipts types.Receipts + usedGas = new(uint64) + header = block.Header() + allLogs []*types.Log + gp = new(GasPool).AddGas(block.GasLimit()) + ) + // Iterate over and process the individual transactions + for i, tx := range block.Transactions() { + pendingState.Prepare(tx.Hash(), block.Hash(), i) + receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) + if err != nil { + return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) + } + receipts = append(receipts, receipt) + allLogs = append(allLogs, receipt.Logs...) + log.Debug("apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) + } + // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) + header.GasUsed = *usedGas + newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) + if err != nil { + return i, events, coalescedLogs, fmt.Errorf("finalize error: %v", err) + } + + // Validate the state using the default validator + err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) + if err != nil { + bc.reportBlock(block, receipts, err) + return i, events, coalescedLogs, fmt.Errorf("valiadte state error: %v", err) + } + proctime := time.Since(bstart) + + // commit state to refresh stateCache + _, err = pendingState.Commit(true) + if err != nil { + return i, nil, nil, fmt.Errorf("pendingState commit error: %v", err) + } + + // add into pending blocks + bc.pendingBlocks[block.NumberU64()] = struct { + block *types.Block + receipts types.Receipts + }{block: newPendingBlock, receipts: receipts} + + // start insert available pending blocks into db + for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= block.Header().WitnessHeight; pendingHeight++ { + confirmedBlock, exist := bc.pendingBlocks[pendingHeight] + if !exist { + log.Debug("block has already inserted", "height", pendingHeight) + continue + } + + s, err := state.New(confirmedBlock.block.Root(), bc.stateCache) + if err != nil { + return i, events, coalescedLogs, err + } + + // Write the block to the chain and get the status. + log.Debug("insert pending block", "height", pendingHeight) + status, err := bc.WriteBlockWithState(confirmedBlock.block, confirmedBlock.receipts, s) + if err != nil { + return i, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + } + + switch status { + case CanonStatTy: + log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + + coalescedLogs = append(coalescedLogs, allLogs...) + blockInsertTimer.UpdateSince(bstart) + events = append(events, ChainEvent{confirmedBlock.block, confirmedBlock.block.Hash(), allLogs}) + lastCanon = confirmedBlock.block + + // Only count canonical blocks for GC processing time + bc.gcproc += proctime + + case SideStatTy: + return i, nil, nil, fmt.Errorf("insert pending block and fork found") + } + + delete(bc.pendingBlocks, pendingHeight) + + stats.processed++ + stats.usedGas += *usedGas + + cache, _ := bc.stateCache.TrieDB().Size() + stats.report(chain, i, cache) + } + } + // Append a single chain head event if we've progressed the chain + if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { + events = append(events, ChainHeadEvent{lastCanon}) + } + return 0, events, coalescedLogs, nil +} + +func (bc *BlockChain) GetPendingBlockByHeight(height uint64) *types.Block { + return bc.pendingBlocks[height].block +} + // reorg takes two blocks, an old chain and a new chain and will reconstruct the // blocks and inserts them to be part of the new canonical chain and accumulates // potential missing transactions and post an event about them. -- cgit