aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWei-Ning Huang <w@dexon.org>2018-11-21 16:14:51 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-12 12:19:09 +0800
commit589393f8c64d94c268cd057715f8f8c247321aa5 (patch)
treebb2a0c96309d40d350a90e4b0f57bb84c9b84e2b
parent38e42c16675bee26253886c28e41101c10d66496 (diff)
downloaddexon-589393f8c64d94c268cd057715f8f8c247321aa5.tar.gz
dexon-589393f8c64d94c268cd057715f8f8c247321aa5.tar.zst
dexon-589393f8c64d94c268cd057715f8f8c247321aa5.zip
core: add global signature cache and improve concurrency (#42)
From the go trace result, the bottleneck hides in the lock of StoreTxCache. To improve this, we update the cache in a batched fassion.
-rw-r--r--core/blockchain.go2
-rw-r--r--core/tx_pool.go2
-rw-r--r--core/types/transaction.go36
-rw-r--r--core/types/transaction_signing.go99
-rw-r--r--dex/app.go2
-rw-r--r--dex/handler.go2
6 files changed, 88 insertions, 55 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index a0fa146f7..1c3de3c4f 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -268,7 +268,7 @@ func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error {
if err != nil {
return err
}
- _, err = transactions.TouchSenders(types.MakeSigner(bc.Config(), new(big.Int)))
+ _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(bc.Config().ChainID), transactions)
if err != nil {
return err
}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index fc36d50bf..0a87be25b 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -966,7 +966,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
}
}
- types.DeleteTxCacheByHash(hash)
+ types.GlobalSigCache.Prune([]common.Hash{hash})
}
// promoteExecutables moves transactions that have become processable from the
diff --git a/core/types/transaction.go b/core/types/transaction.go
index 5e11c0dbc..857ac2137 100644
--- a/core/types/transaction.go
+++ b/core/types/transaction.go
@@ -21,8 +21,6 @@ import (
"errors"
"io"
"math/big"
- "runtime"
- "sync"
"sync/atomic"
"github.com/dexon-foundation/dexon/common"
@@ -273,40 +271,6 @@ func (s Transactions) GetRlp(i int) []byte {
return enc
}
-// TouchSenders calculates the sender of each transaction and update the cache.
-func (s Transactions) TouchSenders(signer Signer) (errorTx *Transaction, err error) {
- num := runtime.NumCPU()
- batchSize := len(s) / num
- wg := sync.WaitGroup{}
- wg.Add(num)
- txError := make(chan error, 1)
- for i := 0; i < num; i++ {
- go func(txs Transactions) {
- defer wg.Done()
- for _, tx := range txs {
- if len(txError) > 0 {
- return
- }
- _, err := Sender(signer, tx)
- if err != nil {
- select {
- case txError <- err:
- errorTx = tx
- default:
- }
- return
- }
- }
- }(s[i*batchSize : (i+1)*batchSize])
- }
- wg.Wait()
- select {
- case err = <-txError:
- default:
- }
- return
-}
-
// TxDifference returns a new set which is the difference between a and b.
func TxDifference(a, b Transactions) Transactions {
keep := make(Transactions, 0, len(a))
diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go
index a6c5c1f16..47c7a2f91 100644
--- a/core/types/transaction_signing.go
+++ b/core/types/transaction_signing.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"math/big"
+ "runtime"
"sync"
"github.com/dexon-foundation/dexon/common"
@@ -32,25 +33,95 @@ var (
ErrInvalidChainId = errors.New("invalid chain id for signer")
)
-var (
- txCache = &sync.Map{}
-)
+var GlobalSigCache *globalSigCache
-func DeleteTxCacheByHash(hash common.Hash) {
- txCache.Delete(hash)
+func init() {
+ GlobalSigCache = newGlobalSigCache()
}
-func StoreTxCache(key common.Hash, value common.Address) {
- txCache.Store(key, value)
+// globalSigCache stores the mapping between txHash and sender address.
+// Since ECRecover is slow, and we run ECRecover very frequently (in
+// app.VerifyBlock, app.ConfirmedBlock), so we need to cache it globally.
+type globalSigCache struct {
+ cache map[common.Hash]common.Address
+ cacheMu sync.RWMutex
}
-func LoadTxCache(key common.Hash) (common.Address, bool) {
- addr, ok := txCache.Load(key)
- if !ok {
- return common.Address{}, ok
+func newGlobalSigCache() *globalSigCache {
+ return &globalSigCache{
+ cache: make(map[common.Hash]common.Address),
}
+}
+
+type resultEntry struct {
+ Hash common.Hash
+ Addr common.Address
+}
- return addr.(common.Address), ok
+// Add adds a list of transactions into sig cache.
+func (c *globalSigCache) Add(signer Signer, txs Transactions) (errorTx *Transaction, err error) {
+ num := runtime.NumCPU()
+ batchSize := len(txs) / num
+ wg := sync.WaitGroup{}
+ wg.Add(num)
+ txError := make(chan error, 1)
+
+ for i := 0; i < num; i++ {
+ go func(txs Transactions) {
+ defer wg.Done()
+ results := make([]resultEntry, len(txs))
+ for i, tx := range txs {
+ if len(txError) > 0 {
+ return
+ }
+ addr, err := Sender(signer, tx)
+ if err != nil {
+ select {
+ case txError <- err:
+ errorTx = tx
+ default:
+ }
+ return
+ }
+ results[i] = resultEntry{
+ Hash: tx.Hash(),
+ Addr: addr,
+ }
+ }
+ // Acquire lock and set cache.
+ c.cacheMu.Lock()
+ defer c.cacheMu.Unlock()
+ for _, r := range results {
+ c.cache[r.Hash] = r.Addr
+ }
+ }(txs[i*batchSize : (i+1)*batchSize])
+ }
+ wg.Wait()
+
+ select {
+ case err = <-txError:
+ default:
+ }
+ return
+}
+
+// Prune removes a list of hashes of tx from the cache.
+func (c *globalSigCache) Prune(hashes []common.Hash) {
+ c.cacheMu.Lock()
+ defer c.cacheMu.Unlock()
+
+ for _, hash := range hashes {
+ delete(c.cache, hash)
+ }
+}
+
+// Get returns a single address given a tx hash.
+func (c *globalSigCache) Get(hash common.Hash) (common.Address, bool) {
+ c.cacheMu.RLock()
+ defer c.cacheMu.RUnlock()
+
+ res, ok := c.cache[hash]
+ return res, ok
}
// sigCache is used to cache the derived sender and contains
@@ -147,7 +218,7 @@ func (s EIP155Signer) Equal(s2 Signer) bool {
var big8 = big.NewInt(8)
func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) {
- addr, ok := LoadTxCache(tx.Hash())
+ addr, ok := GlobalSigCache.Get(tx.Hash())
if ok {
return addr, nil
}
@@ -165,8 +236,6 @@ func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) {
if err != nil {
return common.Address{}, err
}
-
- StoreTxCache(tx.Hash(), addr)
return addr, nil
}
diff --git a/dex/app.go b/dex/app.go
index 99514ed83..df76b2b7d 100644
--- a/dex/app.go
+++ b/dex/app.go
@@ -327,7 +327,7 @@ func (d *DexconApp) VerifyBlock(block *coreTypes.Block) coreTypes.BlockVerifySta
return coreTypes.VerifyInvalidBlock
}
- _, err = transactions.TouchSenders(types.MakeSigner(d.blockchain.Config(), new(big.Int)))
+ _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(d.blockchain.Config().ChainID), transactions)
if err != nil {
log.Error("Failed to calculate sender", "error", err)
return coreTypes.VerifyInvalidBlock
diff --git a/dex/handler.go b/dex/handler.go
index a74c78e3b..a1a158560 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -724,7 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkTransaction(tx.Hash())
}
- types.Transactions(txs).TouchSenders(types.MakeSigner(pm.blockchain.Config(), new(big.Int)))
+ types.GlobalSigCache.Add(types.NewEIP155Signer(pm.blockchain.Config().ChainID), txs)
pm.txpool.AddRemotes(txs)
case msg.Code == MetaMsg: