From 85a4531d7398e0b1bee70ca8c2e0651483f9fc14 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Tue, 4 Dec 2018 14:54:28 +0800 Subject: core, dex: Timeout for prepare payload (#72) * Timeout on prepare payload * Leave 2 CPU for others * Add hardLimit and softLimit to PreparePayload --- core/blockchain.go | 16 +++++++----- core/types/transaction_signing.go | 5 +++- dex/app.go | 55 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 8 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 470f7ae28..1b3b22e5e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -270,13 +270,15 @@ func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error { bc.confirmedBlockInitMu.Unlock() var transactions types.Transactions - err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) - if err != nil { - return err - } - _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(bc.Config().ChainID), transactions) - if err != nil { - return err + if len(block.Payload) != 0 { + err := rlp.Decode(bytes.NewReader(block.Payload), &transactions) + if err != nil { + return err + } + _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(bc.Config().ChainID), transactions) + if err != nil { + return err + } } addressMap := map[common.Address]struct{}{} diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index 99e0c7896..1a13e3e3d 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -60,7 +60,10 @@ type resultEntry struct { // Add adds a list of transactions into sig cache. func (c *globalSigCache) Add(signer Signer, txs Transactions) (errorTx *Transaction, err error) { - num := runtime.NumCPU() + num := runtime.NumCPU() - 2 + if num < 1 { + num = 1 + } batchSize := len(txs) / num wg := sync.WaitGroup{} wg.Add(num) diff --git a/dex/app.go b/dex/app.go index cf36ad816..eae9ba0b3 100644 --- a/dex/app.go +++ b/dex/app.go @@ -18,6 +18,7 @@ package dex import ( + "context" "fmt" "math/big" "sync" @@ -129,8 +130,54 @@ func (d *DexconApp) validateNonce(txs types.Transactions) (map[common.Address]ui // PreparePayload is called when consensus core is preparing payload for block. func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, err error) { + // softLimit limits the runtime of inner call to preparePayload. + // hardLimit limits the runtime of outer PreparePayload. + // If hardLimit is hit, it is possible that no payload is prepared. + softLimit := 100 * time.Millisecond + hardLimit := 150 * time.Millisecond + ctx, cancel := context.WithTimeout(context.Background(), hardLimit) + defer cancel() + payloadCh := make(chan []byte, 1) + errCh := make(chan error, 1) + doneCh := make(chan struct{}, 1) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), softLimit) + defer cancel() + payload, err := d.preparePayload(ctx, position) + if err != nil { + errCh <- err + } + payloadCh <- payload + doneCh <- struct{}{} + }() + select { + case <-ctx.Done(): + case <-doneCh: + } + select { + case err = <-errCh: + if err != nil { + return + } + default: + } + select { + case payload = <-payloadCh: + default: + } + return +} + +func (d *DexconApp) preparePayload(ctx context.Context, position coreTypes.Position) ( + payload []byte, err error) { d.chainRLock(position.ChainID) defer d.chainRUnlock(position.ChainID) + select { + // This case will hit if previous RLock took too much time. + case <-ctx.Done(): + return + default: + } if position.Height != 0 { // Check if chain block height is strictly increamental. @@ -171,6 +218,11 @@ func (d *DexconApp) PreparePayload(position coreTypes.Position) (payload []byte, addressMap: for address, txs := range txsMap { + select { + case <-ctx.Done(): + break addressMap + default: + } // TX hash need to be slot to the given chain in order to be included in the block. if !d.addrBelongsToChain(address, chainNums, chainID) { continue @@ -332,6 +384,9 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta } var transactions types.Transactions + if len(block.Payload) == 0 { + return coreTypes.VerifyOK + } err = rlp.DecodeBytes(block.Payload, &transactions) if err != nil { log.Error("Payload rlp decode", "error", err) -- cgit