diff options
author | Wei-Ning Huang <w@dexon.org> | 2018-11-21 16:14:51 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-03-12 12:19:09 +0800 |
commit | 589393f8c64d94c268cd057715f8f8c247321aa5 (patch) | |
tree | bb2a0c96309d40d350a90e4b0f57bb84c9b84e2b | |
parent | 38e42c16675bee26253886c28e41101c10d66496 (diff) | |
download | dexon-589393f8c64d94c268cd057715f8f8c247321aa5.tar.gz dexon-589393f8c64d94c268cd057715f8f8c247321aa5.tar.zst dexon-589393f8c64d94c268cd057715f8f8c247321aa5.zip |
core: add global signature cache and improve concurrency (#42)
From the go trace result, the bottleneck hides in the lock of
StoreTxCache. To improve this, we update the cache in a batched fassion.
-rw-r--r-- | core/blockchain.go | 2 | ||||
-rw-r--r-- | core/tx_pool.go | 2 | ||||
-rw-r--r-- | core/types/transaction.go | 36 | ||||
-rw-r--r-- | core/types/transaction_signing.go | 99 | ||||
-rw-r--r-- | dex/app.go | 2 | ||||
-rw-r--r-- | dex/handler.go | 2 |
6 files changed, 88 insertions, 55 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index a0fa146f7..1c3de3c4f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -268,7 +268,7 @@ func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error { if err != nil { return err } - _, err = transactions.TouchSenders(types.MakeSigner(bc.Config(), new(big.Int))) + _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(bc.Config().ChainID), transactions) if err != nil { return err } diff --git a/core/tx_pool.go b/core/tx_pool.go index fc36d50bf..0a87be25b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -966,7 +966,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } } - types.DeleteTxCacheByHash(hash) + types.GlobalSigCache.Prune([]common.Hash{hash}) } // promoteExecutables moves transactions that have become processable from the diff --git a/core/types/transaction.go b/core/types/transaction.go index 5e11c0dbc..857ac2137 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -21,8 +21,6 @@ import ( "errors" "io" "math/big" - "runtime" - "sync" "sync/atomic" "github.com/dexon-foundation/dexon/common" @@ -273,40 +271,6 @@ func (s Transactions) GetRlp(i int) []byte { return enc } -// TouchSenders calculates the sender of each transaction and update the cache. -func (s Transactions) TouchSenders(signer Signer) (errorTx *Transaction, err error) { - num := runtime.NumCPU() - batchSize := len(s) / num - wg := sync.WaitGroup{} - wg.Add(num) - txError := make(chan error, 1) - for i := 0; i < num; i++ { - go func(txs Transactions) { - defer wg.Done() - for _, tx := range txs { - if len(txError) > 0 { - return - } - _, err := Sender(signer, tx) - if err != nil { - select { - case txError <- err: - errorTx = tx - default: - } - return - } - } - }(s[i*batchSize : (i+1)*batchSize]) - } - wg.Wait() - select { - case err = <-txError: - default: - } - return -} - // TxDifference returns a new set which is the difference between a and b. func TxDifference(a, b Transactions) Transactions { keep := make(Transactions, 0, len(a)) diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index a6c5c1f16..47c7a2f91 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/big" + "runtime" "sync" "github.com/dexon-foundation/dexon/common" @@ -32,25 +33,95 @@ var ( ErrInvalidChainId = errors.New("invalid chain id for signer") ) -var ( - txCache = &sync.Map{} -) +var GlobalSigCache *globalSigCache -func DeleteTxCacheByHash(hash common.Hash) { - txCache.Delete(hash) +func init() { + GlobalSigCache = newGlobalSigCache() } -func StoreTxCache(key common.Hash, value common.Address) { - txCache.Store(key, value) +// globalSigCache stores the mapping between txHash and sender address. +// Since ECRecover is slow, and we run ECRecover very frequently (in +// app.VerifyBlock, app.ConfirmedBlock), so we need to cache it globally. +type globalSigCache struct { + cache map[common.Hash]common.Address + cacheMu sync.RWMutex } -func LoadTxCache(key common.Hash) (common.Address, bool) { - addr, ok := txCache.Load(key) - if !ok { - return common.Address{}, ok +func newGlobalSigCache() *globalSigCache { + return &globalSigCache{ + cache: make(map[common.Hash]common.Address), } +} + +type resultEntry struct { + Hash common.Hash + Addr common.Address +} - return addr.(common.Address), ok +// Add adds a list of transactions into sig cache. +func (c *globalSigCache) Add(signer Signer, txs Transactions) (errorTx *Transaction, err error) { + num := runtime.NumCPU() + batchSize := len(txs) / num + wg := sync.WaitGroup{} + wg.Add(num) + txError := make(chan error, 1) + + for i := 0; i < num; i++ { + go func(txs Transactions) { + defer wg.Done() + results := make([]resultEntry, len(txs)) + for i, tx := range txs { + if len(txError) > 0 { + return + } + addr, err := Sender(signer, tx) + if err != nil { + select { + case txError <- err: + errorTx = tx + default: + } + return + } + results[i] = resultEntry{ + Hash: tx.Hash(), + Addr: addr, + } + } + // Acquire lock and set cache. + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + for _, r := range results { + c.cache[r.Hash] = r.Addr + } + }(txs[i*batchSize : (i+1)*batchSize]) + } + wg.Wait() + + select { + case err = <-txError: + default: + } + return +} + +// Prune removes a list of hashes of tx from the cache. +func (c *globalSigCache) Prune(hashes []common.Hash) { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + for _, hash := range hashes { + delete(c.cache, hash) + } +} + +// Get returns a single address given a tx hash. +func (c *globalSigCache) Get(hash common.Hash) (common.Address, bool) { + c.cacheMu.RLock() + defer c.cacheMu.RUnlock() + + res, ok := c.cache[hash] + return res, ok } // sigCache is used to cache the derived sender and contains @@ -147,7 +218,7 @@ func (s EIP155Signer) Equal(s2 Signer) bool { var big8 = big.NewInt(8) func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) { - addr, ok := LoadTxCache(tx.Hash()) + addr, ok := GlobalSigCache.Get(tx.Hash()) if ok { return addr, nil } @@ -165,8 +236,6 @@ func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) { if err != nil { return common.Address{}, err } - - StoreTxCache(tx.Hash(), addr) return addr, nil } diff --git a/dex/app.go b/dex/app.go index 99514ed83..df76b2b7d 100644 --- a/dex/app.go +++ b/dex/app.go @@ -327,7 +327,7 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta return coreTypes.VerifyInvalidBlock } - _, err = transactions.TouchSenders(types.MakeSigner(d.blockchain.Config(), new(big.Int))) + _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(d.blockchain.Config().ChainID), transactions) if err != nil { log.Error("Failed to calculate sender", "error", err) return coreTypes.VerifyInvalidBlock diff --git a/dex/handler.go b/dex/handler.go index a74c78e3b..a1a158560 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -724,7 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkTransaction(tx.Hash()) } - types.Transactions(txs).TouchSenders(types.MakeSigner(pm.blockchain.Config(), new(big.Int))) + types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs) pm.txpool.AddRemotes(txs) case msg.Code == MetaMsg: |