From 7c1386d928acd050684f435e49149696bcc0a637 Mon Sep 17 00:00:00 2001 From: Bojie Wu Date: Tue, 9 Oct 2018 13:28:45 +0800 Subject: app: add cache mechanism to increase performance --- dex/app.go | 166 +++++++++++++++++++++++-------------------------------------- 1 file changed, 63 insertions(+), 103 deletions(-) (limited to 'dex/app.go') diff --git a/dex/app.go b/dex/app.go index 867c10361..7730ba22f 100644 --- a/dex/app.go +++ b/dex/app.go @@ -47,12 +47,13 @@ type DexconApp struct { vmConfig vm.Config notifyChan map[uint64]*notify - mutex *sync.Mutex + notifyMu *sync.Mutex lastPendingHeight uint64 insertMu sync.Mutex - chainHeight map[uint32]uint64 + chainLocksInitMu *sync.Mutex + chainLocks map[uint32]*sync.Mutex } type notify struct { @@ -67,21 +68,22 @@ type witnessData struct { func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconGovernance, chainDB ethdb.Database, config *Config, vmConfig vm.Config) *DexconApp { return &DexconApp{ - txPool: txPool, - blockchain: blockchain, - gov: gov, - chainDB: chainDB, - config: config, - vmConfig: vmConfig, - notifyChan: make(map[uint64]*notify), - mutex: &sync.Mutex{}, - chainHeight: make(map[uint32]uint64), + txPool: txPool, + blockchain: blockchain, + gov: gov, + chainDB: chainDB, + config: config, + vmConfig: vmConfig, + notifyChan: make(map[uint64]*notify), + notifyMu: &sync.Mutex{}, + chainLocksInitMu: &sync.Mutex{}, + chainLocks: make(map[uint32]*sync.Mutex), } } func (d *DexconApp) addNotify(height uint64) <-chan uint64 { - d.mutex.Lock() - defer d.mutex.Unlock() + d.notifyMu.Lock() + defer d.notifyMu.Unlock() result := make(chan uint64) if n, exist := d.notifyChan[height]; exist { n.results = append(n.results, result) @@ -93,8 +95,8 @@ func (d *DexconApp) addNotify(height uint64) <-chan uint64 { } func (d *DexconApp) notify(height uint64) { - d.mutex.Lock() - defer d.mutex.Unlock() + d.notifyMu.Lock() + defer d.notifyMu.Unlock() for h, n := range d.notifyChan { if height >= h { for _, ch := range n.results { @@ -113,21 +115,12 @@ func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.I // PreparePayload is called when consensus core is preparing payload for block. func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, err error) { - d.insertMu.Lock() - defer d.insertMu.Unlock() + d.chainLock(position.ChainID) + defer d.chainUnlock(position.ChainID) if position.Height != 0 { - chainLastHeight, empty := d.blockchain.GetChainLastConfirmedHeight(position.ChainID) - if empty { - var exist bool - chainLastHeight, exist = d.chainHeight[position.ChainID] - if !exist { - log.Error("Something wrong") - return nil, fmt.Errorf("something wrong") - } - } - // check if chain block height is sequential + chainLastHeight := d.blockchain.GetChainLastConfirmedHeight(position.ChainID) if chainLastHeight != position.Height-1 { log.Error("Check confirmed block height fail", "chain", position.ChainID, "height", position.Height-1, "cache height", chainLastHeight) return nil, fmt.Errorf("check confirmed block height fail") @@ -170,11 +163,8 @@ addressMap: var expectNonce uint64 // get last nonce from confirmed blocks - lastConfirmedNonce, empty, err := d.blockchain.GetLastNonceFromConfirmedBlocks(position.ChainID, address) - if err != nil { - log.Error("Get last nonce from confirmed blocks", "error", err) - return nil, fmt.Errorf("get last nonce from confirmed blocks error: %v", err) - } else if empty { + lastConfirmedNonce, exist := d.blockchain.GetLastNonceInConfirmedBlocks(address) + if !exist { // get expect nonce from latest state when confirmed block is empty expectNonce = latestState.GetNonce(address) } else { @@ -182,20 +172,14 @@ addressMap: } if expectNonce != txs[0].Nonce() { - log.Warn("Nonce check error", "expect", expectNonce, "nonce", txs[0].Nonce()) + log.Debug("Nonce check error", "expect", expectNonce, "nonce", txs[0].Nonce()) continue } balance := latestState.GetBalance(address) - confirmedTxs, err := d.blockchain.GetConfirmedTxsByAddress(position.ChainID, address) - if err != nil { - return nil, fmt.Errorf("get confirmed txs error: %v", err) - } - - for _, tx := range confirmedTxs { - maxGasUsed := new(big.Int).Mul(new(big.Int).SetUint64(tx.Gas()), tx.GasPrice()) - balance = new(big.Int).Sub(balance, maxGasUsed) - balance = new(big.Int).Sub(balance, tx.Value()) + cost, exist := d.blockchain.GetCostInConfirmedBlocks(address) + if exist { + balance = new(big.Int).Sub(balance, cost) } for _, tx := range txs { @@ -210,8 +194,7 @@ addressMap: break } - balance = new(big.Int).Sub(balance, maxGasUsed) - balance = new(big.Int).Sub(balance, tx.Value()) + balance = new(big.Int).Sub(balance, tx.Cost()) if balance.Cmp(big.NewInt(0)) < 0 { log.Error("Tx fail", "reason", "not enough balance") break @@ -275,15 +258,13 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta return coreTypes.VerifyInvalidBlock } - log.Info("Ready to verify witness block", "height", block.Witness.Height) - - for i := 0; i < 3 && err != nil; i++ { + for i := 0; i < 6 && err != nil; i++ { // check witness root exist err = nil _, err = d.blockchain.StateAt(witnessData.Root) if err != nil { - log.Warn("Sleep 2 seconds and try again", "error", err) - time.Sleep(2 * time.Second) + log.Debug("Sleep 0.5 seconds and try again", "error", err) + time.Sleep(500 * time.Millisecond) } } if err != nil { @@ -291,23 +272,14 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta return coreTypes.VerifyRetryLater } - d.insertMu.Lock() - defer d.insertMu.Unlock() + d.chainLock(block.Position.ChainID) + defer d.chainUnlock(block.Position.ChainID) if block.Position.Height != 0 { - chainLastHeight, empty := d.blockchain.GetChainLastConfirmedHeight(block.Position.ChainID) - if empty { - var exist bool - chainLastHeight, exist = d.chainHeight[block.Position.ChainID] - if !exist { - log.Error("Something wrong") - return coreTypes.VerifyInvalidBlock - } - } - // check if chain block height is sequential + chainLastHeight := d.blockchain.GetChainLastConfirmedHeight(block.Position.ChainID) if chainLastHeight != block.Position.Height-1 { - log.Error("Check confirmed block height fail", "chain", block.Position.ChainID, "height", block.Position.Height-1) + log.Error("Check confirmed block height fail", "chain", block.Position.ChainID, "height", block.Position.Height-1, "cache height", chainLastHeight) return coreTypes.VerifyRetryLater } } @@ -353,11 +325,8 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta var expectNonce uint64 // get last nonce from confirmed blocks - lastConfirmedNonce, empty, err := d.blockchain.GetLastNonceFromConfirmedBlocks(block.Position.ChainID, address) - if err != nil { - log.Error("Get last nonce from confirmed blocks", "error", err) - return coreTypes.VerifyInvalidBlock - } else if empty { + lastConfirmedNonce, exist := d.blockchain.GetLastNonceInConfirmedBlocks(address) + if !exist { // get expect nonce from latest state when confirmed block is empty expectNonce = latestState.GetNonce(address) } else { @@ -373,37 +342,12 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta // get balance from state addressesBalance := map[common.Address]*big.Int{} for address := range addresses { - addressesBalance[address] = latestState.GetBalance(address) - } - - // replay confirmed block tx to correct balance - confirmedBlocks := d.blockchain.GetConfirmedBlocksByChainID(block.Position.ChainID) - for _, block := range confirmedBlocks { - var txs types.Transactions - err := rlp.Decode(bytes.NewReader(block.Payload), &txs) - if err != nil { - log.Error("Decode confirmed block", "error", err) - return coreTypes.VerifyInvalidBlock - } - - for _, tx := range txs { - msg, err := tx.AsMessage(types.MakeSigner(d.blockchain.Config(), new(big.Int))) - if err != nil { - log.Error("Tx to message", "error", err) - return coreTypes.VerifyInvalidBlock - } - - balance, exist := addressesBalance[msg.From()] - if exist { - maxGasUsed := new(big.Int).Mul(new(big.Int).SetUint64(msg.Gas()), msg.GasPrice()) - balance = new(big.Int).Sub(balance, maxGasUsed) - balance = new(big.Int).Sub(balance, msg.Value()) - if balance.Cmp(big.NewInt(0)) <= 0 { - log.Error("Replay confirmed tx fail", "reason", "not enough balance") - return coreTypes.VerifyInvalidBlock - } - addressesBalance[msg.From()] = balance - } + // replay confirmed block tx to correct balance + cost, exist := d.blockchain.GetCostInConfirmedBlocks(address) + if exist { + addressesBalance[address] = new(big.Int).Sub(latestState.GetBalance(address), cost) + } else { + addressesBalance[address] = latestState.GetBalance(address) } } @@ -428,8 +372,7 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta return coreTypes.VerifyInvalidBlock } - balance = new(big.Int).Sub(balance, maxGasUsed) - balance = new(big.Int).Sub(balance, msg.Value()) + balance = new(big.Int).Sub(balance, tx.Cost()) if balance.Cmp(big.NewInt(0)) < 0 { log.Error("Tx fail", "reason", "not enough balance") return coreTypes.VerifyInvalidBlock @@ -457,6 +400,9 @@ func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.F panic("Can not get confirmed block") } + d.chainLock(block.Position.ChainID) + defer d.chainUnlock(block.Position.ChainID) + var transactions types.Transactions err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) if err != nil { @@ -489,15 +435,14 @@ func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.F } log.Info("Insert pending block success", "height", result.Height) - d.chainHeight[block.Position.ChainID] = block.Position.Height d.blockchain.RemoveConfirmedBlock(blockHash) d.notify(result.Height) } // BlockConfirmed is called when a block is confirmed and added to lattice. func (d *DexconApp) BlockConfirmed(block coreTypes.Block) { - d.insertMu.Lock() - defer d.insertMu.Unlock() + d.chainLock(block.Position.ChainID) + defer d.chainUnlock(block.Position.ChainID) d.blockchain.AddConfirmedBlock(&block) } @@ -525,3 +470,18 @@ func (d *DexconApp) validateNonce(txs types.Transactions) (map[common.Address]ui return addressFirstNonce, nil } + +func (d *DexconApp) chainLock(chainID uint32) { + d.chainLocksInitMu.Lock() + _, exist := d.chainLocks[chainID] + if !exist { + d.chainLocks[chainID] = &sync.Mutex{} + } + d.chainLocksInitMu.Unlock() + + d.chainLocks[chainID].Lock() +} + +func (d *DexconApp) chainUnlock(chainID uint32) { + d.chainLocks[chainID].Unlock() +} -- cgit