aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorBojie Wu <bojie@dexon.org>2018-10-09 13:28:45 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-12 12:19:09 +0800
commitfe25828b2e96e071e41cb4f04bc5302fde435cf2 (patch)
treeeddc866e8b044f879a8137f56585e9d5e665c66c /core
parent886d114e4f10d9737367c1e4f3e91b50e3b310e7 (diff)
downloaddexon-fe25828b2e96e071e41cb4f04bc5302fde435cf2.tar.gz
dexon-fe25828b2e96e071e41cb4f04bc5302fde435cf2.tar.zst
dexon-fe25828b2e96e071e41cb4f04bc5302fde435cf2.zip
app: using lock correctly to use map safely
Diffstat (limited to 'core')
-rw-r--r--core/blockchain.go142
1 files changed, 86 insertions, 56 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 8e7b1b7b5..af975f81a 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -142,14 +142,16 @@ type BlockChain struct {
badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
- confirmedBlocks map[coreCommon.Hash]*blockInfo
- addressNonce map[common.Address]uint64
- addressCost map[common.Address]*big.Int
- addressCounter map[common.Address]uint64
- chainLastHeight map[uint32]uint64
-
- pendingBlockMu *sync.Mutex
- pendingBlocks map[uint64]struct {
+ confirmedBlocksMu sync.RWMutex
+ confirmedBlocks map[coreCommon.Hash]*blockInfo
+ addressNonce map[common.Address]uint64
+ addressCost map[common.Address]*big.Int
+ addressCounter map[common.Address]uint64
+ chainLastHeight map[uint32]uint64
+
+ pendingBlockMu sync.RWMutex
+ lastPendingHeight uint64
+ pendingBlocks map[uint64]struct {
block *types.Block
receipts types.Receipts
}
@@ -174,27 +176,25 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
badBlocks, _ := lru.New(badBlockLimit)
bc := &BlockChain{
- chainConfig: chainConfig,
- cacheConfig: cacheConfig,
- db: db,
- triegc: prque.New(nil),
- stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
- quit: make(chan struct{}),
- shouldPreserve: shouldPreserve,
- bodyCache: bodyCache,
- bodyRLPCache: bodyRLPCache,
- receiptsCache: receiptsCache,
- blockCache: blockCache,
- futureBlocks: futureBlocks,
- engine: engine,
- vmConfig: vmConfig,
- badBlocks: badBlocks,
- confirmedBlocks: make(map[coreCommon.Hash]*blockInfo),
+ chainConfig: chainConfig,
+ cacheConfig: cacheConfig,
+ db: db,
+ triegc: prque.New(nil),
+ stateCache: state.NewDatabaseWithCache(db, cacheConfig.TrieCleanLimit),
+ quit: make(chan struct{}),
+ bodyCache: bodyCache,
+ bodyRLPCache: bodyRLPCache,
+ receiptsCache: receiptsCache,
+ blockCache: blockCache,
+ futureBlocks: futureBlocks,
+ engine: engine,
+ vmConfig: vmConfig,
+ badBlocks: badBlocks,
pendingBlocks: make(map[uint64]struct {
block *types.Block
receipts types.Receipts
}),
- pendingBlockMu: &sync.Mutex{},
+ confirmedBlocks: make(map[coreCommon.Hash]*blockInfo),
addressNonce: make(map[common.Address]uint64),
addressCost: make(map[common.Address]*big.Int),
addressCounter: make(map[common.Address]uint64),
@@ -248,6 +248,9 @@ type blockInfo struct {
}
func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error {
+ bc.confirmedBlocksMu.Lock()
+ defer bc.confirmedBlocksMu.Unlock()
+
var transactions types.Transactions
err := rlp.Decode(bytes.NewReader(block.Payload), &transactions)
if err != nil {
@@ -300,20 +303,32 @@ func (bc *BlockChain) RemoveConfirmedBlock(hash coreCommon.Hash) {
}
func (bc *BlockChain) GetConfirmedBlockByHash(hash coreCommon.Hash) *coreTypes.Block {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
return bc.confirmedBlocks[hash].block
}
func (bc *BlockChain) GetLastNonceInConfirmedBlocks(address common.Address) (uint64, bool) {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
nonce, exist := bc.addressNonce[address]
return nonce, exist
}
func (bc *BlockChain) GetCostInConfirmedBlocks(address common.Address) (*big.Int, bool) {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
cost, exist := bc.addressCost[address]
return cost, exist
}
func (bc *BlockChain) GetChainLastConfirmedHeight(chainID uint32) uint64 {
+ bc.confirmedBlocksMu.RLock()
+ defer bc.confirmedBlocksMu.RUnlock()
+
return bc.chainLastHeight[chainID]
}
@@ -1493,20 +1508,13 @@ func (bc *BlockChain) insertSidechain(block *types.Block, it *insertIterator) (i
return 0, nil, nil, nil
}
-func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, error) {
+func (bc *BlockChain) ProcessPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, error) {
n, events, logs, err := bc.processPendingBlock(block, witness)
bc.PostChainEvents(events, logs)
return n, err
}
-func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (int, []interface{}, []*types.Log, error) {
- // Pre-checks passed, start the full block imports
- bc.wg.Add(1)
- defer bc.wg.Done()
-
- bc.chainmu.Lock()
- defer bc.chainmu.Unlock()
-
+func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes.Witness) (*common.Hash, []interface{}, []*types.Log, error) {
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
@@ -1528,19 +1536,19 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
log.Debug("Premature abort during blocks processing")
- return 0, nil, nil, fmt.Errorf("interrupt")
+ return nil, nil, nil, fmt.Errorf("interrupt")
}
bstart := time.Now()
currentBlock := bc.CurrentBlock()
if witness.Height > currentBlock.NumberU64() && witness.Height != 0 {
if bc.pendingBlocks[witness.Height].block.Root() != witnessData.Root {
- return 0, nil, nil, fmt.Errorf("invalid witness root %s vs %s",
+ return nil, nil, nil, fmt.Errorf("invalid witness root %s vs %s",
bc.pendingBlocks[witness.Height].block.Root().String(), witnessData.Root.String())
}
if bc.pendingBlocks[witness.Height].block.ReceiptHash() != witnessData.ReceiptHash {
- return 0, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s",
+ return nil, nil, nil, fmt.Errorf("invalid witness receipt hash %s vs %s",
bc.pendingBlocks[witness.Height].block.ReceiptHash().String(), witnessData.ReceiptHash.String())
}
}
@@ -1552,7 +1560,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
if !exist {
parentBlock = currentBlock
if parentBlock.NumberU64() != block.NumberU64()-1 {
- return 0, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
+ return nil, nil, nil, fmt.Errorf("parent block %d not exist", block.NumberU64()-1)
}
} else {
parentBlock = parent.block
@@ -1560,7 +1568,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
block.RawHeader().ParentHash = parentBlock.Hash()
pendingState, err = state.New(parentBlock.Root(), bc.stateCache)
if err != nil {
- return 0, nil, nil, err
+ return nil, nil, nil, err
}
var (
@@ -1574,7 +1582,7 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
pendingState.Prepare(tx.Hash(), block.Hash(), i)
receipt, _, err := ApplyTransaction(bc.chainConfig, bc, nil, gp, pendingState, header, tx, usedGas, bc.vmConfig)
if err != nil {
- return i, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce())
+ return nil, nil, nil, fmt.Errorf("apply transaction error: %v %d", err, tx.Nonce())
}
receipts = append(receipts, receipt)
log.Debug("Apply transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value())
@@ -1583,28 +1591,25 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
header.GasUsed = *usedGas
newPendingBlock, err := bc.engine.Finalize(bc, header, pendingState, block.Transactions(), block.Uncles(), receipts)
if err != nil {
- return 0, nil, nil, fmt.Errorf("finalize error: %v", err)
+ return nil, nil, nil, fmt.Errorf("finalize error: %v", err)
}
// Validate the state using the default validator
err = bc.Validator().ValidateState(block, nil, pendingState, receipts, *usedGas)
if err != nil {
bc.reportBlock(block, receipts, err)
- return 0, nil, nil, fmt.Errorf("valiadte state error: %v", err)
+ return nil, nil, nil, fmt.Errorf("valiadte state error: %v", err)
}
proctime := time.Since(bstart)
// commit state to refresh stateCache
_, err = pendingState.Commit(true)
if err != nil {
- return 0, nil, nil, fmt.Errorf("pendingState commit error: %v", err)
+ return nil, nil, nil, fmt.Errorf("pendingState commit error: %v", err)
}
// add into pending blocks
- bc.pendingBlocks[block.NumberU64()] = struct {
- block *types.Block
- receipts types.Receipts
- }{block: newPendingBlock, receipts: receipts}
+ bc.addPendingBlock(newPendingBlock, receipts)
// start insert available pending blocks into db
for pendingHeight := bc.CurrentBlock().NumberU64() + 1; pendingHeight <= witness.Height; pendingHeight++ {
@@ -1616,13 +1621,13 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
s, err := state.New(pendingIns.block.Root(), bc.stateCache)
if err != nil {
- return 0, events, coalescedLogs, err
+ return nil, events, coalescedLogs, err
}
// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(pendingIns.block, pendingIns.receipts, s)
if err != nil {
- return 0, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
+ return nil, events, coalescedLogs, fmt.Errorf("WriteBlockWithState error: %v", err)
}
switch status {
@@ -1643,11 +1648,9 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
bc.gcproc += proctime
case SideStatTy:
- return 0, nil, nil, fmt.Errorf("insert pending block and fork found")
+ return nil, nil, nil, fmt.Errorf("insert pending block and fork found")
}
- bc.pendingBlockMu.Lock()
- delete(bc.pendingBlocks, pendingHeight)
- bc.pendingBlockMu.Unlock()
+ bc.removePendingBlock(pendingHeight)
stats.processed++
stats.usedGas += pendingIns.block.GasUsed()
@@ -1660,13 +1663,40 @@ func (bc *BlockChain) processPendingBlock(block *types.Block, witness *coreTypes
events = append(events, ChainHeadEvent{lastCanon})
}
- return 0, events, coalescedLogs, nil
+ root := newPendingBlock.Root()
+ return &root, events, coalescedLogs, nil
+}
+
+func (bc *BlockChain) removePendingBlock(height uint64) {
+ bc.pendingBlockMu.Lock()
+ defer bc.pendingBlockMu.Unlock()
+
+ delete(bc.pendingBlocks, height)
}
-func (bc *BlockChain) GetPendingBlockByHeight(height uint64) *types.Block {
+func (bc *BlockChain) addPendingBlock(block *types.Block, receipts types.Receipts) {
bc.pendingBlockMu.Lock()
defer bc.pendingBlockMu.Unlock()
- return bc.pendingBlocks[height].block
+
+ bc.pendingBlocks[block.NumberU64()] = struct {
+ block *types.Block
+ receipts types.Receipts
+ }{block: block, receipts: receipts}
+ bc.lastPendingHeight = block.NumberU64()
+}
+
+func (bc *BlockChain) GetLastPendingHeight() uint64 {
+ bc.pendingBlockMu.RLock()
+ defer bc.pendingBlockMu.RUnlock()
+
+ return bc.lastPendingHeight
+}
+
+func (bc *BlockChain) GetLastPendingBlock() *types.Block {
+ bc.pendingBlockMu.RLock()
+ defer bc.pendingBlockMu.RUnlock()
+
+ return bc.pendingBlocks[bc.lastPendingHeight].block
}
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them