From 67aff49822a411611941e4b93a0343df75fd21b7 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 30 Jun 2017 22:43:26 +0200 Subject: core: Change local-handling to use sender-account instead of tx hashes --- core/tx_list.go | 12 ++++---- core/tx_pool.go | 95 +++++++++++++++++++++++++-------------------------------- 2 files changed, 48 insertions(+), 59 deletions(-) (limited to 'core') diff --git a/core/tx_list.go b/core/tx_list.go index 626d3a3b7..e12af4a89 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -422,7 +422,7 @@ func (l *txPricedList) Removed() { // Discard finds all the transactions below the given price threshold, drops them // from the priced list and returs them for further removal from the entire pool. -func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions { +func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions { drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep @@ -440,7 +440,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions break } // Non stale transaction found, discard unless local - if local.contains(hash) { + if local.contains(tx) { save = append(save, tx) } else { drop = append(drop, tx) @@ -454,9 +454,9 @@ func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions // Underpriced checks whether a transaction is cheaper than (or as cheap as) the // lowest priced transaction currently being tracked. -func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool { +func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool { // Local transactions cannot be underpriced - if local.contains(tx.Hash()) { + if local.contains(tx) { return false } // Discard stale price points if found at the heap start @@ -480,7 +480,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool { // Discard finds a number of most underpriced transactions, removes them from the // priced list and returs them for further removal from the entire pool. -func (l *txPricedList) Discard(count int, local *txSet) types.Transactions { +func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions { drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep @@ -494,7 +494,7 @@ func (l *txPricedList) Discard(count int, local *txSet) types.Transactions { continue } // Non stale transaction found, discard unless local - if local.contains(hash) { + if local.contains(tx) { save = append(save, tx) } else { drop = append(drop, tx) diff --git a/core/tx_pool.go b/core/tx_pool.go index 2f3cd1e93..3f758957a 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -155,7 +155,7 @@ type TxPool struct { gasPrice *big.Int eventMux *event.TypeMux events *event.TypeMuxSubscription - locals *txSet + locals *accountSet signer types.Signer mu sync.RWMutex @@ -176,12 +176,12 @@ type TxPool struct { func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() - + signer := types.NewEIP155Signer(chainconfig.ChainId) // Create the transaction pool with its initial settings pool := &TxPool{ config: config, chainconfig: chainconfig, - signer: types.NewEIP155Signer(chainconfig.ChainId), + signer: signer, pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), @@ -191,7 +191,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasLimit: gasLimitFn, gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - locals: newTxSet(), + locals: newAccountSet(signer), events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), quit: make(chan struct{}), } @@ -376,13 +376,19 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { func (pool *TxPool) SetLocal(tx *types.Transaction) { pool.mu.Lock() defer pool.mu.Unlock() - pool.locals.add(tx.Hash()) + pool.locals.add(tx) } // validateTx checks whether a transaction is valid according // to the consensus rules. func (pool *TxPool) validateTx(tx *types.Transaction) error { - local := pool.locals.contains(tx.Hash()) + + from, err := types.Sender(pool.signer, tx) + if err != nil { + return ErrInvalidSender + } + + local := pool.locals.containsAddress(from) // Drop transactions under our own minimal accepted gas price if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced @@ -393,10 +399,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return err } - from, err := types.Sender(pool.signer, tx) - if err != nil { - return ErrInvalidSender - } // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow @@ -748,14 +750,8 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers - if uint64(list.Len()) > pool.config.AccountSlots { - // Skip local accounts as pools should maintain backlogs for themselves - for _, tx := range list.txs.items { - if !pool.locals.contains(tx.Hash()) { - spammers.Push(addr, float32(list.Len())) - } - break // Checking on transaction for locality is enough - } + if !pool.locals.containsAddress(addr) && uint64(list.Len()) > pool.config.AccountSlots { + spammers.Push(addr, float32(list.Len())) } } // Gradually drop transactions from offenders @@ -929,48 +925,41 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// txSet represents a set of transaction hashes in which entries -// are automatically dropped after txSetDuration time -type txSet struct { - txMap map[common.Hash]struct{} - txOrd map[uint64]txOrdType - addPtr, delPtr uint64 +// accountSet is simply a map of addresses, and a signer, to be able +// to determine the address from a tx +type accountSet struct { + accounts map[common.Address]struct{} + signer types.Signer } -const txSetDuration = time.Hour * 2 - -// txOrdType represents an entry in the time-ordered list of transaction hashes -type txOrdType struct { - hash common.Hash - time time.Time +func newAccountSet(signer types.Signer) *accountSet { + return &accountSet{ + accounts: make(map[common.Address]struct{}), + signer: signer, + } } -// newTxSet creates a new transaction set -func newTxSet() *txSet { - return &txSet{ - txMap: make(map[common.Hash]struct{}), - txOrd: make(map[uint64]txOrdType), - } +// containsAddress checks if a given address is within the set +func (as *accountSet) containsAddress(address common.Address) bool { + _, exist := as.accounts[address] + return exist } -// contains returns true if the set contains the given transaction hash -// (not thread safe, should be called from a locked environment) -func (ts *txSet) contains(hash common.Hash) bool { - _, ok := ts.txMap[hash] - return ok +// contains checks if the sender of a given tx is within the set +func (as *accountSet) contains(tx *types.Transaction) bool { + if address, err := types.Sender(as.signer, tx); err == nil { + return as.containsAddress(address) + } + return false } -// add adds a transaction hash to the set, then removes entries older than txSetDuration -// (not thread safe, should be called from a locked environment) -func (ts *txSet) add(hash common.Hash) { - ts.txMap[hash] = struct{}{} - now := time.Now() - ts.txOrd[ts.addPtr] = txOrdType{hash: hash, time: now} - ts.addPtr++ - delBefore := now.Add(-txSetDuration) - for ts.delPtr < ts.addPtr && ts.txOrd[ts.delPtr].time.Before(delBefore) { - delete(ts.txMap, ts.txOrd[ts.delPtr].hash) - delete(ts.txOrd, ts.delPtr) - ts.delPtr++ +// add a transaction sender to the set +// if sender can't be derived, this is a no-op (no errors returned) +func (as *accountSet) add(tx *types.Transaction) { + if address, err := types.Sender(as.signer, tx); err == nil { + if _, exist := as.accounts[address]; !exist { + as.accounts[address] = struct{}{} + } } + } -- cgit From a633a2d7ea8aadb1d435679449d002de880fab30 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 30 Jun 2017 22:55:10 +0200 Subject: core: Prevent local tx:s from being discarded --- core/tx_pool.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'core') diff --git a/core/tx_pool.go b/core/tx_pool.go index 3f758957a..143689c59 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -725,12 +725,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit - for _, tx := range list.Cap(int(pool.config.AccountQueue)) { - hash := tx.Hash() - delete(pool.all, hash) - pool.priced.Removed() - queuedRateLimitCounter.Inc(1) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) + if !pool.locals.containsAddress(addr) { + for _, tx := range list.Cap(int(pool.config.AccountQueue)) { + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + queuedRateLimitCounter.Inc(1) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) + } } queued += uint64(list.Len()) @@ -815,7 +817,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // Sort all accounts with queued transactions by heartbeat addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + // Don't drop locals + if !pool.locals.containsAddress(addr) { + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + } } sort.Sort(addresses) -- cgit From 48ee7f9de7da0455b80ee09f498dbce54127103a Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 5 Jul 2017 16:51:55 +0300 Subject: core, eth, les: polish txpool API around local/remote txs --- core/tx_list.go | 16 ++---- core/tx_pool.go | 157 ++++++++++++++++++++++++++++----------------------- core/tx_pool_test.go | 108 +++++++++++++++++------------------ 3 files changed, 143 insertions(+), 138 deletions(-) (limited to 'core') diff --git a/core/tx_list.go b/core/tx_list.go index e12af4a89..4593943be 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -420,7 +420,7 @@ func (l *txPricedList) Removed() { heap.Init(l.items) } -// Discard finds all the transactions below the given price threshold, drops them +// Cap finds all the transactions below the given price threshold, drops them // from the priced list and returs them for further removal from the entire pool. func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transactions { drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop @@ -429,9 +429,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact for len(*l.items) > 0 { // Discard stale transactions if found during cleanup tx := heap.Pop(l.items).(*types.Transaction) - - hash := tx.Hash() - if _, ok := (*l.all)[hash]; !ok { + if _, ok := (*l.all)[tx.Hash()]; !ok { l.stales-- continue } @@ -440,7 +438,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact break } // Non stale transaction found, discard unless local - if local.contains(tx) { + if local.containsTx(tx) { save = append(save, tx) } else { drop = append(drop, tx) @@ -456,7 +454,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact // lowest priced transaction currently being tracked. func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) bool { // Local transactions cannot be underpriced - if local.contains(tx) { + if local.containsTx(tx) { return false } // Discard stale price points if found at the heap start @@ -487,14 +485,12 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions for len(*l.items) > 0 && count > 0 { // Discard stale transactions if found during cleanup tx := heap.Pop(l.items).(*types.Transaction) - - hash := tx.Hash() - if _, ok := (*l.all)[hash]; !ok { + if _, ok := (*l.all)[tx.Hash()]; !ok { l.stales-- continue } // Non stale transaction found, discard unless local - if local.contains(tx) { + if local.containsTx(tx) { save = append(save, tx) } else { drop = append(drop, tx) diff --git a/core/tx_pool.go b/core/tx_pool.go index 143689c59..a8018d74f 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -176,12 +176,12 @@ type TxPool struct { func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() - signer := types.NewEIP155Signer(chainconfig.ChainId) + // Create the transaction pool with its initial settings pool := &TxPool{ config: config, chainconfig: chainconfig, - signer: signer, + signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), @@ -191,10 +191,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasLimit: gasLimitFn, gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - locals: newAccountSet(signer), events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), quit: make(chan struct{}), } + pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) pool.resetState() @@ -237,7 +237,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddBatch(ev.Txs) + pool.addTxs(ev.Txs, false) } // Handle stats reporting ticks @@ -371,52 +371,40 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } -// SetLocal marks a transaction as local, skipping gas price -// check against local miner minimum in the future -func (pool *TxPool) SetLocal(tx *types.Transaction) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.locals.add(tx) -} - -// validateTx checks whether a transaction is valid according -// to the consensus rules. -func (pool *TxPool) validateTx(tx *types.Transaction) error { - +// validateTx checks whether a transaction is valid according to the consensus +// rules and adheres to some heuristic limits of the local node (price and size). +func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { + // Heuristic limit, reject transactions over 32KB to prevent DOS attacks + if tx.Size() > 32*1024 { + return ErrOversizedData + } + // Transactions can't be negative. This may never happen using RLP decoded + // transactions but may occur if you create a transaction using the RPC. + if tx.Value().Sign() < 0 { + return ErrNegativeValue + } + // Ensure the transaction doesn't exceed the current block limit gas. + if pool.gasLimit().Cmp(tx.Gas()) < 0 { + return ErrGasLimit + } + // Make sure the transaction is signed properly from, err := types.Sender(pool.signer, tx) if err != nil { return ErrInvalidSender } - - local := pool.locals.containsAddress(from) - // Drop transactions under our own minimal accepted gas price + // Drop non-local transactions under our own minimal accepted gas price + local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } - + // Ensure the transaction adheres to nonce ordering currentState, err := pool.currentState() if err != nil { return err } - - // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } - - // Check the transaction doesn't exceed the current - // block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { - return ErrGasLimit - } - - // Transactions can't be negative. This may never happen - // using RLP decoded transactions but may occur if you create - // a transaction using the RPC for example. - if tx.Value().Sign() < 0 { - return ErrNegativeValue - } - // Transactor should have enough funds to cover the costs // cost == V + GP * GL if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { @@ -426,11 +414,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { if tx.Gas().Cmp(intrGas) < 0 { return ErrIntrinsicGas } - - // Heuristic limit, reject transactions over 32KB to prevent DOS attacks - if tx.Size() > 32*1024 { - return ErrOversizedData - } return nil } @@ -438,7 +421,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { // later pending promotion and execution. If the transaction is a replacement for // an already pending or queued one, it overwrites the previous and returns this // so outer code doesn't uselessly call promote. -func (pool *TxPool) add(tx *types.Transaction) (bool, error) { +// +// If a newly added transaction is marked as local, its sending account will be +// whitelisted, preventing any associated transaction from being dropped out of +// the pool due to pricing constraints. +func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all[hash] != nil { @@ -446,7 +433,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { return false, fmt.Errorf("known transaction: %x", hash) } // If the transaction fails basic validation, discard it - if err := pool.validateTx(tx); err != nil { + if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err @@ -488,11 +475,14 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue + // New transaction isn't replacing a pending one, push into queue and potentially mark local replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } + if local { + pool.locals.add(from) + } log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } @@ -560,13 +550,41 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T go pool.eventMux.Post(TxPreEvent{tx}) } -// Add queues a single transaction in the pool if it is valid. -func (pool *TxPool) Add(tx *types.Transaction) error { +// AddLocal enqueues a single transaction into the pool if it is valid, marking +// the sender as a local one in the mean time, ensuring it goes around the local +// pricing constraints. +func (pool *TxPool) AddLocal(tx *types.Transaction) error { + return pool.addTx(tx, true) +} + +// AddRemote enqueues a single transaction into the pool if it is valid. If the +// sender is not among the locally tracked ones, full pricing constraints will +// apply. +func (pool *TxPool) AddRemote(tx *types.Transaction) error { + return pool.addTx(tx, false) +} + +// AddLocals enqueues a batch of transactions into the pool if they are valid, +// marking the senders as a local ones in the mean time, ensuring they go around +// the local pricing constraints. +func (pool *TxPool) AddLocals(txs []*types.Transaction) error { + return pool.addTxs(txs, true) +} + +// AddRemotes enqueues a batch of transactions into the pool if they are valid. +// If the senders are not among the locally tracked ones, full pricing constraints +// will apply. +func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { + return pool.addTxs(txs, false) +} + +// addTx enqueues a single transaction into the pool if it is valid. +func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Try to inject the transaction and update any state - replace, err := pool.add(tx) + replace, err := pool.add(tx, local) if err != nil { return err } @@ -582,15 +600,15 @@ func (pool *TxPool) Add(tx *types.Transaction) error { return nil } -// AddBatch attempts to queue a batch of transactions. -func (pool *TxPool) AddBatch(txs []*types.Transaction) error { +// addTxs attempts to queue a batch of transactions if they are valid. +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) for _, tx := range txs { - if replace, err := pool.add(tx); err == nil { + if replace, err := pool.add(tx, local); err == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -725,7 +743,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit - if !pool.locals.containsAddress(addr) { + if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() delete(pool.all, hash) @@ -752,7 +770,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers - if !pool.locals.containsAddress(addr) && uint64(list.Len()) > pool.config.AccountSlots { + if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, float32(list.Len())) } } @@ -818,7 +836,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { // Don't drop locals - if !pool.locals.containsAddress(addr) { + if !pool.locals.contains(addr) { addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } } @@ -930,13 +948,15 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// accountSet is simply a map of addresses, and a signer, to be able -// to determine the address from a tx +// accountSet is simply a set of addresses to check for existance, and a signer +// capable of deriving addresses from transactions. type accountSet struct { accounts map[common.Address]struct{} signer types.Signer } +// newAccountSet creates a new address set with an associated signer for sender +// derivations. func newAccountSet(signer types.Signer) *accountSet { return &accountSet{ accounts: make(map[common.Address]struct{}), @@ -944,27 +964,22 @@ func newAccountSet(signer types.Signer) *accountSet { } } -// containsAddress checks if a given address is within the set -func (as *accountSet) containsAddress(address common.Address) bool { - _, exist := as.accounts[address] +// contains checks if a given address is contained within the set. +func (as *accountSet) contains(addr common.Address) bool { + _, exist := as.accounts[addr] return exist } -// contains checks if the sender of a given tx is within the set -func (as *accountSet) contains(tx *types.Transaction) bool { - if address, err := types.Sender(as.signer, tx); err == nil { - return as.containsAddress(address) +// containsTx checks if the sender of a given tx is within the set. If the sender +// cannot be derived, this method returns false. +func (as *accountSet) containsTx(tx *types.Transaction) bool { + if addr, err := types.Sender(as.signer, tx); err == nil { + return as.contains(addr) } return false } -// add a transaction sender to the set -// if sender can't be derived, this is a no-op (no errors returned) -func (as *accountSet) add(tx *types.Transaction) { - if address, err := types.Sender(as.signer, tx); err == nil { - if _, exist := as.accounts[address]; !exist { - as.accounts[address] = struct{}{} - } - } - +// add inserts a new address into the set to track. +func (as *accountSet) add(addr common.Address) { + as.accounts[addr] = struct{}{} } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 4903bc3ca..980805ee9 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -133,7 +133,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { t.Fatalf("Invalid nonce, want 0, got %d", nonce) } - txpool.AddBatch(types.Transactions{tx0, tx1}) + txpool.AddRemotes(types.Transactions{tx0, tx1}) nonce = txpool.State().GetNonce(address) if nonce != 2 { @@ -167,31 +167,29 @@ func TestInvalidTransactions(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1)) - if err := pool.Add(tx); err != ErrInsufficientFunds { + if err := pool.AddRemote(tx); err != ErrInsufficientFunds { t.Error("expected", ErrInsufficientFunds) } balance := new(big.Int).Add(tx.Value(), new(big.Int).Mul(tx.Gas(), tx.GasPrice())) currentState.AddBalance(from, balance) - if err := pool.Add(tx); err != ErrIntrinsicGas { + if err := pool.AddRemote(tx); err != ErrIntrinsicGas { t.Error("expected", ErrIntrinsicGas, "got", err) } currentState.SetNonce(from, 1) currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) tx = transaction(0, big.NewInt(100000), key) - if err := pool.Add(tx); err != ErrNonceTooLow { + if err := pool.AddRemote(tx); err != ErrNonceTooLow { t.Error("expected", ErrNonceTooLow) } tx = transaction(1, big.NewInt(100000), key) pool.gasPrice = big.NewInt(1000) - if err := pool.Add(tx); err != ErrUnderpriced { + if err := pool.AddRemote(tx); err != ErrUnderpriced { t.Error("expected", ErrUnderpriced, "got", err) } - - pool.SetLocal(tx) - if err := pool.Add(tx); err != nil { + if err := pool.AddLocal(tx); err != nil { t.Error("expected", nil, "got", err) } } @@ -282,7 +280,7 @@ func TestNegativeValue(t *testing.T) { from, _ := deriveSender(tx) currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1)) - if err := pool.Add(tx); err != ErrNegativeValue { + if err := pool.AddRemote(tx); err != ErrNegativeValue { t.Error("expected", ErrNegativeValue, "got", err) } } @@ -301,14 +299,14 @@ func TestTransactionChainFork(t *testing.T) { resetState() tx := transaction(0, big.NewInt(100000), key) - if _, err := pool.add(tx); err != nil { + if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } pool.RemoveBatch([]*types.Transaction{tx}) // reset the pool's internal state resetState() - if _, err := pool.add(tx); err != nil { + if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } } @@ -332,10 +330,10 @@ func TestTransactionDoubleNonce(t *testing.T) { tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(1), nil), signer, key) // Add the first two transaction, ensure higher priced stays only - if replace, err := pool.add(tx1); err != nil || replace { + if replace, err := pool.add(tx1, false); err != nil || replace { t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace) } - if replace, err := pool.add(tx2); err != nil || !replace { + if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } state, _ := pool.currentState() @@ -347,7 +345,7 @@ func TestTransactionDoubleNonce(t *testing.T) { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Add the third transaction and ensure it's not saved (smaller price) - pool.add(tx3) + pool.add(tx3, false) pool.promoteExecutables(state, []common.Address{addr}) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -367,7 +365,7 @@ func TestMissingNonce(t *testing.T) { currentState, _ := pool.currentState() currentState.AddBalance(addr, big.NewInt(100000000000000)) tx := transaction(1, big.NewInt(100000), key) - if _, err := pool.add(tx); err != nil { + if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } if len(pool.pending) != 0 { @@ -390,7 +388,7 @@ func TestNonceRecovery(t *testing.T) { currentState.AddBalance(addr, big.NewInt(100000000000000)) pool.resetState() tx := transaction(n, big.NewInt(100000), key) - if err := pool.Add(tx); err != nil { + if err := pool.AddRemote(tx); err != nil { t.Error(err) } // simulate some weird re-order of transactions and missing nonce(s) @@ -598,7 +596,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= DefaultTxPoolConfig.AccountQueue+5; i++ { - if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { + if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if len(pool.pending) != 0 { @@ -653,7 +651,7 @@ func TestTransactionQueueGlobalLimiting(t *testing.T) { nonces[addr]++ } // Import the batch and verify that limits have been enforced - pool.AddBatch(txs) + pool.AddRemotes(txs) queued := 0 for addr, list := range pool.queue { @@ -686,7 +684,7 @@ func TestTransactionQueueTimeLimiting(t *testing.T) { // Queue up a batch of transactions for i := uint64(1); i <= DefaultTxPoolConfig.AccountQueue; i++ { - if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { + if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } } @@ -711,7 +709,7 @@ func TestTransactionPendingLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { - if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { + if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if pool.pending[account].Len() != int(i)+1 { @@ -739,7 +737,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { state1.AddBalance(account1, big.NewInt(1000000)) for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { - if err := pool1.Add(transaction(origin+i, big.NewInt(100000), key1)); err != nil { + if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } } @@ -753,7 +751,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ { txns = append(txns, transaction(origin+i, big.NewInt(100000), key2)) } - pool2.AddBatch(txns) + pool2.AddRemotes(txns) // Ensure the batch optimization honors the same pool mechanics if len(pool1.pending) != len(pool2.pending) { @@ -808,7 +806,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddBatch(txs) + pool.AddRemotes(txs) pending := 0 for _, list := range pool.pending { @@ -852,7 +850,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { txs = append(txs, transaction(uint64(j), big.NewInt(100000), key)) } // Import the batch and verify that limits have been enforced - pool.AddBatch(txs) + pool.AddRemotes(txs) if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } @@ -893,7 +891,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddBatch(txs) + pool.AddRemotes(txs) for addr, list := range pool.pending { if list.Len() != int(DefaultTxPoolConfig.AccountSlots) { @@ -937,11 +935,11 @@ func TestTransactionPoolRepricing(t *testing.T) { txs = append(txs, pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])) txs = append(txs, pricedTransaction(3, big.NewInt(100000), big.NewInt(2), keys[1])) - txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2])) - pool.SetLocal(txs[len(txs)-1]) // prevent this one from ever being dropped + ltx := pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2]) // Import the batch and that both pending and queued transactions match up - pool.AddBatch(txs) + pool.AddRemotes(txs) + pool.AddLocal(ltx) pending, queued := pool.stats() if pending != 4 { @@ -967,10 +965,10 @@ func TestTransactionPoolRepricing(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // Check that we can't add the old transactions back - if err := pool.Add(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced { + if err := pool.AddRemote(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } if err := validateTxPoolInternals(pool); err != nil { @@ -978,9 +976,7 @@ func TestTransactionPoolRepricing(t *testing.T) { } // However we can add local underpriced transactions tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[2]) - - pool.SetLocal(tx) // prevent this one from ever being dropped - if err := pool.Add(tx); err != nil { + if err := pool.AddLocal(tx); err != nil { t.Fatalf("failed to add underpriced local transaction: %v", err) } if pending, _ = pool.stats(); pending != 3 { @@ -1027,11 +1023,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) { txs = append(txs, pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[1])) - txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2])) - pool.SetLocal(txs[len(txs)-1]) // prevent this one from ever being dropped + ltx := pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2]) // Import the batch and that both pending and queued transactions match up - pool.AddBatch(txs) + pool.AddRemotes(txs) + pool.AddLocal(ltx) pending, queued := pool.stats() if pending != 3 { @@ -1044,17 +1040,17 @@ func TestTransactionPoolUnderpricing(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // Ensure that adding an underpriced transaction on block limit fails - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } // Ensure that adding high priced transactions drops cheap ones, but not own - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(3), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(3), keys[1])); err != nil { t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(4), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(4), keys[1])); err != nil { t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.Add(pricedTransaction(3, big.NewInt(100000), big.NewInt(5), keys[1])); err != nil { + if err := pool.AddRemote(pricedTransaction(3, big.NewInt(100000), big.NewInt(5), keys[1])); err != nil { t.Fatalf("failed to add well priced transaction: %v", err) } pending, queued = pool.stats() @@ -1069,9 +1065,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) { } // Ensure that adding local transactions can push out even higher priced ones tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(0), keys[2]) - - pool.SetLocal(tx) // prevent this one from ever being dropped - if err := pool.Add(tx); err != nil { + if err := pool.AddLocal(tx); err != nil { t.Fatalf("failed to add underpriced local transaction: %v", err) } pending, queued = pool.stats() @@ -1106,43 +1100,43 @@ func TestTransactionReplacement(t *testing.T) { price := int64(100) threshold := (price * (100 + int64(DefaultTxPoolConfig.PriceBump))) / 100 - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original cheap pending transaction: %v", err) } - if err := pool.Add(pricedTransaction(0, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced { t.Fatalf("original cheap pending transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced) } - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil { t.Fatalf("failed to replace original cheap pending transaction: %v", err) } - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil { t.Fatalf("failed to add original proper pending transaction: %v", err) } - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced { t.Fatalf("original proper pending transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced) } - if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original proper pending transaction: %v", err) } // Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original queued transaction: %v", err) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced { t.Fatalf("original queued transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(2), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(2), key)); err != nil { t.Fatalf("failed to replace original queued transaction: %v", err) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(price), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(price), key)); err != nil { t.Fatalf("failed to add original queued transaction: %v", err) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100001), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100001), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced { t.Fatalf("original queued transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced) } - if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { + if err := pool.AddRemote(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original queued transaction: %v", err) } if err := validateTxPoolInternals(pool); err != nil { @@ -1213,7 +1207,7 @@ func BenchmarkPoolInsert(b *testing.B) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, tx := range txs { - pool.Add(tx) + pool.AddRemote(tx) } } @@ -1239,6 +1233,6 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - pool.AddBatch(batch) + pool.AddRemotes(batch) } } -- cgit From 5e38f7a664aa5401117f1e1705ec97476b19411e Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 5 Jul 2017 17:06:05 +0300 Subject: cmd, core: add --txpool.nolocals to disable local price exemptions --- core/tx_pool.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/tx_pool.go b/core/tx_pool.go index a8018d74f..093d3c5fd 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -99,6 +99,8 @@ type stateFn func() (*state.StateDB, error) // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { + NoLocals bool // Whether local transaction handling should be disabled + PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -394,7 +396,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network - if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + if (!local || pool.config.NoLocals) && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering @@ -480,7 +482,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { if err != nil { return false, err } - if local { + if local && !pool.config.NoLocals { pool.locals.add(from) } log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) -- cgit From 88b4fe7d21bf03bbe01961dc49508bcf4edafb51 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 5 Jul 2017 17:16:42 +0300 Subject: core: handle nolocals during add, exepmt locals from expiration --- core/tx_pool.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'core') diff --git a/core/tx_pool.go b/core/tx_pool.go index 093d3c5fd..46b09c8af 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -396,7 +396,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network - if (!local || pool.config.NoLocals) && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering @@ -482,7 +482,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { if err != nil { return false, err } - if local && !pool.config.NoLocals { + if local { pool.locals.add(from) } log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) @@ -556,7 +556,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. func (pool *TxPool) AddLocal(tx *types.Transaction) error { - return pool.addTx(tx, true) + return pool.addTx(tx, !pool.config.NoLocals) } // AddRemote enqueues a single transaction into the pool if it is valid. If the @@ -570,7 +570,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) error { - return pool.addTxs(txs, true) + return pool.addTxs(txs, !pool.config.NoLocals) } // AddRemotes enqueues a batch of transactions into the pool if they are valid. @@ -924,6 +924,11 @@ func (pool *TxPool) expirationLoop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { for _, tx := range pool.queue[addr].Flatten() { pool.removeTx(tx.Hash()) -- cgit From 34ec9913f628180d0ace740abfe1362995879c93 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Thu, 6 Jul 2017 11:51:59 +0300 Subject: core: test locals support in txpool queue limits, fix The commit reworks the transaction pool queue limitation tests to cater for testing local accounts, also testing the nolocal flag. In addition, it also fixes a panic if local transactions exceeded the global queue allowance (no accounts left to drop from) and also fixes queue eviction to operate on all accounts, not just the one being updated. --- core/tx_list.go | 2 +- core/tx_pool.go | 14 +-- core/tx_pool_test.go | 264 ++++++++++++++++++++++++++++++++++++--------------- 3 files changed, 197 insertions(+), 83 deletions(-) (limited to 'core') diff --git a/core/tx_list.go b/core/tx_list.go index 4593943be..0d87c20bc 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -477,7 +477,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo } // Discard finds a number of most underpriced transactions, removes them from the -// priced list and returs them for further removal from the entire pool. +// priced list and returns them for further removal from the entire pool. func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions { drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep diff --git a/core/tx_pool.go b/core/tx_pool.go index 46b09c8af..8e2d1b31d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -716,7 +716,6 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A } } // Iterate over all accounts and promote any executable transactions - queued := uint64(0) for _, addr := range accounts { list := pool.queue[addr] if list == nil { @@ -754,8 +753,6 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } } - queued += uint64(list.Len()) - // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) @@ -833,19 +830,22 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones + queued := uint64(0) + for _, list := range pool.queue { + queued += uint64(list.Len()) + } if queued > pool.config.GlobalQueue { // Sort all accounts with queued transactions by heartbeat addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - // Don't drop locals - if !pool.locals.contains(addr) { + if !pool.locals.contains(addr) { // don't drop locals addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } } sort.Sort(addresses) - // Drop transactions until the total is below the limit - for drop := queued - pool.config.GlobalQueue; drop > 0; { + // Drop transactions until the total is below the limit or only locals remain + for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 980805ee9..03ece3886 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -47,10 +47,10 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) key, _ := crypto.GenerateKey() - newPool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) - newPool.resetState() + pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() - return newPool, key + return pool, key } // validateTxPoolInternals checks various consistency invariants within the pool. @@ -125,17 +125,18 @@ func TestStateChangeDuringPoolReset(t *testing.T) { gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) } - txpool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) - txpool.resetState() + pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc) + defer pool.Stop() + pool.resetState() - nonce := txpool.State().GetNonce(address) + nonce := pool.State().GetNonce(address) if nonce != 0 { t.Fatalf("Invalid nonce, want 0, got %d", nonce) } - txpool.AddRemotes(types.Transactions{tx0, tx1}) + pool.AddRemotes(types.Transactions{tx0, tx1}) - nonce = txpool.State().GetNonce(address) + nonce = pool.State().GetNonce(address) if nonce != 2 { t.Fatalf("Invalid nonce, want 2, got %d", nonce) } @@ -143,9 +144,9 @@ func TestStateChangeDuringPoolReset(t *testing.T) { // trigger state change in the background trigger = true - txpool.resetState() + pool.resetState() - pendingTx, err := txpool.Pending() + pendingTx, err := pool.Pending() if err != nil { t.Fatalf("Could not fetch pending transactions: %v", err) } @@ -154,7 +155,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { t.Logf("%0x: %d\n", addr, len(txs)) } - nonce = txpool.State().GetNonce(address) + nonce = pool.State().GetNonce(address) if nonce != 2 { t.Fatalf("Invalid nonce, want 2, got %d", nonce) } @@ -162,6 +163,7 @@ func TestStateChangeDuringPoolReset(t *testing.T) { func TestInvalidTransactions(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) @@ -196,6 +198,8 @@ func TestInvalidTransactions(t *testing.T) { func TestTransactionQueue(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + tx := transaction(0, big.NewInt(100), key) from, _ := deriveSender(tx) currentState, _ := pool.currentState() @@ -246,6 +250,8 @@ func TestTransactionQueue(t *testing.T) { func TestRemoveTx(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + addr := crypto.PubkeyToAddress(key.PublicKey) currentState, _ := pool.currentState() currentState.AddBalance(addr, big.NewInt(1)) @@ -275,6 +281,7 @@ func TestRemoveTx(t *testing.T) { func TestNegativeValue(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() tx, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(-1), big.NewInt(100), big.NewInt(1), nil), types.HomesteadSigner{}, key) from, _ := deriveSender(tx) @@ -287,6 +294,8 @@ func TestNegativeValue(t *testing.T) { func TestTransactionChainFork(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + addr := crypto.PubkeyToAddress(key.PublicKey) resetState := func() { db, _ := ethdb.NewMemDatabase() @@ -313,6 +322,8 @@ func TestTransactionChainFork(t *testing.T) { func TestTransactionDoubleNonce(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + addr := crypto.PubkeyToAddress(key.PublicKey) resetState := func() { db, _ := ethdb.NewMemDatabase() @@ -361,6 +372,8 @@ func TestTransactionDoubleNonce(t *testing.T) { func TestMissingNonce(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + addr := crypto.PubkeyToAddress(key.PublicKey) currentState, _ := pool.currentState() currentState.AddBalance(addr, big.NewInt(100000000000000)) @@ -382,6 +395,8 @@ func TestMissingNonce(t *testing.T) { func TestNonceRecovery(t *testing.T) { const n = 10 pool, key := setupTxPool() + defer pool.Stop() + addr := crypto.PubkeyToAddress(key.PublicKey) currentState, _ := pool.currentState() currentState.SetNonce(addr, n) @@ -401,6 +416,8 @@ func TestNonceRecovery(t *testing.T) { func TestRemovedTxEvent(t *testing.T) { pool, key := setupTxPool() + defer pool.Stop() + tx := transaction(0, big.NewInt(1000000), key) from, _ := deriveSender(tx) currentState, _ := pool.currentState() @@ -421,6 +438,8 @@ func TestRemovedTxEvent(t *testing.T) { func TestTransactionDropping(t *testing.T) { // Create a test account and fund it pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() @@ -514,6 +533,8 @@ func TestTransactionDropping(t *testing.T) { func TestTransactionPostponing(t *testing.T) { // Create a test account and fund it pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() @@ -588,6 +609,8 @@ func TestTransactionPostponing(t *testing.T) { func TestTransactionQueueAccountLimiting(t *testing.T) { // Create a test account and fund it pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() @@ -619,19 +642,30 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { // Tests that if the transaction count belonging to multiple accounts go above // some threshold, the higher transactions are dropped to prevent DOS attacks. +// +// This logic should not hold for local transactions, unless the local tracking +// mechanism is disabled. func TestTransactionQueueGlobalLimiting(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old uint64) { DefaultTxPoolConfig.GlobalQueue = old }(DefaultTxPoolConfig.GlobalQueue) - DefaultTxPoolConfig.GlobalQueue = DefaultTxPoolConfig.AccountQueue * 3 + testTransactionQueueGlobalLimiting(t, false) +} +func TestTransactionQueueGlobalLimitingNoLocals(t *testing.T) { + testTransactionQueueGlobalLimiting(t, true) +} +func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + config := DefaultTxPoolConfig + config.NoLocals = nolocals + config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() - // Create a number of test accounts and fund them + // Create a number of test accounts and fund them (last one will be the local) state, _ := pool.currentState() keys := make([]*ecdsa.PrivateKey, 5) @@ -639,12 +673,14 @@ func TestTransactionQueueGlobalLimiting(t *testing.T) { keys[i], _ = crypto.GenerateKey() state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } + local := keys[len(keys)-1] + // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) - txs := make(types.Transactions, 0, 3*DefaultTxPoolConfig.GlobalQueue) + txs := make(types.Transactions, 0, 3*config.GlobalQueue) for len(txs) < cap(txs) { - key := keys[rand.Intn(len(keys))] + key := keys[rand.Intn(len(keys)-1)] // skip adding transactions with the local account addr := crypto.PubkeyToAddress(key.PublicKey) txs = append(txs, transaction(nonces[addr]+1, big.NewInt(100000), key)) @@ -655,43 +691,114 @@ func TestTransactionQueueGlobalLimiting(t *testing.T) { queued := 0 for addr, list := range pool.queue { - if list.Len() > int(DefaultTxPoolConfig.AccountQueue) { - t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), DefaultTxPoolConfig.AccountQueue) + if list.Len() > int(config.AccountQueue) { + t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue) } queued += list.Len() } - if queued > int(DefaultTxPoolConfig.GlobalQueue) { - t.Fatalf("total transactions overflow allowance: %d > %d", queued, DefaultTxPoolConfig.GlobalQueue) + if queued > int(config.GlobalQueue) { + t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) + } + // Generate a batch of transactions from the local account and import them + txs = txs[:0] + for i := uint64(0); i < 3*config.GlobalQueue; i++ { + txs = append(txs, transaction(i+1, big.NewInt(100000), local)) + } + pool.AddLocals(txs) + + // If locals are disabled, the previous eviction algorithm should apply here too + if nolocals { + queued := 0 + for addr, list := range pool.queue { + if list.Len() > int(config.AccountQueue) { + t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue) + } + queued += list.Len() + } + if queued > int(config.GlobalQueue) { + t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) + } + } else { + // Local exemptions are enabled, make sure the local account owned the queue + if len(pool.queue) != 1 { + t.Errorf("multiple accounts in queue: have %v, want %v", len(pool.queue), 1) + } + // Also ensure no local transactions are ever dropped, even if above global limits + if queued := pool.queue[crypto.PubkeyToAddress(local.PublicKey)].Len(); uint64(queued) != 3*config.GlobalQueue { + t.Fatalf("local account queued transaction count mismatch: have %v, want %v", queued, 3*config.GlobalQueue) + } } } // Tests that if an account remains idle for a prolonged amount of time, any // non-executable transactions queued up are dropped to prevent wasting resources // on shuffling them around. -func TestTransactionQueueTimeLimiting(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old time.Duration) { DefaultTxPoolConfig.Lifetime = old }(DefaultTxPoolConfig.Lifetime) +// +// This logic should not hold for local transactions, unless the local tracking +// mechanism is disabled. +func TestTransactionQueueTimeLimiting(t *testing.T) { testTransactionQueueTimeLimiting(t, false) } +func TestTransactionQueueTimeLimitingNoLocals(t *testing.T) { testTransactionQueueTimeLimiting(t, true) } + +func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) { + // Reduce the eviction interval to a testable amount defer func(old time.Duration) { evictionInterval = old }(evictionInterval) - DefaultTxPoolConfig.Lifetime = time.Second - evictionInterval = time.Second + evictionInterval = 250 * time.Millisecond - // Create a test account and fund it - pool, key := setupTxPool() - account, _ := deriveSender(transaction(0, big.NewInt(0), key)) + // Create the pool to test the non-expiration enforcement + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) + + config := DefaultTxPoolConfig + config.Lifetime = 250 * time.Millisecond + config.NoLocals = nolocals + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() + pool.resetState() + + // Create two test accounts to ensure remotes expire but locals do not + local, _ := crypto.GenerateKey() + remote, _ := crypto.GenerateKey() state, _ := pool.currentState() - state.AddBalance(account, big.NewInt(1000000)) + state.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) + state.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) - // Queue up a batch of transactions - for i := uint64(1); i <= DefaultTxPoolConfig.AccountQueue; i++ { - if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil { - t.Fatalf("tx %d: failed to add transaction: %v", i, err) + // Add the two transactions and ensure they both are queued up + if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil { + t.Fatalf("failed to add local transaction: %v", err) + } + if err := pool.AddRemote(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), remote)); err != nil { + t.Fatalf("failed to add remote transaction: %v", err) + } + pending, queued := pool.stats() + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + if queued != 2 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } + // Wait a bit for eviction to run and clean up any leftovers, and ensure only the local remains + time.Sleep(2 * config.Lifetime) + + pending, queued = pool.stats() + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } + if nolocals { + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) + } + } else { + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } } - // Wait until at least two expiration cycles hit and make sure the transactions are gone - time.Sleep(2 * evictionInterval) - if len(pool.queue) > 0 { - t.Fatalf("old transactions remained after eviction") + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) } } @@ -701,6 +808,8 @@ func TestTransactionQueueTimeLimiting(t *testing.T) { func TestTransactionPendingLimiting(t *testing.T) { // Create a test account and fund it pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() @@ -775,15 +884,15 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. func TestTransactionPendingGlobalLimiting(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) - DefaultTxPoolConfig.GlobalSlots = DefaultTxPoolConfig.AccountSlots * 10 - // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + config := DefaultTxPoolConfig + config.GlobalSlots = config.AccountSlots * 10 + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() // Create a number of test accounts and fund them @@ -800,7 +909,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { txs := types.Transactions{} for _, key := range keys { addr := crypto.PubkeyToAddress(key.PublicKey) - for j := 0; j < int(DefaultTxPoolConfig.GlobalSlots)/len(keys)*2; j++ { + for j := 0; j < int(config.GlobalSlots)/len(keys)*2; j++ { txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key)) nonces[addr]++ } @@ -812,8 +921,8 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { for _, list := range pool.pending { pending += list.Len() } - if pending > int(DefaultTxPoolConfig.GlobalSlots) { - t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, DefaultTxPoolConfig.GlobalSlots) + if pending > int(config.GlobalSlots) { + t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, config.GlobalSlots) } if err := validateTxPoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -822,20 +931,17 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { // Tests that if transactions start being capped, transasctions are also removed from 'all' func TestTransactionCapClearsFromAll(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old uint64) { DefaultTxPoolConfig.AccountSlots = old }(DefaultTxPoolConfig.AccountSlots) - defer func(old uint64) { DefaultTxPoolConfig.AccountQueue = old }(DefaultTxPoolConfig.AccountQueue) - defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) - - DefaultTxPoolConfig.AccountSlots = 2 - DefaultTxPoolConfig.AccountQueue = 2 - DefaultTxPoolConfig.GlobalSlots = 8 - // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + config := DefaultTxPoolConfig + config.AccountSlots = 2 + config.AccountQueue = 2 + config.GlobalSlots = 8 + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() // Create a number of test accounts and fund them @@ -846,7 +952,7 @@ func TestTransactionCapClearsFromAll(t *testing.T) { state.AddBalance(addr, big.NewInt(1000000)) txs := types.Transactions{} - for j := 0; j < int(DefaultTxPoolConfig.GlobalSlots)*2; j++ { + for j := 0; j < int(config.GlobalSlots)*2; j++ { txs = append(txs, transaction(uint64(j), big.NewInt(100000), key)) } // Import the batch and verify that limits have been enforced @@ -860,15 +966,15 @@ func TestTransactionCapClearsFromAll(t *testing.T) { // some hard threshold, if they are under the minimum guaranteed slot count then // the transactions are still kept. func TestTransactionPendingMinimumAllowance(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) - DefaultTxPoolConfig.GlobalSlots = 0 - // Create the pool to test the limit enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + config := DefaultTxPoolConfig + config.GlobalSlots = 0 + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() // Create a number of test accounts and fund them @@ -885,7 +991,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { txs := types.Transactions{} for _, key := range keys { addr := crypto.PubkeyToAddress(key.PublicKey) - for j := 0; j < int(DefaultTxPoolConfig.AccountSlots)*2; j++ { + for j := 0; j < int(config.AccountSlots)*2; j++ { txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key)) nonces[addr]++ } @@ -894,8 +1000,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { pool.AddRemotes(txs) for addr, list := range pool.pending { - if list.Len() != int(DefaultTxPoolConfig.AccountSlots) { - t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), DefaultTxPoolConfig.AccountSlots) + if list.Len() != int(config.AccountSlots) { + t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), config.AccountSlots) } } if err := validateTxPoolInternals(pool); err != nil { @@ -914,6 +1020,7 @@ func TestTransactionPoolRepricing(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() // Create a number of test accounts and fund them @@ -993,18 +1100,16 @@ func TestTransactionPoolRepricing(t *testing.T) { // // Note, local transactions are never allowed to be dropped. func TestTransactionPoolUnderpricing(t *testing.T) { - // Reduce the queue limits to shorten test time - defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) - DefaultTxPoolConfig.GlobalSlots = 2 - - defer func(old uint64) { DefaultTxPoolConfig.GlobalQueue = old }(DefaultTxPoolConfig.GlobalQueue) - DefaultTxPoolConfig.GlobalQueue = 2 - // Create the pool to test the pricing enforcement with db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) - pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + config := DefaultTxPoolConfig + config.GlobalSlots = 2 + config.GlobalQueue = 2 + + pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() // Create a number of test accounts and fund them @@ -1088,9 +1193,10 @@ func TestTransactionReplacement(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(db)) pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + defer pool.Stop() pool.resetState() - // Create a a test account to add transactions with + // Create a test account to add transactions with key, _ := crypto.GenerateKey() state, _ := pool.currentState() @@ -1153,6 +1259,8 @@ func BenchmarkPendingDemotion10000(b *testing.B) { benchmarkPendingDemotion(b, 1 func benchmarkPendingDemotion(b *testing.B, size int) { // Add a batch of transactions to a pool one by one pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) @@ -1177,6 +1285,8 @@ func BenchmarkFuturePromotion10000(b *testing.B) { benchmarkFuturePromotion(b, 1 func benchmarkFuturePromotion(b *testing.B, size int) { // Add a batch of transactions to a pool one by one pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) @@ -1196,6 +1306,8 @@ func benchmarkFuturePromotion(b *testing.B, size int) { func BenchmarkPoolInsert(b *testing.B) { // Generate a batch of transactions to enqueue into the pool pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) @@ -1219,6 +1331,8 @@ func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 1 func benchmarkPoolBatchInsert(b *testing.B, size int) { // Generate a batch of transactions to enqueue into the pool pool, key := setupTxPool() + defer pool.Stop() + account, _ := deriveSender(transaction(0, big.NewInt(0), key)) state, _ := pool.currentState() state.AddBalance(account, big.NewInt(1000000)) -- cgit