From fe25828b2e96e071e41cb4f04bc5302fde435cf2 Mon Sep 17 00:00:00 2001 From: Bojie Wu Date: Tue, 9 Oct 2018 13:28:45 +0800 Subject: app: using lock correctly to use map safely --- core/blockchain.go | 142 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 86 insertions(+), 56 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 8e7b1b7b5..af975f81a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -142,14 +142,16 @@ type BlockChain struct { badBlocks *lru.Cache // Bad block cache shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. - confirmedBlocks map[coreCommon.Hash]*blockInfo - addressNonce map[common.Address]uint64 - addressCost map[common.Address]*big.Int - addressCounter map[common.Address]uint64 - chainLastHeight map[uint32]uint64 - - pendingBlockMu *sync.Mutex - pendingBlocks map[uint64]struct { + confirmedBlocksMu sync.RWMutex + confirmedBlocks map[coreCommon.Hash]*blockInfo + addressNonce map[common.Address]uint64 + addressCost map[common.Address]*big.Int + addressCounter map[common.Address]uint64 + chainLastHeight map[uint32]uint64 + + pendingBlockMu sync.RWMutex + lastPendingHeight uint64 + pendingBlocks map[uint64]struct { block *types.Block receipts types.Receipts } @@ -174,27 +176,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par badBlocks, _ := lru.New(badBlockLimit) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(nil), - stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), - quit: make(chan struct{}), - shouldPreserve: shouldPreserve, - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - receiptsCache: receiptsCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, - confirmedBlocks: make(map[coreCommon.Hash]*blockInfo), + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(nil), + stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit), + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + receiptsCache: receiptsCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, pendingBlocks: make(map[uint64]struct { block *types.Block receipts types.Receipts }), - pendingBlockMu: &sync.Mutex{}, + confirmedBlocks: make(map[coreCommon.Hash]*blockInfo), addressNonce: make(map[common.Address]uint64), addressCost: make(map[common.Address]*big.Int), addressCounter: make(map[common.Address]uint64), @@ -248,6 +248,9 @@ type blockInfo struct { } func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error { + bc.confirmedBlocksMu.Lock() + defer bc.confirmedBlocksMu.Unlock() + var transactions types.Transactions err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) if err != nil { @@ -300,20 +303,32 @@ func (bc *BlockChain) RemoveConfirmedBlock(hash coreCommon.Hash) { } func (bc *BlockChain) GetConfirmedBlockByHash(hash coreCommon.Hash) *coreTypes.Block { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + return bc.confirmedBlocks[hash].block } func (bc *BlockChain) GetLastNonceInConfirmedBlocks(address common.Address) (uint64, bool) { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + nonce, exist := bc.addressNonce[address] return nonce, exist } func (bc *BlockChain) GetCostInConfirmedBlocks(address common.Address) (*big.Int, bool) { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + cost, exist := bc.addressCost[address] return cost, exist } func (bc *BlockChain) GetChainLastConfirmedHeight(chainID uint32) uint64 { + bc.confirmedBlocksMu.RLock() + defer bc.confirmedBlocksMu.RUnlock() + return bc.chainLastHeight[chainID] } @@ -1493,20 +1508,13 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i return 0, nil, nil, nil } -func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, error) { +func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) { n, events, logs, err := bc.processPendingBlock(block, witness) bc.PostChainEvents(events, logs) return n, err } -func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, []interface{}, []*types.Log, error) { - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - defer bc.wg.Done() - - bc.chainmu.Lock() - defer bc.chainmu.Unlock() - +func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) { // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -1528,19 +1536,19 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") - return 0, nil, nil, fmt.Errorf("interrupt") + return nil, nil, nil, fmt.Errorf("interrupt") } bstart := time.Now() currentBlock := bc.CurrentBlock() if witness.Height > currentBlock.NumberU64() && witness.Height != 0 { if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root { - return 0, nil, nil, fmt.Errorf("invalid witness root %s vs %s", + return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s", bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String()) } if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash { - return 0, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", + return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s", bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String()) } } @@ -1552,7 +1560,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes if !exist { parentBlock = currentBlock if parentBlock.NumberU64() != block.NumberU64()-1 { - return 0, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) + return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1) } } else { parentBlock = parent.block @@ -1560,7 +1568,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes block.RawHeader().ParentHash = parentBlock.Hash() pendingState, err = state.New(parentBlock.Root(), bc.stateCache) if err != nil { - return 0, nil, nil, err + return nil, nil, nil, err } var ( @@ -1574,7 +1582,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes pendingState.Prepare(tx.Hash(), block.Hash(), i) receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig) if err != nil { - return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) + return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce()) } receipts = append(receipts, receipt) log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) @@ -1583,28 +1591,25 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes header.GasUsed = *usedGas newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts) if err != nil { - return 0, nil, nil, fmt.Errorf("finalize error: %v", err) + return nil, nil, nil, fmt.Errorf("finalize error: %v", err) } // Validate the state using the default validator err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas) if err != nil { bc.reportBlock(block, receipts, err) - return 0, nil, nil, fmt.Errorf("valiadte state error: %v", err) + return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err) } proctime := time.Since(bstart) // commit state to refresh stateCache _, err = pendingState.Commit(true) if err != nil { - return 0, nil, nil, fmt.Errorf("pendingState commit error: %v", err) + return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err) } // add into pending blocks - bc.pendingBlocks[block.NumberU64()] = struct { - block *types.Block - receipts types.Receipts - }{block: newPendingBlock, receipts: receipts} + bc.addPendingBlock(newPendingBlock, receipts) // start insert available pending blocks into db for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ { @@ -1616,13 +1621,13 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes s, err := state.New(pendingIns.block.Root(), bc.stateCache) if err != nil { - return 0, events, coalescedLogs, err + return nil, events, coalescedLogs, err } // Write the block to the chain and get the status. status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s) if err != nil { - return 0, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) + return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err) } switch status { @@ -1643,11 +1648,9 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes bc.gcproc += proctime case SideStatTy: - return 0, nil, nil, fmt.Errorf("insert pending block and fork found") + return nil, nil, nil, fmt.Errorf("insert pending block and fork found") } - bc.pendingBlockMu.Lock() - delete(bc.pendingBlocks, pendingHeight) - bc.pendingBlockMu.Unlock() + bc.removePendingBlock(pendingHeight) stats.processed++ stats.usedGas += pendingIns.block.GasUsed() @@ -1660,13 +1663,40 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, nil + root := newPendingBlock.Root() + return &root, events, coalescedLogs, nil +} + +func (bc *BlockChain) removePendingBlock(height uint64) { + bc.pendingBlockMu.Lock() + defer bc.pendingBlockMu.Unlock() + + delete(bc.pendingBlocks, height) } -func (bc *BlockChain) GetPendingBlockByHeight(height uint64) *types.Block { +func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts) { bc.pendingBlockMu.Lock() defer bc.pendingBlockMu.Unlock() - return bc.pendingBlocks[height].block + + bc.pendingBlocks[block.NumberU64()] = struct { + block *types.Block + receipts types.Receipts + }{block: block, receipts: receipts} + bc.lastPendingHeight = block.NumberU64() +} + +func (bc *BlockChain) GetLastPendingHeight() uint64 { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.lastPendingHeight +} + +func (bc *BlockChain) GetLastPendingBlock() *types.Block { + bc.pendingBlockMu.RLock() + defer bc.pendingBlockMu.RUnlock() + + return bc.pendingBlocks[bc.lastPendingHeight].block } // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them -- cgit