From 46100ae76f9fb81aaf267101c3801af1d7adcb88 Mon Sep 17 00:00:00 2001 From: Bojie Wu Date: Tue, 9 Oct 2018 13:28:45 +0800 Subject: app: implement new insert blocks logic --- dex/app.go | 213 +++++++++++++++++++------------------------------------------ 1 file changed, 66 insertions(+), 147 deletions(-) (limited to 'dex') diff --git a/dex/app.go b/dex/app.go index 85d7ea621..9b13c32b8 100644 --- a/dex/app.go +++ b/dex/app.go @@ -28,9 +28,7 @@ import ( coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/math" "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" "github.com/dexon-foundation/dexon/core/vm" @@ -50,10 +48,13 @@ type DexconApp struct { notifyChan map[uint64]*notify mutex *sync.Mutex + + lastHeight uint64 + insertMu sync.Mutex } type notify struct { - results []chan bool + results []chan uint64 } type witnessData struct { @@ -75,10 +76,10 @@ func NewDexconApp(txPool *core.TxPool, blockchain *core.BlockChain, gov *DexconG } } -func (d *DexconApp) addNotify(height uint64) <-chan bool { +func (d *DexconApp) addNotify(height uint64) <-chan uint64 { d.mutex.Lock() defer d.mutex.Unlock() - result := make(chan bool) + result := make(chan uint64) if n, exist := d.notifyChan[height]; exist { n.results = append(n.results, result) } else { @@ -94,11 +95,12 @@ func (d *DexconApp) notify(height uint64) { for h, n := range d.notifyChan { if height >= h { for _, ch := range n.results { - ch <- true + ch <- height } delete(d.notifyChan, h) } } + d.lastHeight = height } func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.Int) bool { @@ -108,52 +110,41 @@ func (d *DexconApp) checkChain(address common.Address, chainSize, chainID *big.I // PreparePayload is called when consensus core is preparing payload for block. func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, err error) { + d.insertMu.Lock() + defer d.insertMu.Unlock() txsMap, err := d.txPool.Pending() if err != nil { return } - currentBlock := d.blockchain.CurrentBlock() - gasLimit := core.CalcGasLimit(currentBlock, d.config.GasFloor, d.config.GasCeil) - gp := new(core.GasPool).AddGas(gasLimit) - stateDB, err := state.New(currentBlock.Root(), state.NewDatabase(d.chainDB)) - if err != nil { - return - } - chainID := new(big.Int).SetUint64(uint64(position.ChainID)) - chainSize := new(big.Int).SetUint64(uint64(d.gov.Configuration(position.Round).NumChains)) + chainSize := new(big.Int).SetUint64(uint64(d.gov.GetNumChains(position.Round))) var allTxs types.Transactions - var totalGasUsed uint64 for addr, txs := range txsMap { // every address's transactions will appear in fixed chain if !d.checkChain(addr, chainSize, chainID) { continue } - undeliveredTxs, err := d.blockchain.GetConfirmedTxsByAddress(position.ChainID, addr) - if err != nil { - return nil, err - } - - for _, tx := range undeliveredTxs { - // confirmed txs must apply successfully - var gasUsed uint64 - gp := new(core.GasPool).AddGas(math.MaxUint64) - _, _, err = core.ApplyTransaction(d.blockchain.Config(), d.blockchain, nil, gp, stateDB, currentBlock.Header(), tx, &gasUsed, d.vmConfig) + var stateDB *state.StateDB + if d.lastHeight > 0 { + stateDB, err = d.blockchain.StateAt(d.blockchain.GetPendingBlockByHeight(d.lastHeight).Root()) if err != nil { - log.Error("apply confirmed transaction", "error", err) - return nil, err + return nil, fmt.Errorf("PreparePayload d.blockchain.StateAt err %v", err) + } + } else { + stateDB, err = d.blockchain.State() + if err != nil { + return nil, fmt.Errorf("PreparePayload d.blockchain.State err %v", err) } } for _, tx := range txs { - // apply transaction to calculate total gas used, validate nonce and check available balance - _, _, err = core.ApplyTransaction(d.blockchain.Config(), d.blockchain, nil, gp, stateDB, currentBlock.Header(), tx, &totalGasUsed, d.vmConfig) - if err != nil || totalGasUsed > gasLimit { - log.Debug("apply transaction fail", "error", err, "totalGasUsed", totalGasUsed, "gasLimit", gasLimit) + if tx.Nonce() != stateDB.GetNonce(addr) { + log.Debug("break transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "expect", stateDB.GetNonce(addr)) break } + log.Debug("receive transaction", "tx.hash", tx.Hash(), "nonce", tx.Nonce(), "amount", tx.Value()) allTxs = append(allTxs, tx) } } @@ -167,134 +158,48 @@ func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, // PrepareWitness will return the witness data no lower than consensusHeight. func (d *DexconApp) PrepareWitness(consensusHeight uint64) (witness coreTypes.Witness, err error) { - var currentBlock *types.Block - currentBlock = d.blockchain.CurrentBlock() - if currentBlock.NumberU64() < consensusHeight { - // wait notification - if <-d.addNotify(consensusHeight) { - currentBlock = d.blockchain.CurrentBlock() - } else { - err = fmt.Errorf("fail to wait notification") - return - } + // TODO(bojie): the witness logic need to correct + var witnessBlock *types.Block + if d.lastHeight == 0 && consensusHeight == 0 { + witnessBlock = d.blockchain.CurrentBlock() + } else if d.lastHeight >= consensusHeight { + witnessBlock = d.blockchain.GetPendingBlockByHeight(d.lastHeight) + } else if h := <-d.addNotify(consensusHeight); h >= consensusHeight { + witnessBlock = d.blockchain.GetPendingBlockByHeight(h) + } else { + log.Error("need pending block") + return witness, fmt.Errorf("need pending block") } witnessData, err := rlp.EncodeToBytes(&witnessData{ - Root: currentBlock.Root(), - TxHash: currentBlock.TxHash(), - ReceiptHash: currentBlock.ReceiptHash(), + Root: witnessBlock.Root(), + TxHash: witnessBlock.TxHash(), + ReceiptHash: witnessBlock.ReceiptHash(), }) if err != nil { return } return coreTypes.Witness{ - Timestamp: time.Unix(currentBlock.Time().Int64(), 0), - Height: currentBlock.NumberU64(), + Timestamp: time.Unix(witnessBlock.Time().Int64(), 0), + Height: witnessBlock.NumberU64(), Data: witnessData, }, nil } // VerifyBlock verifies if the payloads are valid. func (d *DexconApp) VerifyBlock(block *coreTypes.Block) bool { - // decode payload to transactions - var transactions types.Transactions - err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) - if err != nil { - return false - } - - currentBlock := d.blockchain.CurrentBlock() - chainID := new(big.Int).SetUint64(uint64(block.Position.ChainID)) - chainSize := new(big.Int).SetUint64(uint64(d.gov.Configuration(block.Position.Round).NumChains)) - stateDB, err := state.New(currentBlock.Root(), state.NewDatabase(d.chainDB)) - if err != nil { - return false - } - - // verify transactions - addressMap := map[common.Address]interface{}{} - gasLimit := core.CalcGasLimit(currentBlock, d.config.GasFloor, d.config.GasCeil) - gp := new(core.GasPool).AddGas(gasLimit) - var totalGasUsed uint64 - for _, transaction := range transactions { - msg, err := transaction.AsMessage(types.MakeSigner(d.blockchain.Config(), currentBlock.Header().Number)) - if err != nil { - return false - } - - if !d.checkChain(msg.From(), chainSize, chainID) { - log.Error("check chain fail", "from", msg.From().String(), "chainSize", chainSize, "chainID", chainID) - return false - } - - _, exist := addressMap[msg.From()] - if !exist { - txs, err := d.blockchain.GetConfirmedTxsByAddress(block.Position.ChainID, msg.From()) - if err != nil { - log.Error("get confirmed txs by address", "error", err) - return false - } - - gp := new(core.GasPool).AddGas(math.MaxUint64) - for _, tx := range txs { - var gasUsed uint64 - // confirmed txs must apply successfully - _, _, err := core.ApplyTransaction(d.blockchain.Config(), d.blockchain, nil, gp, stateDB, currentBlock.Header(), tx, &gasUsed, d.vmConfig) - if err != nil { - log.Error("apply confirmed transaction", "error", err) - return false - } - } - addressMap[msg.From()] = nil - } - - _, _, err = core.ApplyTransaction(d.blockchain.Config(), d.blockchain, nil, gp, stateDB, currentBlock.Header(), transaction, &totalGasUsed, d.vmConfig) - if err != nil { - log.Error("apply block transaction", "error", err) - return false - } - } - - if totalGasUsed > gasLimit+d.config.GasLimitTolerance { - return false - } - - witnessData := witnessData{} - err = rlp.Decode(bytes.NewReader(block.Witness.Data), &witnessData) - if err != nil { - return false - } - - witnessBlock := d.blockchain.GetBlockByNumber(block.Witness.Height) - if witnessBlock == nil { - return false - } else if witnessBlock.Root() != witnessData.Root { - // invalid state root of witness data - return false - } else if witnessBlock.ReceiptHash() != witnessData.ReceiptHash { - // invalid receipt root of witness data - return false - } else if witnessBlock.TxHash() != witnessData.TxHash { - // invalid tx root of witness data - return false - } - - for _, transaction := range witnessBlock.Transactions() { - tx, _, _, _ := rawdb.ReadTransaction(d.chainDB, transaction.Hash()) - if tx == nil { - return false - } - } - + // TODO(bojie): implement this return true } // BlockDelivered is called when a block is add to the compaction chain. func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.FinalizationResult) { + d.insertMu.Lock() + defer d.insertMu.Unlock() + block := d.blockchain.GetConfirmedBlockByHash(blockHash) if block == nil { - // do something log.Error("can not get confirmed block") return } @@ -306,20 +211,34 @@ func (d *DexconApp) BlockDelivered(blockHash coreCommon.Hash, result coreTypes.F return } - _, err = d.blockchain.InsertChain( - []*types.Block{types.NewBlock(&types.Header{ - ParentHash: d.blockchain.CurrentBlock().Hash(), - Number: new(big.Int).SetUint64(result.Height), - Time: big.NewInt(result.Timestamp.Unix()), - TxHash: types.DeriveSha(transactions), - Coinbase: common.BytesToAddress(block.ProposerID.Hash[:]), - GasLimit: 800000, - }, transactions, nil, nil)}) + var witnessData witnessData + err = rlp.Decode(bytes.NewReader(block.Witness.Data), &witnessData) + if err != nil { + log.Error("witness rlp decode", "error", err) + return + } + + log.Debug("block proposer id", "hash", block.ProposerID) + newBlock := types.NewBlock(&types.Header{ + Number: new(big.Int).SetUint64(result.Height), + Time: big.NewInt(result.Timestamp.Unix()), + Coinbase: common.BytesToAddress(block.ProposerID.Bytes()), + WitnessHeight: block.Witness.Height, + WitnessRoot: witnessData.Root, + WitnessReceiptHash: witnessData.ReceiptHash, + // TODO(bojie): fix it + GasLimit: 8000000, + Difficulty: big.NewInt(1), + }, transactions, nil, nil) + + _, err = d.blockchain.InsertPendingBlock([]*types.Block{newBlock}) if err != nil { log.Error("insert chain", "error", err) return } + log.Debug("insert pending block success", "height", result.Height) + d.blockchain.RemoveConfirmedBlock(blockHash) d.notify(result.Height) } -- cgit