From 795b70423eac7180ab85b735f64aae9d6a10449d Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Mon, 27 Jun 2016 18:28:34 +0300 Subject: core, eth, miner: only retain 1 tx/nonce, remove bad ones --- core/tx_pool.go | 282 +++++++++++++++++++++++++-------------------- core/tx_pool_test.go | 133 ++++++++++++++------- eth/api_backend.go | 2 +- internal/ethapi/api.go | 60 ++++------ internal/ethapi/backend.go | 2 +- miner/worker.go | 10 +- 6 files changed, 282 insertions(+), 207 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 596356377..ec3d5c16b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -51,6 +51,11 @@ const ( type stateFn func() (*state.StateDB, error) +// TxList is a "list" of transactions belonging to an account, sorted by account +// nonce. To allow gaps and avoid constant copying, the list is represented as a +// hash map. +type TxList map[uint64]*types.Transaction + // TxPool contains all currently known transactions. Transactions // enter the pool when they are received from the network or submitted // locally. They exit the pool when they are included in the blockchain. @@ -68,8 +73,10 @@ type TxPool struct { events event.Subscription localTx *txSet mu sync.RWMutex - pending map[common.Hash]*types.Transaction // processable transactions - queue map[common.Address]map[common.Hash]*types.Transaction + + pending map[common.Address]TxList // All currently processable transactions + queue map[common.Address]TxList // Queued but non-processable transactions + all map[common.Hash]*types.Transaction // All transactions to allow lookups wg sync.WaitGroup // for shutdown sync @@ -79,8 +86,9 @@ type TxPool struct { func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { pool := &TxPool{ config: config, - pending: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]map[common.Hash]*types.Transaction), + pending: make(map[common.Address]TxList), + queue: make(map[common.Address]TxList), + all: make(map[common.Hash]*types.Transaction), eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, @@ -143,12 +151,12 @@ func (pool *TxPool) resetState() { // Loop over the pending transactions and base the nonce of the new // pending transaction set. - for _, tx := range pool.pending { - if addr, err := tx.From(); err == nil { - // Set the nonce. Transaction nonce can never be lower - // than the state nonce; validatePool took care of that. - if pool.pendingState.GetNonce(addr) <= tx.Nonce() { - pool.pendingState.SetNonce(addr, tx.Nonce()+1) + for addr, txs := range pool.pending { + // Set the nonce. Transaction nonce can never be lower + // than the state nonce; validatePool took care of that. + for nonce, _ := range txs { + if pool.pendingState.GetNonce(addr) <= nonce { + pool.pendingState.SetNonce(addr, nonce+1) } } } @@ -174,7 +182,9 @@ func (pool *TxPool) Stats() (pending int, queued int) { pool.mu.RLock() defer pool.mu.RUnlock() - pending = len(pool.pending) + for _, txs := range pool.pending { + pending += len(txs) + } for _, txs := range pool.queue { queued += len(txs) } @@ -183,30 +193,27 @@ func (pool *TxPool) Stats() (pending int, queued int) { // Content retrieves the data content of the transaction pool, returning all the // pending as well as queued transactions, grouped by account and nonce. -func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) { +func (pool *TxPool) Content() (map[common.Address]TxList, map[common.Address]TxList) { pool.mu.RLock() defer pool.mu.RUnlock() // Retrieve all the pending transactions and sort by account and by nonce - pending := make(map[common.Address]map[uint64][]*types.Transaction) - for _, tx := range pool.pending { - account, _ := tx.From() - - owned, ok := pending[account] - if !ok { - owned = make(map[uint64][]*types.Transaction) - pending[account] = owned + pending := make(map[common.Address]TxList) + for addr, txs := range pool.pending { + copy := make(TxList) + for nonce, tx := range txs { + copy[nonce] = tx } - owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) + pending[addr] = copy } // Retrieve all the queued transactions and sort by account and by nonce - queued := make(map[common.Address]map[uint64][]*types.Transaction) - for account, txs := range pool.queue { - owned := make(map[uint64][]*types.Transaction) - for _, tx := range txs { - owned[tx.Nonce()] = append(owned[tx.Nonce()], tx) + queued := make(map[common.Address]TxList) + for addr, txs := range pool.queue { + copy := make(TxList) + for nonce, tx := range txs { + copy[nonce] = tx } - queued[account] = owned + queued[addr] = copy } return pending, queued } @@ -280,7 +287,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { func (self *TxPool) add(tx *types.Transaction) error { hash := tx.Hash() - if self.pending[hash] != nil { + if self.all[hash] != nil { return fmt.Errorf("Known transaction (%x)", hash[:4]) } err := self.validateTx(tx) @@ -306,33 +313,63 @@ func (self *TxPool) add(tx *types.Transaction) error { return nil } -// queueTx will queue an unknown transaction +// queueTx will queue an unknown transaction. func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { - from, _ := tx.From() // already validated - if self.queue[from] == nil { - self.queue[from] = make(map[common.Hash]*types.Transaction) + addr, _ := tx.From() // already validated + if self.queue[addr] == nil { + self.queue[addr] = make(TxList) } - self.queue[from][hash] = tx + // If the nonce is already used, discard the lower priced transaction + nonce := tx.Nonce() + + if old, ok := self.queue[addr][nonce]; ok { + if old.GasPrice().Cmp(tx.GasPrice()) >= 0 { + return // Old was better, discard this + } + delete(self.all, old.Hash()) // New is better, drop and overwrite old one + } + self.queue[addr][nonce] = tx + self.all[hash] = tx } -// addTx will add a transaction to the pending (processable queue) list of transactions -func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) { - // init delayed since tx pool could have been started before any state sync +// addTx will moves a transaction from the non-executable queue to the pending +// (processable) list of transactions. +func (pool *TxPool) addTx(addr common.Address, tx *types.Transaction) { + // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } + // If the nonce is already used, discard the lower priced transaction + hash, nonce := tx.Hash(), tx.Nonce() - if _, ok := pool.pending[hash]; !ok { - pool.pending[hash] = tx + if old, ok := pool.pending[addr][nonce]; ok { + oldHash := old.Hash() - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, tx.Nonce()+1) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. - go pool.eventMux.Post(TxPreEvent{tx}) + switch { + case oldHash == hash: // Nothing changed, noop + return + case old.GasPrice().Cmp(tx.GasPrice()) >= 0: // Old was better, discard this + delete(pool.all, hash) + return + default: // New is better, discard old + delete(pool.all, oldHash) + } + } + // The transaction is being kept, insert it into the tx pool + if _, ok := pool.pending[addr]; !ok { + pool.pending[addr] = make(TxList) } + pool.pending[addr][nonce] = tx + pool.all[hash] = tx + + // Increment the nonce on the pending state. This can only happen if + // the nonce is +1 to the previous one. + pool.pendingState.SetNonce(addr, nonce+1) + + // Notify the subscribers. This event is posted in a goroutine + // because it's possible that somewhere during the post "Remove transaction" + // gets called which will then wait for the global tx pool lock and deadlock. + go pool.eventMux.Post(TxPreEvent{tx}) } // Add queues a single transaction in the pool if it is valid. @@ -371,58 +408,39 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { tp.mu.RLock() defer tp.mu.RUnlock() - // check the txs first - if tx, ok := tp.pending[hash]; ok { - return tx - } - // check queue - for _, txs := range tp.queue { - if tx, ok := txs[hash]; ok { - return tx - } - } - return nil + return tp.all[hash] } // GetTransactions returns all currently processable transactions. // The returned slice may be modified by the caller. -func (self *TxPool) GetTransactions() (txs types.Transactions) { +func (self *TxPool) GetTransactions() types.Transactions { self.mu.Lock() defer self.mu.Unlock() // check queue first self.checkQueue() + // invalidate any txs self.validatePool() - txs = make(types.Transactions, len(self.pending)) - i := 0 - for _, tx := range self.pending { - txs[i] = tx - i++ + count := 0 + for _, txs := range self.pending { + count += len(txs) } - return txs -} - -// GetQueuedTransactions returns all non-processable transactions. -func (self *TxPool) GetQueuedTransactions() types.Transactions { - self.mu.RLock() - defer self.mu.RUnlock() - - var ret types.Transactions - for _, txs := range self.queue { + pending := make(types.Transactions, 0, count) + for _, txs := range self.pending { for _, tx := range txs { - ret = append(ret, tx) + pending = append(pending, tx) } } - sort.Sort(types.TxByNonce(ret)) - return ret + return pending } // RemoveTransactions removes all given transactions from the pool. func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() + for _, tx := range txs { self.removeTx(tx.Hash()) } @@ -432,29 +450,35 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) { func (pool *TxPool) RemoveTx(hash common.Hash) { pool.mu.Lock() defer pool.mu.Unlock() + pool.removeTx(hash) } func (pool *TxPool) removeTx(hash common.Hash) { - // delete from pending pool - delete(pool.pending, hash) - // delete from queue - for address, txs := range pool.queue { - if _, ok := txs[hash]; ok { - if len(txs) == 1 { - // if only one tx, remove entire address entry. - delete(pool.queue, address) - } else { - delete(txs, hash) - } - break - } + // Fetch the transaction we wish to delete + tx, ok := pool.all[hash] + if !ok { + return + } + addr, _ := tx.From() + + // Remove it from all internal lists + delete(pool.all, hash) + + delete(pool.pending[addr], tx.Nonce()) + if len(pool.pending[addr]) == 0 { + delete(pool.pending, addr) + } + delete(pool.queue[addr], tx.Nonce()) + if len(pool.queue[addr]) == 0 { + delete(pool.queue, addr) } } -// checkQueue moves transactions that have become processable to main pool. +// checkQueue moves transactions that have become processable from the pool's +// queue to the set of pending transactions. func (pool *TxPool) checkQueue() { - // init delayed since tx pool could have been started before any state sync + // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } @@ -473,17 +497,19 @@ func (pool *TxPool) checkQueue() { trueNonce = currentState.GetNonce(address) // nonce known by the last state ) promote = promote[:0] - for hash, tx := range txs { + for nonce, tx := range txs { // Drop processed or out of fund transactions - if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 { + if nonce < trueNonce || balance.Cmp(tx.Cost()) < 0 { if glog.V(logger.Core) { glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) } - delete(txs, hash) + delete(txs, nonce) + delete(pool.all, tx.Hash()) + continue } // Collect the remaining transactions for the next pass. - promote = append(promote, txQueueEntry{hash, address, tx}) + promote = append(promote, txQueueEntry{address, tx}) } // Find the next consecutive nonce range starting at the current account nonce, // pushing the guessed nonce forward if we add consecutive transactions. @@ -493,17 +519,18 @@ func (pool *TxPool) checkQueue() { if entry.Nonce() > guessedNonce { if len(promote)-i > maxQueued { if glog.V(logger.Debug) { - glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:])) + glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.Hash().Bytes())) } for _, drop := range promote[i+maxQueued:] { - delete(txs, drop.hash) + delete(txs, drop.Nonce()) + delete(pool.all, drop.Hash()) } } break } // Otherwise promote the transaction and move the guess nonce if needed - pool.addTx(entry.hash, address, entry.Transaction) - delete(txs, entry.hash) + pool.addTx(address, entry.Transaction) + delete(txs, entry.Nonce()) if entry.Nonce() == guessedNonce { guessedNonce++ @@ -532,40 +559,48 @@ func (pool *TxPool) validatePool() { // Clean up the pending pool, accumulating invalid nonces gaps := make(map[common.Address]uint64) - for hash, tx := range pool.pending { - sender, _ := tx.From() // err already checked - - // Perform light nonce and balance validation - balance := balanceCache[sender] - if balance == nil { - balance = state.GetBalance(sender) - balanceCache[sender] = balance - } - if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 { - // Remove an already past it invalidated transaction - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) + for addr, txs := range pool.pending { + for nonce, tx := range txs { + // Perform light nonce and balance validation + balance := balanceCache[addr] + if balance == nil { + balance = state.GetBalance(addr) + balanceCache[addr] = balance } - delete(pool.pending, hash) + if past := state.GetNonce(addr) > nonce; past || balance.Cmp(tx.Cost()) < 0 { + // Remove an already past it invalidated transaction + if glog.V(logger.Core) { + glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) + } + delete(pool.pending[addr], nonce) + if len(pool.pending[addr]) == 0 { + delete(pool.pending, addr) + } + delete(pool.all, tx.Hash()) - // Track the smallest invalid nonce to postpone subsequent transactions - if !past { - if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev { - gaps[sender] = tx.Nonce() + // Track the smallest invalid nonce to postpone subsequent transactions + if !past { + if prev, ok := gaps[addr]; !ok || nonce < prev { + gaps[addr] = nonce + } } } } } // Move all transactions after a gap back to the future queue if len(gaps) > 0 { - for hash, tx := range pool.pending { - sender, _ := tx.From() - if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap { - if glog.V(logger.Core) { - glog.Infof("postponed tx (%v) due to introduced gap\n", tx) + for addr, txs := range pool.pending { + for nonce, tx := range txs { + if gap, ok := gaps[addr]; ok && nonce >= gap { + if glog.V(logger.Core) { + glog.Infof("postponed tx (%v) due to introduced gap\n", tx) + } + delete(pool.pending[addr], nonce) + if len(pool.pending[addr]) == 0 { + delete(pool.pending, addr) + } + pool.queueTx(tx.Hash(), tx) } - pool.queueTx(hash, tx) - delete(pool.pending, hash) } } } @@ -574,7 +609,6 @@ func (pool *TxPool) validatePool() { type txQueue []txQueueEntry type txQueueEntry struct { - hash common.Hash addr common.Address *types.Transaction } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index ed9bea75f..8aa69233d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -103,7 +103,7 @@ func TestTransactionQueue(t *testing.T) { currentState.SetNonce(from, 2) pool.queueTx(tx.Hash(), tx) pool.checkQueue() - if _, ok := pool.pending[tx.Hash()]; ok { + if _, ok := pool.pending[from][tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -139,7 +139,7 @@ func TestRemoveTx(t *testing.T) { currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1)) pool.queueTx(tx.Hash(), tx) - pool.addTx(tx.Hash(), from, tx) + pool.addTx(from, tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) } @@ -210,18 +210,38 @@ func TestTransactionDoubleNonce(t *testing.T) { } resetState() - tx := transaction(0, big.NewInt(100000), key) - tx2 := transaction(0, big.NewInt(1000000), key) - if err := pool.add(tx); err != nil { + tx1, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(100000), big.NewInt(1), nil).SignECDSA(key) + tx2, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(2), nil).SignECDSA(key) + tx3, _ := types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(1), nil).SignECDSA(key) + + // Add the first two transaction, ensure higher priced stays only + if err := pool.add(tx1); err != nil { t.Error("didn't expect error", err) } if err := pool.add(tx2); err != nil { t.Error("didn't expect error", err) } - pool.checkQueue() - if len(pool.pending) != 2 { - t.Error("expected 2 pending txs. Got", len(pool.pending)) + if len(pool.pending[addr]) != 1 { + t.Error("expected 1 pending transactions, got", len(pool.pending)) + } + if tx := pool.pending[addr][0]; tx.Hash() != tx2.Hash() { + t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) + } + // Add the thid transaction and ensure it's not saved (smaller price) + if err := pool.add(tx3); err != nil { + t.Error("didn't expect error", err) + } + pool.checkQueue() + if len(pool.pending[addr]) != 1 { + t.Error("expected 1 pending transactions, got", len(pool.pending)) + } + if tx := pool.pending[addr][0]; tx.Hash() != tx2.Hash() { + t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) + } + // Ensure the total transaction count is correct + if len(pool.all) != 1 { + t.Error("expected 1 total transactions, got", len(pool.all)) } } @@ -234,12 +254,15 @@ func TestMissingNonce(t *testing.T) { if err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } - if len(pool.pending) != 0 { - t.Error("expected 0 pending transactions, got", len(pool.pending)) + if len(pool.pending[addr]) != 0 { + t.Error("expected 0 pending transactions, got", len(pool.pending[addr])) } if len(pool.queue[addr]) != 1 { t.Error("expected 1 queued transaction, got", len(pool.queue[addr])) } + if len(pool.all) != 1 { + t.Error("expected 1 total transactions, got", len(pool.all)) + } } func TestNonceRecovery(t *testing.T) { @@ -270,8 +293,11 @@ func TestRemovedTxEvent(t *testing.T) { currentState.AddBalance(from, big.NewInt(1000000000000)) pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) pool.eventMux.Post(ChainHeadEvent{nil}) - if len(pool.pending) != 1 { - t.Error("expected 1 pending tx, got", len(pool.pending)) + if len(pool.pending[from]) != 1 { + t.Error("expected 1 pending tx, got", len(pool.pending[from])) + } + if len(pool.all) != 1 { + t.Error("expected 1 total transactions, got", len(pool.all)) } } @@ -292,41 +318,50 @@ func TestTransactionDropping(t *testing.T) { tx10 = transaction(10, big.NewInt(100), key) tx11 = transaction(11, big.NewInt(200), key) ) - pool.addTx(tx0.Hash(), account, tx0) - pool.addTx(tx1.Hash(), account, tx1) + pool.addTx(account, tx0) + pool.addTx(account, tx1) pool.queueTx(tx10.Hash(), tx10) pool.queueTx(tx11.Hash(), tx11) // Check that pre and post validations leave the pool as is - if len(pool.pending) != 2 { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), 2) + if len(pool.pending[account]) != 2 { + t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), 2) } if len(pool.queue[account]) != 2 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 2) + t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 2) + } + if len(pool.all) != 4 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } pool.resetState() - if len(pool.pending) != 2 { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), 2) + if len(pool.pending[account]) != 2 { + t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), 2) } if len(pool.queue[account]) != 2 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 2) + t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 2) + } + if len(pool.all) != 4 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } // Reduce the balance of the account, and check that invalidated transactions are dropped state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[tx0.Hash()]; !ok { + if _, ok := pool.pending[account][tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) } - if _, ok := pool.pending[tx1.Hash()]; ok { + if _, ok := pool.pending[account][tx1.Nonce()]; ok { t.Errorf("out-of-fund pending transaction present: %v", tx1) } - if _, ok := pool.queue[account][tx10.Hash()]; !ok { + if _, ok := pool.queue[account][tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account][tx11.Hash()]; ok { + if _, ok := pool.queue[account][tx11.Nonce()]; ok { t.Errorf("out-of-fund queued transaction present: %v", tx11) } + if len(pool.all) != 2 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 2) + } } // Tests that if a transaction is dropped from the current pending pool (e.g. out @@ -349,50 +384,59 @@ func TestTransactionPostponing(t *testing.T) { } else { tx = transaction(uint64(i), big.NewInt(500), key) } - pool.addTx(tx.Hash(), account, tx) + pool.addTx(account, tx) txns = append(txns, tx) } // Check that pre and post validations leave the pool as is - if len(pool.pending) != len(txns) { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), len(txns)) + if len(pool.pending[account]) != len(txns) { + t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), len(txns)) } if len(pool.queue[account]) != 0 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 0) + t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 0) + } + if len(pool.all) != len(txns) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } pool.resetState() - if len(pool.pending) != len(txns) { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending), len(txns)) + if len(pool.pending[account]) != len(txns) { + t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), len(txns)) } if len(pool.queue[account]) != 0 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue), 0) + t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 0) + } + if len(pool.all) != len(txns) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } // Reduce the balance of the account, and check that transactions are reorganised state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[txns[0].Hash()]; !ok { + if _, ok := pool.pending[account][txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) } - if _, ok := pool.queue[account][txns[0].Hash()]; ok { + if _, ok := pool.queue[account][txns[0].Nonce()]; ok { t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txns[0]) } for i, tx := range txns[1:] { if i%2 == 1 { - if _, ok := pool.pending[tx.Hash()]; ok { + if _, ok := pool.pending[account][tx.Nonce()]; ok { t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account][tx.Hash()]; !ok { + if _, ok := pool.queue[account][tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) } } else { - if _, ok := pool.pending[tx.Hash()]; ok { + if _, ok := pool.pending[account][tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account][tx.Hash()]; ok { + if _, ok := pool.queue[account][tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) } } } + if len(pool.all) != len(txns)/2 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)/2) + } } // Tests that if the transaction count belonging to a single account goes above @@ -423,6 +467,9 @@ func TestTransactionQueueLimiting(t *testing.T) { } } } + if len(pool.all) != maxQueued { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueued) + } } // Tests that even if the transaction count belonging to a single account goes @@ -441,13 +488,16 @@ func TestTransactionPendingLimiting(t *testing.T) { if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } - if len(pool.pending) != int(i)+1 { - t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), i+1) + if len(pool.pending[account]) != int(i)+1 { + t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending[account]), i+1) } if len(pool.queue[account]) != 0 { t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue[account]), 0) } } + if len(pool.all) != maxQueued+5 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueued+5) + } } // Tests that the transaction limits are enforced the same way irrelevant whether @@ -486,6 +536,9 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { if len(pool1.queue[account1]) != len(pool2.queue[account2]) { t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue[account1]), len(pool2.queue[account2])) } + if len(pool1.all) != len(pool2.all) { + t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all)) + } } // Benchmarks the speed of validating the contents of the pending queue of the @@ -503,7 +556,7 @@ func benchmarkValidatePool(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(i), big.NewInt(100000), key) - pool.addTx(tx.Hash(), account, tx) + pool.addTx(account, tx) } // Benchmark the speed of pool validation b.ResetTimer() diff --git a/eth/api_backend.go b/eth/api_backend.go index efcdb3361..e19254374 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -149,7 +149,7 @@ func (b *EthApiBackend) Stats() (pending int, queued int) { return b.eth.txPool.Stats() } -func (b *EthApiBackend) TxPoolContent() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) { +func (b *EthApiBackend) TxPoolContent() (map[common.Address]core.TxList, map[common.Address]core.TxList) { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 184b5831f..0fba55ae8 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -100,32 +100,26 @@ func NewPublicTxPoolAPI(b Backend) *PublicTxPoolAPI { } // Content returns the transactions contained within the transaction pool. -func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string][]*RPCTransaction { - content := map[string]map[string]map[string][]*RPCTransaction{ - "pending": make(map[string]map[string][]*RPCTransaction), - "queued": make(map[string]map[string][]*RPCTransaction), +func (s *PublicTxPoolAPI) Content() map[string]map[string]map[string]*RPCTransaction { + content := map[string]map[string]map[string]*RPCTransaction{ + "pending": make(map[string]map[string]*RPCTransaction), + "queued": make(map[string]map[string]*RPCTransaction), } pending, queue := s.b.TxPoolContent() // Flatten the pending transactions - for account, batches := range pending { - dump := make(map[string][]*RPCTransaction) - for nonce, txs := range batches { - nonce := fmt.Sprintf("%d", nonce) - for _, tx := range txs { - dump[nonce] = append(dump[nonce], newRPCPendingTransaction(tx)) - } + for account, txs := range pending { + dump := make(map[string]*RPCTransaction) + for nonce, tx := range txs { + dump[fmt.Sprintf("%d", nonce)] = newRPCPendingTransaction(tx) } content["pending"][account.Hex()] = dump } // Flatten the queued transactions - for account, batches := range queue { - dump := make(map[string][]*RPCTransaction) - for nonce, txs := range batches { - nonce := fmt.Sprintf("%d", nonce) - for _, tx := range txs { - dump[nonce] = append(dump[nonce], newRPCPendingTransaction(tx)) - } + for account, txs := range queue { + dump := make(map[string]*RPCTransaction) + for nonce, tx := range txs { + dump[fmt.Sprintf("%d", nonce)] = newRPCPendingTransaction(tx) } content["queued"][account.Hex()] = dump } @@ -143,10 +137,10 @@ func (s *PublicTxPoolAPI) Status() map[string]*rpc.HexNumber { // Inspect retrieves the content of the transaction pool and flattens it into an // easily inspectable list. -func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string][]string { - content := map[string]map[string]map[string][]string{ - "pending": make(map[string]map[string][]string), - "queued": make(map[string]map[string][]string), +func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string { + content := map[string]map[string]map[string]string{ + "pending": make(map[string]map[string]string), + "queued": make(map[string]map[string]string), } pending, queue := s.b.TxPoolContent() @@ -158,24 +152,18 @@ func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string][]string { return fmt.Sprintf("contract creation: %v wei + %v × %v gas", tx.Value(), tx.Gas(), tx.GasPrice()) } // Flatten the pending transactions - for account, batches := range pending { - dump := make(map[string][]string) - for nonce, txs := range batches { - nonce := fmt.Sprintf("%d", nonce) - for _, tx := range txs { - dump[nonce] = append(dump[nonce], format(tx)) - } + for account, txs := range pending { + dump := make(map[string]string) + for nonce, tx := range txs { + dump[fmt.Sprintf("%d", nonce)] = format(tx) } content["pending"][account.Hex()] = dump } // Flatten the queued transactions - for account, batches := range queue { - dump := make(map[string][]string) - for nonce, txs := range batches { - nonce := fmt.Sprintf("%d", nonce) - for _, tx := range txs { - dump[nonce] = append(dump[nonce], format(tx)) - } + for account, txs := range queue { + dump := make(map[string]string) + for nonce, tx := range txs { + dump[fmt.Sprintf("%d", nonce)] = format(tx) } content["queued"][account.Hex()] = dump } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 791a06925..673ab0546 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -58,7 +58,7 @@ type Backend interface { GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) - TxPoolContent() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) + TxPoolContent() (map[common.Address]core.TxList, map[common.Address]core.TxList) } type State interface { diff --git a/miner/worker.go b/miner/worker.go index 59406bf4e..f243fe799 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -68,12 +68,12 @@ type Work struct { ancestors *set.Set // ancestor set (used for checking uncle parent validity) family *set.Set // family set (used for checking uncle invalidity) uncles *set.Set // uncle set - remove *set.Set // tx which will be removed tcount int // tx count in cycle ignoredTransactors *set.Set lowGasTransactors *set.Set ownedAccounts *set.Set lowGasTxs types.Transactions + failedTxs types.Transactions localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion) Block *types.Block // the new block @@ -383,7 +383,6 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error accounts := self.eth.AccountManager().Accounts() // Keep track of transactions which return errors so they can be removed - work.remove = set.New() work.tcount = 0 work.ignoredTransactors = set.New() work.lowGasTransactors = set.New() @@ -533,7 +532,9 @@ func (self *worker) commitNewWork() { */ work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) + self.eth.TxPool().RemoveTransactions(work.lowGasTxs) + self.eth.TxPool().RemoveTransactions(work.failedTxs) // compute uncles for the new block. var ( @@ -639,11 +640,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Trans // ignore the transactor so no nonce errors will be thrown for this account // next time the worker is run, they'll be picked up again. env.ignoredTransactors.Add(from) - glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) - case err != nil: - env.remove.Add(tx.Hash()) + case err != nil: + env.failedTxs = append(env.failedTxs, tx) if glog.V(logger.Detail) { glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) } -- cgit From 0ef327bbee79c01a69ba59258acc6ce3a48bc288 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Fri, 1 Jul 2016 18:59:55 +0300 Subject: core, eth, internal, miner: optimize txpool for quick ops --- core/tx_list.go | 331 +++++++++++++++++++++++++++ core/tx_list_test.go | 58 +++++ core/tx_pool.go | 503 ++++++++++++++++++----------------------- core/tx_pool_test.go | 226 ++++++++++-------- core/types/transaction.go | 27 +-- core/types/transaction_test.go | 7 +- eth/api_backend.go | 14 +- eth/handler.go | 2 +- eth/helper_test.go | 22 +- eth/protocol.go | 8 +- eth/protocol_test.go | 2 +- eth/sync.go | 5 +- internal/ethapi/backend.go | 2 +- miner/worker.go | 7 +- 14 files changed, 788 insertions(+), 426 deletions(-) create mode 100644 core/tx_list.go create mode 100644 core/tx_list_test.go diff --git a/core/tx_list.go b/core/tx_list.go new file mode 100644 index 000000000..e30fee38f --- /dev/null +++ b/core/tx_list.go @@ -0,0 +1,331 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "container/heap" + "math" + "math/big" + "sort" + + "github.com/ethereum/go-ethereum/core/types" +) + +// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for +// retrieving sorted transactions from the possibly gapped future queue. +type nonceHeap []uint64 + +func (h nonceHeap) Len() int { return len(h) } +func (h nonceHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h nonceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *nonceHeap) Push(x interface{}) { + *h = append(*h, x.(uint64)) +} + +func (h *nonceHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// txList is a "list" of transactions belonging to an account, sorted by account +// nonce. The same type can be used both for storing contiguous transactions for +// the executable/pending queue; and for storing gapped transactions for the non- +// executable/future queue, with minor behavoiral changes. +type txList struct { + strict bool // Whether nonces are strictly continuous or not + items map[uint64]*types.Transaction // Hash map storing the transaction data + cache types.Transactions // cache of the transactions already sorted + + first uint64 // Nonce of the lowest stored transaction (strict mode) + last uint64 // Nonce of the highest stored transaction (strict mode) + index *nonceHeap // Heap of nonces of all teh stored transactions (non-strict mode) + + costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) +} + +// newTxList create a new transaction list for maintaining nonce-indexable fast, +// gapped, sortable transaction lists. +func newTxList(strict bool) *txList { + return &txList{ + strict: strict, + items: make(map[uint64]*types.Transaction), + first: math.MaxUint64, + index: &nonceHeap{}, + costcap: new(big.Int), + } +} + +// Add tries to inserts a new transaction into the list, returning whether the +// transaction was acceped, and if yes, any previous transaction it replaced. +// +// In case of strict lists (contiguous nonces) the nonce boundaries are updated +// appropriately with the new transaction. Otherwise (gapped nonces) the heap of +// nonces is expanded with the new transaction. +func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) { + // If an existing transaction is better, discard new one + nonce := tx.Nonce() + + old, ok := l.items[nonce] + if ok && old.GasPrice().Cmp(tx.GasPrice()) >= 0 { + return false, nil + } + // Otherwise insert the transaction and replace any previous one + l.items[nonce] = tx + if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 { + l.costcap = cost + } + if l.strict { + // In strict mode, maintain the nonce sequence boundaries + if nonce < l.first { + l.first = nonce + } + if nonce > l.last { + l.last = nonce + } + } else { + // In gapped mode, maintain the nonce heap + heap.Push(l.index, nonce) + } + l.cache = nil + + return true, old +} + +// Forward removes all transactions from the list with a nonce lower than the +// provided threshold. Every removed transaction is returned for any post-removal +// maintenance. +func (l *txList) Forward(threshold uint64) types.Transactions { + var removed types.Transactions + + if l.strict { + // In strict mode, push the lowest nonce forward to the threshold + for l.first < threshold { + if tx, ok := l.items[l.first]; ok { + removed = append(removed, tx) + } + delete(l.items, l.first) + l.first++ + } + if l.first > l.last { + l.last = l.first + } + } else { + // In gapped mode, pop off heap items until the threshold is reached + for l.index.Len() > 0 && (*l.index)[0] < threshold { + nonce := heap.Pop(l.index).(uint64) + removed = append(removed, l.items[nonce]) + delete(l.items, nonce) + } + } + l.cache = nil + + return removed +} + +// Filter removes all transactions from the list with a cost higher than the +// provided threshold. Every removed transaction is returned for any post-removal +// maintenance. Strict-mode invalidated transactions are also returned. +// +// This method uses the cached costcap to quickly decide if there's even a point +// in calculating all the costs or if the balance covers all. If the threshold is +// loewr than the costcap, the costcap will be reset to a new high after removing +// expensive the too transactions. +func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transactions) { + // If all transactions are blow the threshold, short circuit + if l.costcap.Cmp(threshold) <= 0 { + return nil, nil + } + l.costcap = new(big.Int).Set(threshold) // Lower the cap to the threshold + + // Gather all the transactions needing deletion + var removed types.Transactions + for _, tx := range l.items { + if cost := tx.Cost(); cost.Cmp(threshold) > 0 { + removed = append(removed, tx) + delete(l.items, tx.Nonce()) + } + } + // Readjust the nonce boundaries/indexes and gather invalidate tranactions + var invalids types.Transactions + if l.strict { + // In strict mode iterate find the first gap and invalidate everything after it + for i := l.first; i <= l.last; i++ { + if _, ok := l.items[i]; !ok { + // Gap found, invalidate all subsequent transactions + for j := i + 1; j <= l.last; j++ { + if tx, ok := l.items[j]; ok { + invalids = append(invalids, tx) + delete(l.items, j) + } + } + // Reduce the highest transaction nonce and return + l.last = i - 1 + break + } + } + } else { + // In gapped mode no transactions are invalid, but the heap is ruined + l.index = &nonceHeap{} + for nonce, _ := range l.items { + *l.index = append(*l.index, nonce) + } + heap.Init(l.index) + } + l.cache = nil + + return removed, invalids +} + +// Cap places a hard limit on the number of items, returning all transactions +// exceeding tht limit. +func (l *txList) Cap(threshold int) types.Transactions { + // Short circuit if the number of items is under the limit + if len(l.items) < threshold { + return nil + } + // Otherwise gather and drop the highest nonce'd transactions + var drops types.Transactions + + if l.strict { + // In strict mode, just gather top down from last to first + for len(l.items) > threshold { + if tx, ok := l.items[l.last]; ok { + drops = append(drops, tx) + delete(l.items, l.last) + l.last-- + } + } + } else { + // In gapped mode it's expensive: we need to sort and drop like that + sort.Sort(*l.index) + for size := len(l.items); size > threshold; size-- { + drops = append(drops, l.items[(*l.index)[size-1]]) + delete(l.items, (*l.index)[size-1]) + *l.index = (*l.index)[:size-1] + } + heap.Init(l.index) + } + l.cache = nil + + return drops +} + +// Remove deletes a transaction from the maintained list, returning whether the +// transaction was found, and also returning any transaction invalidated due to +// the deletion (strict mode only). +func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { + nonce := tx.Nonce() + if _, ok := l.items[nonce]; ok { + // Remove the item and invalidate the sorted cache + delete(l.items, nonce) + l.cache = nil + + // Remove all invalidated transactions (strict mode only!) + invalids := make(types.Transactions, 0, l.last-nonce) + if l.strict { + for i := nonce + 1; i <= l.last; i++ { + invalids = append(invalids, l.items[i]) + delete(l.items, i) + } + l.last = nonce - 1 + } else { + // In gapped mode, remove the nonce from the index but honour the heap + for i := 0; i < l.index.Len(); i++ { + if (*l.index)[i] == nonce { + heap.Remove(l.index, i) + break + } + } + } + // Figure out the new highest nonce + return true, invalids + } + return false, nil +} + +// Ready retrieves a sequentially increasing list of transactions starting at the +// provided nonce that is ready for processing. The returned transactions will be +// removed from the list. +// +// Note, all transactions with nonces lower that start will also be returned to +// prevent getting into and invalid state. This is not something that should ever +// happen but better to be self correcting than failing! +func (l *txList) Ready(start uint64) types.Transactions { + var txs types.Transactions + if l.strict { + // In strict mode make sure we have valid transaction, return all contiguous + if l.first > start { + return nil + } + for { + if tx, ok := l.items[l.first]; ok { + txs = append(txs, tx) + delete(l.items, l.first) + l.first++ + continue + } + break + } + } else { + // In gapped mode, check the heap start and return all contiguous + if l.index.Len() == 0 || (*l.index)[0] > start { + return nil + } + next := (*l.index)[0] + for l.index.Len() > 0 && (*l.index)[0] == next { + txs = append(txs, l.items[next]) + delete(l.items, next) + heap.Pop(l.index) + next++ + } + } + l.cache = nil + + return txs +} + +// Len returns the length of the transaction list. +func (l *txList) Len() int { + return len(l.items) +} + +// Empty returns whether the list of transactions is empty or not. +func (l *txList) Empty() bool { + return len(l.items) == 0 +} + +// Flatten creates a nonce-sorted slice of transactions based on the loosely +// sorted internal representation. The result of the sorting is cached in case +// it's requested again before any modifications are made to the contents. +func (l *txList) Flatten() types.Transactions { + // If the sorting was not cached yet, create and cache it + if l.cache == nil { + l.cache = make(types.Transactions, 0, len(l.items)) + for _, tx := range l.items { + l.cache = append(l.cache, tx) + } + sort.Sort(types.TxByNonce(l.cache)) + } + // Copy the cache to prevent accidental modifications + txs := make(types.Transactions, len(l.cache)) + copy(txs, l.cache) + return txs +} diff --git a/core/tx_list_test.go b/core/tx_list_test.go new file mode 100644 index 000000000..ea83ca479 --- /dev/null +++ b/core/tx_list_test.go @@ -0,0 +1,58 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "math/big" + "math/rand" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" +) + +// Tests that transactions can be added to strict lists and list contents and +// nonce boundaries are correctly maintained. +func TestStrictTxListAdd(t *testing.T) { + // Generate a list of transactions to insert + key, _ := crypto.GenerateKey() + + txs := make(types.Transactions, 1024) + for i := 0; i < len(txs); i++ { + txs[i] = transaction(uint64(i), new(big.Int), key) + } + // Insert the transactions in a random order + list := newTxList(true) + for _, v := range rand.Perm(len(txs)) { + list.Add(txs[v]) + } + // Verify internal state + if list.first != 0 { + t.Errorf("lowest nonce mismatch: have %d, want %d", list.first, 0) + } + if int(list.last) != len(txs)-1 { + t.Errorf("highest nonce mismatch: have %d, want %d", list.last, len(txs)-1) + } + if len(list.items) != len(txs) { + t.Errorf("transaction count mismatch: have %d, want %d", len(list.items), len(txs)) + } + for i, tx := range txs { + if list.items[tx.Nonce()] != tx { + t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.items[tx.Nonce()], tx) + } + } +} diff --git a/core/tx_pool.go b/core/tx_pool.go index ec3d5c16b..c4dcceba0 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "math/big" - "sort" "sync" "time" @@ -51,11 +50,6 @@ const ( type stateFn func() (*state.StateDB, error) -// TxList is a "list" of transactions belonging to an account, sorted by account -// nonce. To allow gaps and avoid constant copying, the list is represented as a -// hash map. -type TxList map[uint64]*types.Transaction - // TxPool contains all currently known transactions. Transactions // enter the pool when they are received from the network or submitted // locally. They exit the pool when they are included in the blockchain. @@ -74,8 +68,8 @@ type TxPool struct { localTx *txSet mu sync.RWMutex - pending map[common.Address]TxList // All currently processable transactions - queue map[common.Address]TxList // Queued but non-processable transactions + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions all map[common.Hash]*types.Transaction // All transactions to allow lookups wg sync.WaitGroup // for shutdown sync @@ -86,8 +80,8 @@ type TxPool struct { func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { pool := &TxPool{ config: config, - pending: make(map[common.Address]TxList), - queue: make(map[common.Address]TxList), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), all: make(map[common.Hash]*types.Transaction), eventMux: eventMux, currentState: currentStateFn, @@ -125,7 +119,7 @@ func (pool *TxPool) eventLoop() { pool.minGasPrice = ev.Price pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddTransactions(ev.Txs) + pool.AddBatch(ev.Txs) } } } @@ -133,12 +127,12 @@ func (pool *TxPool) eventLoop() { func (pool *TxPool) resetState() { currentState, err := pool.currentState() if err != nil { - glog.V(logger.Info).Infoln("failed to get current state: %v", err) + glog.V(logger.Error).Infof("Failed to get current state: %v", err) return } managedState := state.ManageState(currentState) if err != nil { - glog.V(logger.Info).Infoln("failed to get managed state: %v", err) + glog.V(logger.Error).Infof("Failed to get managed state: %v", err) return } pool.pendingState = managedState @@ -147,22 +141,15 @@ func (pool *TxPool) resetState() { // any transactions that have been included in the block or // have been invalidated because of another transaction (e.g. // higher gas price) - pool.validatePool() - - // Loop over the pending transactions and base the nonce of the new - // pending transaction set. - for addr, txs := range pool.pending { - // Set the nonce. Transaction nonce can never be lower - // than the state nonce; validatePool took care of that. - for nonce, _ := range txs { - if pool.pendingState.GetNonce(addr) <= nonce { - pool.pendingState.SetNonce(addr, nonce+1) - } - } + pool.demoteUnexecutables() + + // Update all accounts to the latest known pending nonce + for addr, list := range pool.pending { + pool.pendingState.SetNonce(addr, list.last+1) } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid - pool.checkQueue() + pool.promoteExecutables() } func (pool *TxPool) Stop() { @@ -178,46 +165,58 @@ func (pool *TxPool) State() *state.ManagedState { return pool.pendingState } +// Stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. func (pool *TxPool) Stats() (pending int, queued int) { pool.mu.RLock() defer pool.mu.RUnlock() - for _, txs := range pool.pending { - pending += len(txs) + for _, list := range pool.pending { + pending += list.Len() } - for _, txs := range pool.queue { - queued += len(txs) + for _, list := range pool.queue { + queued += list.Len() } return } // Content retrieves the data content of the transaction pool, returning all the -// pending as well as queued transactions, grouped by account and nonce. -func (pool *TxPool) Content() (map[common.Address]TxList, map[common.Address]TxList) { +// pending as well as queued transactions, grouped by account and sorted by nonce. +func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { pool.mu.RLock() defer pool.mu.RUnlock() - // Retrieve all the pending transactions and sort by account and by nonce - pending := make(map[common.Address]TxList) - for addr, txs := range pool.pending { - copy := make(TxList) - for nonce, tx := range txs { - copy[nonce] = tx - } - pending[addr] = copy - } - // Retrieve all the queued transactions and sort by account and by nonce - queued := make(map[common.Address]TxList) - for addr, txs := range pool.queue { - copy := make(TxList) - for nonce, tx := range txs { - copy[nonce] = tx - } - queued[addr] = copy + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + queued := make(map[common.Address]types.Transactions) + for addr, list := range pool.queue { + queued[addr] = list.Flatten() } return pending, queued } +// Pending retrieves all currently processable transactions, groupped by origin +// account and sorted by nonce. The returned transaction set is a copy and can be +// freely modified by calling code. +func (pool *TxPool) Pending() map[common.Address]types.Transactions { + pool.mu.Lock() + defer pool.mu.Unlock() + + // check queue first + pool.promoteExecutables() + + // invalidate any txs + pool.demoteUnexecutables() + + pending := make(map[common.Address]types.Transactions) + for addr, list := range pool.pending { + pending[addr] = list.Flatten() + } + return pending +} + // SetLocal marks a transaction as local, skipping gas price // check against local miner minimum in the future func (pool *TxPool) SetLocal(tx *types.Transaction) { @@ -283,340 +282,268 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { return nil } -// validate and queue transactions. -func (self *TxPool) add(tx *types.Transaction) error { +// add validates a transaction and inserts it into the non-executable queue for +// later pending promotion and execution. +func (pool *TxPool) add(tx *types.Transaction) error { + // If the transaction is alreayd known, discard it hash := tx.Hash() - - if self.all[hash] != nil { - return fmt.Errorf("Known transaction (%x)", hash[:4]) + if pool.all[hash] != nil { + return fmt.Errorf("Known transaction: %x", hash[:4]) } - err := self.validateTx(tx) - if err != nil { + // Otherwise ensure basic validation passes nd queue it up + if err := pool.validateTx(tx); err != nil { return err } - self.queueTx(hash, tx) + pool.enqueueTx(hash, tx) + // Print a log message if low enough level is set if glog.V(logger.Debug) { - var toname string + rcpt := "[NEW_CONTRACT]" if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" + rcpt = common.Bytes2Hex(to[:4]) } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) + from, _ := tx.From() // from already verified during tx validation + glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash) } - return nil } -// queueTx will queue an unknown transaction. -func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { - addr, _ := tx.From() // already validated - if self.queue[addr] == nil { - self.queue[addr] = make(TxList) +// enqueueTx inserts a new transction into the non-executable transaction queue. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { + // Try to insert the transaction into the future queue + from, _ := tx.From() // already validated + if pool.queue[from] == nil { + pool.queue[from] = newTxList(false) } - // If the nonce is already used, discard the lower priced transaction - nonce := tx.Nonce() - - if old, ok := self.queue[addr][nonce]; ok { - if old.GasPrice().Cmp(tx.GasPrice()) >= 0 { - return // Old was better, discard this - } - delete(self.all, old.Hash()) // New is better, drop and overwrite old one + inserted, old := pool.queue[from].Add(tx) + if !inserted { + return // An older transaction was better, discard this + } + // Discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } - self.queue[addr][nonce] = tx - self.all[hash] = tx + pool.all[hash] = tx } -// addTx will moves a transaction from the non-executable queue to the pending -// (processable) list of transactions. -func (pool *TxPool) addTx(addr common.Address, tx *types.Transaction) { +// promoteTx adds a transaction to the pending (processable) list of transactions. +// +// Note, this method assumes the pool lock is held! +func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) { // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } - // If the nonce is already used, discard the lower priced transaction - hash, nonce := tx.Hash(), tx.Nonce() - - if old, ok := pool.pending[addr][nonce]; ok { - oldHash := old.Hash() + // Try to insert the transaction into the pending queue + if pool.pending[addr] == nil { + pool.pending[addr] = newTxList(true) + } + list := pool.pending[addr] - switch { - case oldHash == hash: // Nothing changed, noop - return - case old.GasPrice().Cmp(tx.GasPrice()) >= 0: // Old was better, discard this - delete(pool.all, hash) - return - default: // New is better, discard old - delete(pool.all, oldHash) - } + inserted, old := list.Add(tx) + if !inserted { + // An older transaction was better, discard this + delete(pool.all, hash) + return } - // The transaction is being kept, insert it into the tx pool - if _, ok := pool.pending[addr]; !ok { - pool.pending[addr] = make(TxList) + // Otherwise discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) } - pool.pending[addr][nonce] = tx - pool.all[hash] = tx - - // Increment the nonce on the pending state. This can only happen if - // the nonce is +1 to the previous one. - pool.pendingState.SetNonce(addr, nonce+1) + pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) - // Notify the subscribers. This event is posted in a goroutine - // because it's possible that somewhere during the post "Remove transaction" - // gets called which will then wait for the global tx pool lock and deadlock. + // Set the potentially new pending nonce and notify any subsystems of the new tx + pool.pendingState.SetNonce(addr, list.last+1) go pool.eventMux.Post(TxPreEvent{tx}) } // Add queues a single transaction in the pool if it is valid. -func (self *TxPool) Add(tx *types.Transaction) error { - self.mu.Lock() - defer self.mu.Unlock() +func (pool *TxPool) Add(tx *types.Transaction) error { + pool.mu.Lock() + defer pool.mu.Unlock() - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { return err } - self.checkQueue() + pool.promoteExecutables() + return nil } -// AddTransactions attempts to queue all valid transactions in txs. -func (self *TxPool) AddTransactions(txs []*types.Transaction) { - self.mu.Lock() - defer self.mu.Unlock() +// AddBatch attempts to queue a batch of transactions. +func (pool *TxPool) AddBatch(txs []*types.Transaction) { + pool.mu.Lock() + defer pool.mu.Unlock() for _, tx := range txs { - if err := self.add(tx); err != nil { + if err := pool.add(tx); err != nil { glog.V(logger.Debug).Infoln("tx error:", err) - } else { - h := tx.Hash() - glog.V(logger.Debug).Infof("tx %x\n", h[:4]) } } - - // check and validate the queue - self.checkQueue() + pool.promoteExecutables() } -// GetTransaction returns a transaction if it is contained in the pool +// Get returns a transaction if it is contained in the pool // and nil otherwise. -func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { - tp.mu.RLock() - defer tp.mu.RUnlock() - - return tp.all[hash] -} - -// GetTransactions returns all currently processable transactions. -// The returned slice may be modified by the caller. -func (self *TxPool) GetTransactions() types.Transactions { - self.mu.Lock() - defer self.mu.Unlock() - - // check queue first - self.checkQueue() - - // invalidate any txs - self.validatePool() +func (pool *TxPool) Get(hash common.Hash) *types.Transaction { + pool.mu.RLock() + defer pool.mu.RUnlock() - count := 0 - for _, txs := range self.pending { - count += len(txs) - } - pending := make(types.Transactions, 0, count) - for _, txs := range self.pending { - for _, tx := range txs { - pending = append(pending, tx) - } - } - return pending + return pool.all[hash] } -// RemoveTransactions removes all given transactions from the pool. -func (self *TxPool) RemoveTransactions(txs types.Transactions) { - self.mu.Lock() - defer self.mu.Unlock() +// Remove removes the transaction with the given hash from the pool. +func (pool *TxPool) Remove(hash common.Hash) { + pool.mu.Lock() + defer pool.mu.Unlock() - for _, tx := range txs { - self.removeTx(tx.Hash()) - } + pool.removeTx(hash) } -// RemoveTx removes the transaction with the given hash from the pool. -func (pool *TxPool) RemoveTx(hash common.Hash) { +// RemoveBatch removes all given transactions from the pool. +func (pool *TxPool) RemoveBatch(txs types.Transactions) { pool.mu.Lock() defer pool.mu.Unlock() - pool.removeTx(hash) + for _, tx := range txs { + pool.removeTx(tx.Hash()) + } } +// removeTx iterates removes a single transaction from the queue, moving all +// subsequent transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete tx, ok := pool.all[hash] if !ok { return } - addr, _ := tx.From() + addr, _ := tx.From() // already validated during insertion - // Remove it from all internal lists + // Remove it from the list of known transactions delete(pool.all, hash) - delete(pool.pending[addr], tx.Nonce()) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) + // Remove the transaction from the pending lists and reset the account nonce + if pending := pool.pending[addr]; pending != nil { + if removed, invalids := pending.Remove(tx); removed { + // If no more transactions are left, remove the list and reset the nonce + if pending.Empty() { + delete(pool.pending, addr) + pool.pendingState.SetNonce(addr, tx.Nonce()) + } else { + // Otherwise update the nonce and postpone any invalidated transactions + pool.pendingState.SetNonce(addr, pending.last) + for _, tx := range invalids { + pool.enqueueTx(tx.Hash(), tx) + } + } + } } - delete(pool.queue[addr], tx.Nonce()) - if len(pool.queue[addr]) == 0 { - delete(pool.queue, addr) + // Transaction is in the future queue + if future := pool.queue[addr]; future != nil { + future.Remove(tx) + if future.Empty() { + delete(pool.queue, addr) + } } } -// checkQueue moves transactions that have become processable from the pool's -// queue to the set of pending transactions. -func (pool *TxPool) checkQueue() { +// promoteExecutables moves transactions that have become processable from the +// future queue to the set of pending transactions. During this process, all +// invalidated transactions (low nonce, low balance) are deleted. +func (pool *TxPool) promoteExecutables() { // Init delayed since tx pool could have been started before any state sync if pool.pendingState == nil { pool.resetState() } - - var promote txQueue - for address, txs := range pool.queue { - currentState, err := pool.currentState() - if err != nil { - glog.Errorf("could not get current state: %v", err) - return + // Retrieve the current state to allow nonce and balance checking + state, err := pool.currentState() + if err != nil { + glog.Errorf("Could not get current state: %v", err) + return + } + // Iterate over all accounts and promote any executable transactions + for addr, list := range pool.queue { + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(state.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Removed old queued transaction: %v", tx) + } + delete(pool.all, tx.Hash()) } - balance := currentState.GetBalance(address) - - var ( - guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state) - trueNonce = currentState.GetNonce(address) // nonce known by the last state - ) - promote = promote[:0] - for nonce, tx := range txs { - // Drop processed or out of fund transactions - if nonce < trueNonce || balance.Cmp(tx.Cost()) < 0 { - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx) - } - delete(txs, nonce) - delete(pool.all, tx.Hash()) - - continue + // Drop all transactions that are too costly (low balance) + drops, _ := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { + if glog.V(logger.Core) { + glog.Infof("Removed unpayable queued transaction: %v", tx) } - // Collect the remaining transactions for the next pass. - promote = append(promote, txQueueEntry{address, tx}) + delete(pool.all, tx.Hash()) } - // Find the next consecutive nonce range starting at the current account nonce, - // pushing the guessed nonce forward if we add consecutive transactions. - sort.Sort(promote) - for i, entry := range promote { - // If we reached a gap in the nonces, enforce transaction limit and stop - if entry.Nonce() > guessedNonce { - if len(promote)-i > maxQueued { - if glog.V(logger.Debug) { - glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.Hash().Bytes())) - } - for _, drop := range promote[i+maxQueued:] { - delete(txs, drop.Nonce()) - delete(pool.all, drop.Hash()) - } - } - break + // Gather all executable transactions and promote them + for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { + if glog.V(logger.Core) { + glog.Infof("Promoting queued transaction: %v", tx) } - // Otherwise promote the transaction and move the guess nonce if needed - pool.addTx(address, entry.Transaction) - delete(txs, entry.Nonce()) - - if entry.Nonce() == guessedNonce { - guessedNonce++ + pool.promoteTx(addr, tx.Hash(), tx) + } + // Drop all transactions over the allowed limit + for _, tx := range list.Cap(maxQueued) { + if glog.V(logger.Core) { + glog.Infof("Removed cap-exceeding queued transaction: %v", tx) } + delete(pool.all, tx.Hash()) } // Delete the entire queue entry if it became empty. - if len(txs) == 0 { - delete(pool.queue, address) + if list.Empty() { + delete(pool.queue, addr) } } } -// validatePool removes invalid and processed transactions from the main pool. -// If a transaction is removed for being invalid (e.g. out of funds), all sub- -// sequent (Still valid) transactions are moved back into the future queue. This -// is important to prevent a drained account from DOSing the network with non -// executable transactions. -func (pool *TxPool) validatePool() { +// demoteUnexecutables removes invalid and processed transactions from the pools +// executable/pending queue and any subsequent transactions that become unexecutable +// are moved back into the future queue. +func (pool *TxPool) demoteUnexecutables() { + // Retrieve the current state to allow nonce and balance checking state, err := pool.currentState() if err != nil { glog.V(logger.Info).Infoln("failed to get current state: %v", err) return } - balanceCache := make(map[common.Address]*big.Int) - - // Clean up the pending pool, accumulating invalid nonces - gaps := make(map[common.Address]uint64) + // Iterate over all accounts and demote any non-executable transactions + for addr, list := range pool.pending { + nonce := state.GetNonce(addr) - for addr, txs := range pool.pending { - for nonce, tx := range txs { - // Perform light nonce and balance validation - balance := balanceCache[addr] - if balance == nil { - balance = state.GetBalance(addr) - balanceCache[addr] = balance + // Drop all transactions that are deemed too old (low nonce) + for _, tx := range list.Forward(nonce) { + if glog.V(logger.Core) { + glog.Infof("Removed old pending transaction: %v", tx) } - if past := state.GetNonce(addr) > nonce; past || balance.Cmp(tx.Cost()) < 0 { - // Remove an already past it invalidated transaction - if glog.V(logger.Core) { - glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx) - } - delete(pool.pending[addr], nonce) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) - } - delete(pool.all, tx.Hash()) - - // Track the smallest invalid nonce to postpone subsequent transactions - if !past { - if prev, ok := gaps[addr]; !ok || nonce < prev { - gaps[addr] = nonce - } - } + delete(pool.all, tx.Hash()) + } + // Drop all transactions that are too costly (low balance), and queue any invalids back for later + drops, invalids := list.Filter(state.GetBalance(addr)) + for _, tx := range drops { + if glog.V(logger.Core) { + glog.Infof("Removed unpayable pending transaction: %v", tx) } + delete(pool.all, tx.Hash()) } - } - // Move all transactions after a gap back to the future queue - if len(gaps) > 0 { - for addr, txs := range pool.pending { - for nonce, tx := range txs { - if gap, ok := gaps[addr]; ok && nonce >= gap { - if glog.V(logger.Core) { - glog.Infof("postponed tx (%v) due to introduced gap\n", tx) - } - delete(pool.pending[addr], nonce) - if len(pool.pending[addr]) == 0 { - delete(pool.pending, addr) - } - pool.queueTx(tx.Hash(), tx) - } + for _, tx := range invalids { + if glog.V(logger.Core) { + glog.Infof("Demoting pending transaction: %v", tx) } + pool.enqueueTx(tx.Hash(), tx) + } + // Delete the entire queue entry if it became empty. + if list.Empty() { + delete(pool.pending, addr) } } } -type txQueue []txQueueEntry - -type txQueueEntry struct { - addr common.Address - *types.Transaction -} - -func (q txQueue) Len() int { return len(q) } -func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } -func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() } - // txSet represents a set of transaction hashes in which entries // are automatically dropped after txSetDuration time type txSet struct { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 8aa69233d..ec54d8c0e 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -91,9 +91,9 @@ func TestTransactionQueue(t *testing.T) { from, _ := tx.From() currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) - pool.queueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx) - pool.checkQueue() + pool.promoteExecutables() if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } @@ -101,14 +101,14 @@ func TestTransactionQueue(t *testing.T) { tx = transaction(1, big.NewInt(100), key) from, _ = tx.From() currentState.SetNonce(from, 2) - pool.queueTx(tx.Hash(), tx) - pool.checkQueue() - if _, ok := pool.pending[from][tx.Nonce()]; ok { + pool.enqueueTx(tx.Hash(), tx) + pool.promoteExecutables() + if _, ok := pool.pending[from].items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue[from]) > 0 { - t.Error("expected transaction queue to be empty. is", len(pool.queue[from])) + if len(pool.queue) > 0 { + t.Error("expected transaction queue to be empty. is", len(pool.queue)) } pool, key = setupTxPool() @@ -118,17 +118,17 @@ func TestTransactionQueue(t *testing.T) { from, _ = tx1.From() currentState, _ = pool.currentState() currentState.AddBalance(from, big.NewInt(1000)) - pool.queueTx(tx1.Hash(), tx1) - pool.queueTx(tx2.Hash(), tx2) - pool.queueTx(tx3.Hash(), tx3) + pool.enqueueTx(tx1.Hash(), tx1) + pool.enqueueTx(tx2.Hash(), tx2) + pool.enqueueTx(tx3.Hash(), tx3) - pool.checkQueue() + pool.promoteExecutables() if len(pool.pending) != 1 { t.Error("expected tx pool to be 1, got", len(pool.pending)) } - if len(pool.queue[from]) != 2 { - t.Error("expected len(queue) == 2, got", len(pool.queue[from])) + if pool.queue[from].Len() != 2 { + t.Error("expected len(queue) == 2, got", pool.queue[from].Len()) } } @@ -138,24 +138,21 @@ func TestRemoveTx(t *testing.T) { from, _ := tx.From() currentState, _ := pool.currentState() currentState.AddBalance(from, big.NewInt(1)) - pool.queueTx(tx.Hash(), tx) - pool.addTx(from, tx) + + pool.enqueueTx(tx.Hash(), tx) + pool.promoteTx(from, tx.Hash(), tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) } - if len(pool.pending) != 1 { - t.Error("expected txs to be 1, got", len(pool.pending)) + t.Error("expected pending to be 1, got", len(pool.pending)) } - - pool.RemoveTx(tx.Hash()) - + pool.Remove(tx.Hash()) if len(pool.queue) > 0 { t.Error("expected queue to be 0, got", len(pool.queue)) } - if len(pool.pending) > 0 { - t.Error("expected txs to be 0, got", len(pool.pending)) + t.Error("expected pending to be 0, got", len(pool.pending)) } } @@ -188,7 +185,7 @@ func TestTransactionChainFork(t *testing.T) { if err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } - pool.RemoveTransactions([]*types.Transaction{tx}) + pool.RemoveBatch([]*types.Transaction{tx}) // reset the pool's internal state resetState() @@ -221,22 +218,22 @@ func TestTransactionDoubleNonce(t *testing.T) { if err := pool.add(tx2); err != nil { t.Error("didn't expect error", err) } - pool.checkQueue() - if len(pool.pending[addr]) != 1 { - t.Error("expected 1 pending transactions, got", len(pool.pending)) + pool.promoteExecutables() + if pool.pending[addr].Len() != 1 { + t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr][0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Add the thid transaction and ensure it's not saved (smaller price) if err := pool.add(tx3); err != nil { t.Error("didn't expect error", err) } - pool.checkQueue() - if len(pool.pending[addr]) != 1 { - t.Error("expected 1 pending transactions, got", len(pool.pending)) + pool.promoteExecutables() + if pool.pending[addr].Len() != 1 { + t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr][0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Ensure the total transaction count is correct @@ -254,11 +251,11 @@ func TestMissingNonce(t *testing.T) { if err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } - if len(pool.pending[addr]) != 0 { - t.Error("expected 0 pending transactions, got", len(pool.pending[addr])) + if len(pool.pending) != 0 { + t.Error("expected 0 pending transactions, got", len(pool.pending)) } - if len(pool.queue[addr]) != 1 { - t.Error("expected 1 queued transaction, got", len(pool.queue[addr])) + if pool.queue[addr].Len() != 1 { + t.Error("expected 1 queued transaction, got", pool.queue[addr].Len()) } if len(pool.all) != 1 { t.Error("expected 1 total transactions, got", len(pool.all)) @@ -293,8 +290,8 @@ func TestRemovedTxEvent(t *testing.T) { currentState.AddBalance(from, big.NewInt(1000000000000)) pool.eventMux.Post(RemovedTransactionEvent{types.Transactions{tx}}) pool.eventMux.Post(ChainHeadEvent{nil}) - if len(pool.pending[from]) != 1 { - t.Error("expected 1 pending tx, got", len(pool.pending[from])) + if pool.pending[from].Len() != 1 { + t.Error("expected 1 pending tx, got", pool.pending[from].Len()) } if len(pool.all) != 1 { t.Error("expected 1 total transactions, got", len(pool.all)) @@ -318,27 +315,27 @@ func TestTransactionDropping(t *testing.T) { tx10 = transaction(10, big.NewInt(100), key) tx11 = transaction(11, big.NewInt(200), key) ) - pool.addTx(account, tx0) - pool.addTx(account, tx1) - pool.queueTx(tx10.Hash(), tx10) - pool.queueTx(tx11.Hash(), tx11) + pool.promoteTx(account, tx0.Hash(), tx0) + pool.promoteTx(account, tx1.Hash(), tx1) + pool.enqueueTx(tx10.Hash(), tx10) + pool.enqueueTx(tx11.Hash(), tx11) // Check that pre and post validations leave the pool as is - if len(pool.pending[account]) != 2 { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), 2) + if pool.pending[account].Len() != 2 { + t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 2) } - if len(pool.queue[account]) != 2 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 2) + if pool.queue[account].Len() != 2 { + t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 2) } if len(pool.all) != 4 { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) } pool.resetState() - if len(pool.pending[account]) != 2 { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), 2) + if pool.pending[account].Len() != 2 { + t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 2) } - if len(pool.queue[account]) != 2 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 2) + if pool.queue[account].Len() != 2 { + t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 2) } if len(pool.all) != 4 { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) @@ -347,16 +344,16 @@ func TestTransactionDropping(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account][tx0.Nonce()]; !ok { + if _, ok := pool.pending[account].items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) } - if _, ok := pool.pending[account][tx1.Nonce()]; ok { + if _, ok := pool.pending[account].items[tx1.Nonce()]; ok { t.Errorf("out-of-fund pending transaction present: %v", tx1) } - if _, ok := pool.queue[account][tx10.Nonce()]; !ok { + if _, ok := pool.queue[account].items[tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account][tx11.Nonce()]; ok { + if _, ok := pool.queue[account].items[tx11.Nonce()]; ok { t.Errorf("out-of-fund queued transaction present: %v", tx11) } if len(pool.all) != 2 { @@ -384,25 +381,25 @@ func TestTransactionPostponing(t *testing.T) { } else { tx = transaction(uint64(i), big.NewInt(500), key) } - pool.addTx(account, tx) + pool.promoteTx(account, tx.Hash(), tx) txns = append(txns, tx) } // Check that pre and post validations leave the pool as is - if len(pool.pending[account]) != len(txns) { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), len(txns)) + if pool.pending[account].Len() != len(txns) { + t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns)) } - if len(pool.queue[account]) != 0 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 0) + if len(pool.queue) != 0 { + t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 0) } if len(pool.all) != len(txns) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) } pool.resetState() - if len(pool.pending[account]) != len(txns) { - t.Errorf("pending transaction mismatch: have %d, want %d", len(pool.pending[account]), len(txns)) + if pool.pending[account].Len() != len(txns) { + t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), len(txns)) } - if len(pool.queue[account]) != 0 { - t.Errorf("queued transaction mismatch: have %d, want %d", len(pool.queue[account]), 0) + if len(pool.queue) != 0 { + t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 0) } if len(pool.all) != len(txns) { t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txns)) @@ -411,25 +408,25 @@ func TestTransactionPostponing(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account][txns[0].Nonce()]; !ok { + if _, ok := pool.pending[account].items[txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) } - if _, ok := pool.queue[account][txns[0].Nonce()]; ok { + if _, ok := pool.queue[account].items[txns[0].Nonce()]; ok { t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txns[0]) } for i, tx := range txns[1:] { if i%2 == 1 { - if _, ok := pool.pending[account][tx.Nonce()]; ok { + if _, ok := pool.pending[account].items[tx.Nonce()]; ok { t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account][tx.Nonce()]; !ok { + if _, ok := pool.queue[account].items[tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) } } else { - if _, ok := pool.pending[account][tx.Nonce()]; ok { + if _, ok := pool.pending[account].items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account][tx.Nonce()]; ok { + if _, ok := pool.queue[account].items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) } } @@ -458,12 +455,12 @@ func TestTransactionQueueLimiting(t *testing.T) { t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0) } if i <= maxQueued { - if len(pool.queue[account]) != int(i) { - t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue[account]), i) + if pool.queue[account].Len() != int(i) { + t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i) } } else { - if len(pool.queue[account]) != maxQueued { - t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, len(pool.queue[account]), maxQueued) + if pool.queue[account].Len() != maxQueued { + t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), maxQueued) } } } @@ -488,11 +485,11 @@ func TestTransactionPendingLimiting(t *testing.T) { if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } - if len(pool.pending[account]) != int(i)+1 { - t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending[account]), i+1) + if pool.pending[account].Len() != int(i)+1 { + t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, pool.pending[account].Len(), i+1) } - if len(pool.queue[account]) != 0 { - t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, len(pool.queue[account]), 0) + if len(pool.queue) != 0 { + t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0) } } if len(pool.all) != maxQueued+5 { @@ -517,7 +514,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } } - // Add a batch of transactions to a pool in one bit batch + // Add a batch of transactions to a pool in one big batch pool2, key2 := setupTxPool() account2, _ := transaction(0, big.NewInt(0), key2).From() state2, _ := pool2.currentState() @@ -527,14 +524,14 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { for i := uint64(0); i < maxQueued+5; i++ { txns = append(txns, transaction(origin+i, big.NewInt(100000), key2)) } - pool2.AddTransactions(txns) + pool2.AddBatch(txns) // Ensure the batch optimization honors the same pool mechanics if len(pool1.pending) != len(pool2.pending) { t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending)) } - if len(pool1.queue[account1]) != len(pool2.queue[account2]) { - t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue[account1]), len(pool2.queue[account2])) + if len(pool1.queue) != len(pool2.queue) { + t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue)) } if len(pool1.all) != len(pool2.all) { t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all)) @@ -543,11 +540,11 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. -func BenchmarkValidatePool100(b *testing.B) { benchmarkValidatePool(b, 100) } -func BenchmarkValidatePool1000(b *testing.B) { benchmarkValidatePool(b, 1000) } -func BenchmarkValidatePool10000(b *testing.B) { benchmarkValidatePool(b, 10000) } +func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } +func BenchmarkPendingDemotion1000(b *testing.B) { benchmarkPendingDemotion(b, 1000) } +func BenchmarkPendingDemotion10000(b *testing.B) { benchmarkPendingDemotion(b, 10000) } -func benchmarkValidatePool(b *testing.B, size int) { +func benchmarkPendingDemotion(b *testing.B, size int) { // Add a batch of transactions to a pool one by one pool, key := setupTxPool() account, _ := transaction(0, big.NewInt(0), key).From() @@ -556,22 +553,22 @@ func benchmarkValidatePool(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(i), big.NewInt(100000), key) - pool.addTx(account, tx) + pool.promoteTx(account, tx.Hash(), tx) } // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.validatePool() + pool.demoteUnexecutables() } } // Benchmarks the speed of scheduling the contents of the future queue of the // transaction pool. -func BenchmarkCheckQueue100(b *testing.B) { benchmarkCheckQueue(b, 100) } -func BenchmarkCheckQueue1000(b *testing.B) { benchmarkCheckQueue(b, 1000) } -func BenchmarkCheckQueue10000(b *testing.B) { benchmarkCheckQueue(b, 10000) } +func BenchmarkFuturePromotion100(b *testing.B) { benchmarkFuturePromotion(b, 100) } +func BenchmarkFuturePromotion1000(b *testing.B) { benchmarkFuturePromotion(b, 1000) } +func BenchmarkFuturePromotion10000(b *testing.B) { benchmarkFuturePromotion(b, 10000) } -func benchmarkCheckQueue(b *testing.B, size int) { +func benchmarkFuturePromotion(b *testing.B, size int) { // Add a batch of transactions to a pool one by one pool, key := setupTxPool() account, _ := transaction(0, big.NewInt(0), key).From() @@ -580,11 +577,56 @@ func benchmarkCheckQueue(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(1+i), big.NewInt(100000), key) - pool.queueTx(tx.Hash(), tx) + pool.enqueueTx(tx.Hash(), tx) } // Benchmark the speed of pool validation b.ResetTimer() for i := 0; i < b.N; i++ { - pool.checkQueue() + pool.promoteExecutables() + } +} + +// Benchmarks the speed of iterative transaction insertion. +func BenchmarkPoolInsert(b *testing.B) { + // Generate a batch of transactions to enqueue into the pool + pool, key := setupTxPool() + account, _ := transaction(0, big.NewInt(0), key).From() + state, _ := pool.currentState() + state.AddBalance(account, big.NewInt(1000000)) + + txs := make(types.Transactions, b.N) + for i := 0; i < b.N; i++ { + txs[i] = transaction(uint64(i), big.NewInt(100000), key) + } + // Benchmark importing the transactions into the queue + b.ResetTimer() + for _, tx := range txs { + pool.Add(tx) + } +} + +// Benchmarks the speed of batched transaction insertion. +func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } +func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) } +func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) } + +func benchmarkPoolBatchInsert(b *testing.B, size int) { + // Generate a batch of transactions to enqueue into the pool + pool, key := setupTxPool() + account, _ := transaction(0, big.NewInt(0), key).From() + state, _ := pool.currentState() + state.AddBalance(account, big.NewInt(1000000)) + + batches := make([]types.Transactions, b.N) + for i := 0; i < b.N; i++ { + batches[i] = make(types.Transactions, size) + for j := 0; j < size; j++ { + batches[i][j] = transaction(uint64(size*i+j), big.NewInt(100000), key) + } + } + // Benchmark importing the transactions into the queue + b.ResetTimer() + for _, batch := range batches { + pool.AddBatch(batch) } } diff --git a/core/types/transaction.go b/core/types/transaction.go index d369d7772..af48e4d07 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "math/big" - "sort" "sync/atomic" "github.com/ethereum/go-ethereum/common" @@ -439,37 +438,29 @@ func (s *TxByPrice) Pop() interface{} { // sender accounts and sorts them by nonce. After the account nonce ordering is // satisfied, the results are merged back together by price, always comparing only // the head transaction from each account. This is done via a heap to keep it fast. -func SortByPriceAndNonce(txs []*Transaction) { - // Separate the transactions by account and sort by nonce - byNonce := make(map[common.Address][]*Transaction) - for _, tx := range txs { - acc, _ := tx.From() // we only sort valid txs so this cannot fail - byNonce[acc] = append(byNonce[acc], tx) - } - for _, accTxs := range byNonce { - sort.Sort(TxByNonce(accTxs)) - } +func SortByPriceAndNonce(txs map[common.Address]Transactions) Transactions { // Initialize a price based heap with the head transactions - byPrice := make(TxByPrice, 0, len(byNonce)) - for acc, accTxs := range byNonce { + byPrice := make(TxByPrice, 0, len(txs)) + for acc, accTxs := range txs { byPrice = append(byPrice, accTxs[0]) - byNonce[acc] = accTxs[1:] + txs[acc] = accTxs[1:] } heap.Init(&byPrice) // Merge by replacing the best with the next from the same account - txs = txs[:0] + var sorted Transactions for len(byPrice) > 0 { // Retrieve the next best transaction by price best := heap.Pop(&byPrice).(*Transaction) // Push in its place the next transaction from the same account acc, _ := best.From() // we only sort valid txs so this cannot fail - if accTxs, ok := byNonce[acc]; ok && len(accTxs) > 0 { + if accTxs, ok := txs[acc]; ok && len(accTxs) > 0 { heap.Push(&byPrice, accTxs[0]) - byNonce[acc] = accTxs[1:] + txs[acc] = accTxs[1:] } // Accumulate the best priced transaction - txs = append(txs, best) + sorted = append(sorted, best) } + return sorted } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 62420e71f..c6e6e3790 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -128,15 +128,16 @@ func TestTransactionPriceNonceSort(t *testing.T) { keys[i], _ = crypto.GenerateKey() } // Generate a batch of transactions with overlapping values, but shifted nonces - txs := []*Transaction{} + groups := map[common.Address]Transactions{} for start, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) for i := 0; i < 25; i++ { tx, _ := NewTransaction(uint64(start+i), common.Address{}, big.NewInt(100), big.NewInt(100), big.NewInt(int64(start+i)), nil).SignECDSA(key) - txs = append(txs, tx) + groups[addr] = append(groups[addr], tx) } } // Sort the transactions and cross check the nonce ordering - SortByPriceAndNonce(txs) + txs := SortByPriceAndNonce(groups) for i, txi := range txs { fromi, _ := txi.From() diff --git a/eth/api_backend.go b/eth/api_backend.go index e19254374..4f8f06529 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -118,21 +118,25 @@ func (b *EthApiBackend) RemoveTx(txHash common.Hash) { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() - b.eth.txPool.RemoveTx(txHash) + b.eth.txPool.Remove(txHash) } func (b *EthApiBackend) GetPoolTransactions() types.Transactions { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() - return b.eth.txPool.GetTransactions() + var txs types.Transactions + for _, batch := range b.eth.txPool.Pending() { + txs = append(txs, batch...) + } + return txs } -func (b *EthApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { +func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() - return b.eth.txPool.GetTransaction(txHash) + return b.eth.txPool.Get(hash) } func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { @@ -149,7 +153,7 @@ func (b *EthApiBackend) Stats() (pending int, queued int) { return b.eth.txPool.Stats() } -func (b *EthApiBackend) TxPoolContent() (map[common.Address]core.TxList, map[common.Address]core.TxList) { +func (b *EthApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { b.eth.txMu.Lock() defer b.eth.txMu.Unlock() diff --git a/eth/handler.go b/eth/handler.go index 570c79dac..e6c547c02 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -677,7 +677,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkTransaction(tx.Hash()) } - pm.txpool.AddTransactions(txs) + pm.txpool.AddBatch(txs) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) diff --git a/eth/helper_test.go b/eth/helper_test.go index 28ff69b17..732fe89ee 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -23,6 +23,7 @@ import ( "crypto/ecdsa" "crypto/rand" "math/big" + "sort" "sync" "testing" @@ -89,9 +90,9 @@ type testTxPool struct { lock sync.RWMutex // Protects the transaction pool } -// AddTransactions appends a batch of transactions to the pool, and notifies any +// AddBatch appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) AddTransactions(txs []*types.Transaction) { +func (p *testTxPool) AddBatch(txs []*types.Transaction) { p.lock.Lock() defer p.lock.Unlock() @@ -101,15 +102,20 @@ func (p *testTxPool) AddTransactions(txs []*types.Transaction) { } } -// GetTransactions returns all the transactions known to the pool -func (p *testTxPool) GetTransactions() types.Transactions { +// Pending returns all the transactions known to the pool +func (p *testTxPool) Pending() map[common.Address]types.Transactions { p.lock.RLock() defer p.lock.RUnlock() - txs := make([]*types.Transaction, len(p.pool)) - copy(txs, p.pool) - - return txs + batches := make(map[common.Address]types.Transactions) + for _, tx := range p.pool { + from, _ := tx.From() + batches[from] = append(batches[from], tx) + } + for _, batch := range batches { + sort.Sort(types.TxByNonce(batch)) + } + return batches } // newTestTransaction create a new dummy transaction. diff --git a/eth/protocol.go b/eth/protocol.go index 69b3be578..3f65c204b 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -97,12 +97,12 @@ var errorToString = map[int]string{ } type txPool interface { - // AddTransactions should add the given transactions to the pool. - AddTransactions([]*types.Transaction) + // AddBatch should add the given transactions to the pool. + AddBatch([]*types.Transaction) - // GetTransactions should return pending transactions. + // Pending should return pending transactions. // The slice should be modifiable by the caller. - GetTransactions() types.Transactions + Pending() map[common.Address]types.Transactions } // statusData is the network packet for the status message. diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 4633344da..0aac19f43 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -130,7 +130,7 @@ func testSendTransactions(t *testing.T, protocol int) { for nonce := range alltxs { alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) } - pm.txpool.AddTransactions(alltxs) + pm.txpool.AddBatch(alltxs) // Connect several peers. They should all receive the pending transactions. var wg sync.WaitGroup diff --git a/eth/sync.go b/eth/sync.go index e1946edda..6584bb1e2 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -45,7 +45,10 @@ type txsync struct { // syncTransactions starts sending all currently pending transactions to the given peer. func (pm *ProtocolManager) syncTransactions(p *peer) { - txs := pm.txpool.GetTransactions() + var txs types.Transactions + for _, batch := range pm.txpool.Pending() { + txs = append(txs, batch...) + } if len(txs) == 0 { return } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 673ab0546..0aa3da18d 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -58,7 +58,7 @@ type Backend interface { GetPoolTransaction(txHash common.Hash) *types.Transaction GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) Stats() (pending int, queued int) - TxPoolContent() (map[common.Address]core.TxList, map[common.Address]core.TxList) + TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) } type State interface { diff --git a/miner/worker.go b/miner/worker.go index f243fe799..b46b368ea 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -501,8 +501,7 @@ func (self *worker) commitNewWork() { */ //approach 2 - transactions := self.eth.TxPool().GetTransactions() - types.SortByPriceAndNonce(transactions) + transactions := types.SortByPriceAndNonce(self.eth.TxPool().Pending()) /* // approach 3 // commit transactions for this run. @@ -533,8 +532,8 @@ func (self *worker) commitNewWork() { work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) - self.eth.TxPool().RemoveTransactions(work.lowGasTxs) - self.eth.TxPool().RemoveTransactions(work.failedTxs) + self.eth.TxPool().RemoveBatch(work.lowGasTxs) + self.eth.TxPool().RemoveBatch(work.failedTxs) // compute uncles for the new block. var ( -- cgit From affffb39b366321e47784e48c469da9584ceb92c Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Tue, 9 Aug 2016 14:54:36 +0300 Subject: core/types, miner: switch over to the grouped tx sets --- core/types/transaction.go | 79 +++++++++++++++---------- core/types/transaction_test.go | 11 +++- miner/worker.go | 127 +++++++++++++---------------------------- 3 files changed, 98 insertions(+), 119 deletions(-) diff --git a/core/types/transaction.go b/core/types/transaction.go index af48e4d07..5bb599479 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -426,41 +426,58 @@ func (s *TxByPrice) Pop() interface{} { return x } -// SortByPriceAndNonce sorts the transactions by price in such a way that the -// nonce orderings within a single account are maintained. -// -// Note, this is not as trivial as it seems from the first look as there are three -// different criteria that need to be taken into account (price, nonce, account -// match), which cannot be done with any plain sorting method, as certain items -// cannot be compared without context. +// TransactionsByPriceAndNonce represents a set of transactions that can return +// transactions in a profit-maximising sorted order, while supporting removing +// entire batches of transactions for non-executable accounts. +type TransactionsByPriceAndNonce struct { + txs map[common.Address]Transactions // Per account nonce-sorted list of transactions + heads TxByPrice // Next transaction for each unique account (price heap) +} + +// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve +// price sorted transactions in a nonce-honouring way. // -// This method first sorts the separates the list of transactions into individual -// sender accounts and sorts them by nonce. After the account nonce ordering is -// satisfied, the results are merged back together by price, always comparing only -// the head transaction from each account. This is done via a heap to keep it fast. -func SortByPriceAndNonce(txs map[common.Address]Transactions) Transactions { +// Note, the input map is reowned so the caller should not interact any more with +// if after providng it to the constructor. +func NewTransactionsByPriceAndNonce(txs map[common.Address]Transactions) *TransactionsByPriceAndNonce { // Initialize a price based heap with the head transactions - byPrice := make(TxByPrice, 0, len(txs)) + heads := make(TxByPrice, 0, len(txs)) for acc, accTxs := range txs { - byPrice = append(byPrice, accTxs[0]) + heads = append(heads, accTxs[0]) txs[acc] = accTxs[1:] } - heap.Init(&byPrice) - - // Merge by replacing the best with the next from the same account - var sorted Transactions - for len(byPrice) > 0 { - // Retrieve the next best transaction by price - best := heap.Pop(&byPrice).(*Transaction) - - // Push in its place the next transaction from the same account - acc, _ := best.From() // we only sort valid txs so this cannot fail - if accTxs, ok := txs[acc]; ok && len(accTxs) > 0 { - heap.Push(&byPrice, accTxs[0]) - txs[acc] = accTxs[1:] - } - // Accumulate the best priced transaction - sorted = append(sorted, best) + heap.Init(&heads) + + // Assemble and return the transaction set + return &TransactionsByPriceAndNonce{ + txs: txs, + heads: heads, + } +} + +// Peek returns the next transaction by price. +func (t *TransactionsByPriceAndNonce) Peek() *Transaction { + if len(t.heads) == 0 { + return nil + } + return t.heads[0] +} + +// Shift replaces the current best head with the next one from the same account. +func (t *TransactionsByPriceAndNonce) Shift() { + acc, _ := t.heads[0].From() // we only sort valid txs so this cannot fail + + if txs, ok := t.txs[acc]; ok && len(txs) > 0 { + t.heads[0], t.txs[acc] = txs[0], txs[1:] + heap.Fix(&t.heads, 0) + } else { + heap.Pop(&t.heads) } - return sorted +} + +// Pop removes the best transaction, *not* replacing it with the next one from +// the same account. This should be used when a transaction cannot be executed +// and hence all subsequent ones should be discarded from the same account. +func (t *TransactionsByPriceAndNonce) Pop() { + heap.Pop(&t.heads) } diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index c6e6e3790..8b0b02c3e 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -137,7 +137,16 @@ func TestTransactionPriceNonceSort(t *testing.T) { } } // Sort the transactions and cross check the nonce ordering - txs := SortByPriceAndNonce(groups) + txset := NewTransactionsByPriceAndNonce(groups) + + txs := Transactions{} + for { + if tx := txset.Peek(); tx != nil { + txs = append(txs, tx) + txset.Shift() + } + break + } for i, txi := range txs { fromi, _ := txi.From() diff --git a/miner/worker.go b/miner/worker.go index b46b368ea..d899622b6 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -63,18 +63,16 @@ type uint64RingBuffer struct { // Work is the workers current environment and holds // all of the current state information type Work struct { - config *core.ChainConfig - state *state.StateDB // apply state changes here - ancestors *set.Set // ancestor set (used for checking uncle parent validity) - family *set.Set // family set (used for checking uncle invalidity) - uncles *set.Set // uncle set - tcount int // tx count in cycle - ignoredTransactors *set.Set - lowGasTransactors *set.Set - ownedAccounts *set.Set - lowGasTxs types.Transactions - failedTxs types.Transactions - localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion) + config *core.ChainConfig + state *state.StateDB // apply state changes here + ancestors *set.Set // ancestor set (used for checking uncle parent validity) + family *set.Set // family set (used for checking uncle invalidity) + uncles *set.Set // uncle set + tcount int // tx count in cycle + ownedAccounts *set.Set + lowGasTxs types.Transactions + failedTxs types.Transactions + localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion) Block *types.Block // the new block @@ -236,7 +234,12 @@ func (self *worker) update() { // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() - self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain) + + acc, _ := ev.Tx.From() + txs := map[common.Address]types.Transactions{acc: types.Transactions{ev.Tx}} + txset := types.NewTransactionsByPriceAndNonce(txs) + + self.current.commitTransactions(self.mux, txset, self.gasPrice, self.chain) self.currentMu.Unlock() } } @@ -384,8 +387,6 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error // Keep track of transactions which return errors so they can be removed work.tcount = 0 - work.ignoredTransactors = set.New() - work.lowGasTransactors = set.New() work.ownedAccounts = accountAddressesSet(accounts) if self.current != nil { work.localMinedBlocks = self.current.localMinedBlocks @@ -494,43 +495,8 @@ func (self *worker) commitNewWork() { if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { core.ApplyDAOHardFork(work.state) } - - /* //approach 1 - transactions := self.eth.TxPool().GetTransactions() - sort.Sort(types.TxByNonce(transactions)) - */ - - //approach 2 - transactions := types.SortByPriceAndNonce(self.eth.TxPool().Pending()) - - /* // approach 3 - // commit transactions for this run. - txPerOwner := make(map[common.Address]types.Transactions) - // Sort transactions by owner - for _, tx := range self.eth.TxPool().GetTransactions() { - from, _ := tx.From() // we can ignore the sender error - txPerOwner[from] = append(txPerOwner[from], tx) - } - var ( - singleTxOwner types.Transactions - multiTxOwner types.Transactions - ) - // Categorise transactions by - // 1. 1 owner tx per block - // 2. multi txs owner per block - for _, txs := range txPerOwner { - if len(txs) == 1 { - singleTxOwner = append(singleTxOwner, txs[0]) - } else { - multiTxOwner = append(multiTxOwner, txs...) - } - } - sort.Sort(types.TxByPrice(singleTxOwner)) - sort.Sort(types.TxByNonce(multiTxOwner)) - transactions := append(singleTxOwner, multiTxOwner...) - */ - - work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain) + txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending()) + work.commitTransactions(self.mux, txs, self.gasPrice, self.chain) self.eth.TxPool().RemoveBatch(work.lowGasTxs) self.eth.TxPool().RemoveBatch(work.failedTxs) @@ -591,64 +557,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error { return nil } -func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) { +func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, gasPrice *big.Int, bc *core.BlockChain) { gp := new(core.GasPool).AddGas(env.header.GasLimit) var coalescedLogs vm.Logs - for _, tx := range transactions { + for { + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } // Error may be ignored here. The error has already been checked // during transaction acceptance is the transaction pool. from, _ := tx.From() - // Check if it falls within margin. Txs from owned accounts are always processed. + // Ignore any transactions (and accounts subsequently) with low gas limits if tx.GasPrice().Cmp(gasPrice) < 0 && !env.ownedAccounts.Has(from) { - // ignore the transaction and transactor. We ignore the transactor - // because nonce will fail after ignoring this transaction so there's - // no point - env.lowGasTransactors.Add(from) - - glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4]) - } + // Pop the current low-priced transaction without shifting in the next from the account + glog.V(logger.Info).Infof("Transaction (%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4]) - // Continue with the next transaction if the transaction sender is included in - // the low gas tx set. This will also remove the tx and all sequential transaction - // from this transactor - if env.lowGasTransactors.Has(from) { - // add tx to the low gas set. This will be removed at the end of the run - // owned accounts are ignored - if !env.ownedAccounts.Has(from) { - env.lowGasTxs = append(env.lowGasTxs, tx) - } - continue - } + env.lowGasTxs = append(env.lowGasTxs, tx) + txs.Pop() - // Move on to the next transaction when the transactor is in ignored transactions set - // This may occur when a transaction hits the gas limit. When a gas limit is hit and - // the transaction is processed (that could potentially be included in the block) it - // will throw a nonce error because the previous transaction hasn't been processed. - // Therefor we need to ignore any transaction after the ignored one. - if env.ignoredTransactors.Has(from) { continue } - + // Start executing the transaction env.state.StartRecord(tx.Hash(), common.Hash{}, 0) err, logs := env.commitTransaction(tx, bc, gp) switch { case core.IsGasLimitErr(err): - // ignore the transactor so no nonce errors will be thrown for this account - // next time the worker is run, they'll be picked up again. - env.ignoredTransactors.Add(from) + // Pop the current out-of-gas transaction without shifting in the next from the account glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4]) + txs.Pop() case err != nil: + // Pop the current failed transaction without shifting in the next from the account + glog.V(logger.Detail).Infof("Transaction (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) env.failedTxs = append(env.failedTxs, tx) - if glog.V(logger.Detail) { - glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) - } + txs.Pop() + default: - env.tcount++ + // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + txs.Shift() } } if len(coalescedLogs) > 0 || env.tcount > 0 { -- cgit From a183ea29f9313cb1d00ed8f73bfbc4ae51e9cb04 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Wed, 17 Aug 2016 16:53:15 +0300 Subject: core: add upper bound on the queued transctions --- core/tx_list.go | 20 +++++----- core/tx_pool.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++----- core/tx_pool_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 203 insertions(+), 33 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index e30fee38f..8c69331cc 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -52,11 +52,11 @@ func (h *nonceHeap) Pop() interface{} { type txList struct { strict bool // Whether nonces are strictly continuous or not items map[uint64]*types.Transaction // Hash map storing the transaction data - cache types.Transactions // cache of the transactions already sorted + cache types.Transactions // Cache of the transactions already sorted first uint64 // Nonce of the lowest stored transaction (strict mode) last uint64 // Nonce of the highest stored transaction (strict mode) - index *nonceHeap // Heap of nonces of all teh stored transactions (non-strict mode) + index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) } @@ -73,8 +73,8 @@ func newTxList(strict bool) *txList { } } -// Add tries to inserts a new transaction into the list, returning whether the -// transaction was acceped, and if yes, any previous transaction it replaced. +// Add tries to insert a new transaction into the list, returning whether the +// transaction was accepted, and if yes, any previous transaction it replaced. // // In case of strict lists (contiguous nonces) the nonce boundaries are updated // appropriately with the new transaction. Otherwise (gapped nonces) the heap of @@ -146,10 +146,10 @@ func (l *txList) Forward(threshold uint64) types.Transactions { // // This method uses the cached costcap to quickly decide if there's even a point // in calculating all the costs or if the balance covers all. If the threshold is -// loewr than the costcap, the costcap will be reset to a new high after removing +// lower than the costcap, the costcap will be reset to a new high after removing // expensive the too transactions. func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transactions) { - // If all transactions are blow the threshold, short circuit + // If all transactions are below the threshold, short circuit if l.costcap.Cmp(threshold) <= 0 { return nil, nil } @@ -195,7 +195,7 @@ func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transacti } // Cap places a hard limit on the number of items, returning all transactions -// exceeding tht limit. +// exceeding that limit. func (l *txList) Cap(threshold int) types.Transactions { // Short circuit if the number of items is under the limit if len(l.items) < threshold { @@ -239,8 +239,9 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { l.cache = nil // Remove all invalidated transactions (strict mode only!) - invalids := make(types.Transactions, 0, l.last-nonce) + var invalids types.Transactions if l.strict { + invalids = make(types.Transactions, 0, l.last-nonce) for i := nonce + 1; i <= l.last; i++ { invalids = append(invalids, l.items[i]) delete(l.items, i) @@ -255,7 +256,6 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { } } } - // Figure out the new highest nonce return true, invalids } return false, nil @@ -265,7 +265,7 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { // provided nonce that is ready for processing. The returned transactions will be // removed from the list. // -// Note, all transactions with nonces lower that start will also be returned to +// Note, all transactions with nonces lower than start will also be returned to // prevent getting into and invalid state. This is not something that should ever // happen but better to be self correcting than failing! func (l *txList) Ready(start uint64) types.Transactions { diff --git a/core/tx_pool.go b/core/tx_pool.go index c4dcceba0..58d304f00 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "math/big" + "sort" "sync" "time" @@ -44,8 +45,11 @@ var ( ErrNegativeValue = errors.New("Negative value") ) -const ( - maxQueued = 64 // max limit of queued txs per address +var ( + maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address + maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts + maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued + evictionInterval = time.Minute // Time interval to check for evictable transactions ) type stateFn func() (*state.StateDB, error) @@ -71,8 +75,10 @@ type TxPool struct { pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions all map[common.Hash]*types.Transaction // All transactions to allow lookups + beats map[common.Address]time.Time // Last heartbeat from each known account - wg sync.WaitGroup // for shutdown sync + wg sync.WaitGroup // for shutdown sync + quit chan struct{} homestead bool } @@ -83,6 +89,7 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), all: make(map[common.Hash]*types.Transaction), + beats: make(map[common.Address]time.Time), eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, @@ -90,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat pendingState: nil, localTx: newTxSet(), events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), + quit: make(chan struct{}), } - pool.wg.Add(1) + pool.wg.Add(2) go pool.eventLoop() + go pool.expirationLoop() return pool } @@ -154,6 +163,7 @@ func (pool *TxPool) resetState() { func (pool *TxPool) Stop() { pool.events.Unsubscribe() + close(pool.quit) pool.wg.Wait() glog.V(logger.Info).Infoln("Transaction pool stopped") } @@ -290,7 +300,7 @@ func (pool *TxPool) add(tx *types.Transaction) error { if pool.all[hash] != nil { return fmt.Errorf("Known transaction: %x", hash[:4]) } - // Otherwise ensure basic validation passes nd queue it up + // Otherwise ensure basic validation passes and queue it up if err := pool.validateTx(tx); err != nil { return err } @@ -308,7 +318,7 @@ func (pool *TxPool) add(tx *types.Transaction) error { return nil } -// enqueueTx inserts a new transction into the non-executable transaction queue. +// enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { @@ -355,6 +365,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) // Set the potentially new pending nonce and notify any subsystems of the new tx + pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, list.last+1) go pool.eventMux.Post(TxPreEvent{tx}) } @@ -412,8 +423,8 @@ func (pool *TxPool) RemoveBatch(txs types.Transactions) { } } -// removeTx iterates removes a single transaction from the queue, moving all -// subsequent transactions back to the future queue. +// removeTx removes a single transaction from the queue, moving all subsequent +// transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete tx, ok := pool.all[hash] @@ -431,6 +442,8 @@ func (pool *TxPool) removeTx(hash common.Hash) { // If no more transactions are left, remove the list and reset the nonce if pending.Empty() { delete(pool.pending, addr) + delete(pool.beats, addr) + pool.pendingState.SetNonce(addr, tx.Nonce()) } else { // Otherwise update the nonce and postpone any invalidated transactions @@ -465,6 +478,8 @@ func (pool *TxPool) promoteExecutables() { return } // Iterate over all accounts and promote any executable transactions + queued := uint64(0) + for addr, list := range pool.queue { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(state.GetNonce(addr)) { @@ -489,17 +504,51 @@ func (pool *TxPool) promoteExecutables() { pool.promoteTx(addr, tx.Hash(), tx) } // Drop all transactions over the allowed limit - for _, tx := range list.Cap(maxQueued) { + for _, tx := range list.Cap(int(maxQueuedPerAccount)) { if glog.V(logger.Core) { glog.Infof("Removed cap-exceeding queued transaction: %v", tx) } delete(pool.all, tx.Hash()) } + queued += uint64(list.Len()) + // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) } } + // If we've queued more transactions than the hard limit, drop oldest ones + if queued > maxQueuedInTotal { + // Sort all accounts with queued transactions by heartbeat + addresses := make(addresssByHeartbeat, 0, len(pool.queue)) + for addr, _ := range pool.queue { + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + } + sort.Sort(addresses) + + // Drop transactions until the total is below the limit + for drop := queued - maxQueuedInTotal; drop > 0; { + addr := addresses[len(addresses)-1] + list := pool.queue[addr.address] + + addresses = addresses[:len(addresses)-1] + + // Drop all transactions if they are less than the overflow + if size := uint64(list.Len()); size <= drop { + for _, tx := range list.Flatten() { + pool.removeTx(tx.Hash()) + } + drop -= size + continue + } + // Otherwise drop only last few transactions + txs := list.Flatten() + for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + pool.removeTx(txs[i].Hash()) + drop-- + } + } + } } // demoteUnexecutables removes invalid and processed transactions from the pools @@ -540,10 +589,51 @@ func (pool *TxPool) demoteUnexecutables() { // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.pending, addr) + delete(pool.beats, addr) } } } +// expirationLoop is a loop that periodically iterates over all accounts with +// queued transactions and drop all that have been inactive for a prolonged amount +// of time. +func (pool *TxPool) expirationLoop() { + defer pool.wg.Done() + + evict := time.NewTicker(evictionInterval) + defer evict.Stop() + + for { + select { + case <-evict.C: + pool.mu.Lock() + for addr := range pool.queue { + if time.Since(pool.beats[addr]) > maxQueuedLifetime { + for _, tx := range pool.queue[addr].Flatten() { + pool.removeTx(tx.Hash()) + } + } + } + pool.mu.Unlock() + + case <-pool.quit: + return + } + } +} + +// addressByHeartbeat is an account address tagged with its last activity timestamp. +type addressByHeartbeat struct { + address common.Address + heartbeat time.Time +} + +type addresssByHeartbeat []addressByHeartbeat + +func (a addresssByHeartbeat) Len() int { return len(a) } +func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } +func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + // txSet represents a set of transaction hashes in which entries // are automatically dropped after txSetDuration time type txSet struct { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index ec54d8c0e..f08334fa1 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -19,7 +19,9 @@ package core import ( "crypto/ecdsa" "math/big" + "math/rand" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" @@ -38,10 +40,10 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { db, _ := ethdb.NewMemDatabase() statedb, _ := state.New(common.Hash{}, db) - var m event.TypeMux key, _ := crypto.GenerateKey() - newPool := NewTxPool(testChainConfig(), &m, func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + newPool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) newPool.resetState() + return newPool, key } @@ -438,7 +440,7 @@ func TestTransactionPostponing(t *testing.T) { // Tests that if the transaction count belonging to a single account goes above // some threshold, the higher transactions are dropped to prevent DOS attacks. -func TestTransactionQueueLimiting(t *testing.T) { +func TestTransactionQueueAccountLimiting(t *testing.T) { // Create a test account and fund it pool, key := setupTxPool() account, _ := transaction(0, big.NewInt(0), key).From() @@ -447,25 +449,103 @@ func TestTransactionQueueLimiting(t *testing.T) { state.AddBalance(account, big.NewInt(1000000)) // Keep queuing up transactions and make sure all above a limit are dropped - for i := uint64(1); i <= maxQueued+5; i++ { + for i := uint64(1); i <= maxQueuedPerAccount+5; i++ { if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if len(pool.pending) != 0 { t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0) } - if i <= maxQueued { + if i <= maxQueuedPerAccount { if pool.queue[account].Len() != int(i) { t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i) } } else { - if pool.queue[account].Len() != maxQueued { - t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), maxQueued) + if pool.queue[account].Len() != int(maxQueuedPerAccount) { + t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), maxQueuedPerAccount) } } } - if len(pool.all) != maxQueued { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueued) + if len(pool.all) != int(maxQueuedPerAccount) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueuedPerAccount) + } +} + +// Tests that if the transaction count belonging to multiple accounts go above +// some threshold, the higher transactions are dropped to prevent DOS attacks. +func TestTransactionQueueGlobalLimiting(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old uint64) { maxQueuedInTotal = old }(maxQueuedInTotal) + maxQueuedInTotal = maxQueuedPerAccount * 3 + + // Create the pool to test the limit enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, db) + + pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() + + // Create a number of test accounts and fund them + state, _ := pool.currentState() + + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Generate and queue a batch of transactions + nonces := make(map[common.Address]uint64) + + txs := make(types.Transactions, 0, 3*maxQueuedInTotal) + for len(txs) < cap(txs) { + key := keys[rand.Intn(len(keys))] + addr := crypto.PubkeyToAddress(key.PublicKey) + + txs = append(txs, transaction(nonces[addr]+1, big.NewInt(100000), key)) + nonces[addr]++ + } + // Import the batch and verify that limits have been enforced + pool.AddBatch(txs) + + queued := 0 + for addr, list := range pool.queue { + if list.Len() > int(maxQueuedPerAccount) { + t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), maxQueuedPerAccount) + } + queued += list.Len() + } + if queued > int(maxQueuedInTotal) { + t.Fatalf("total transactions overflow allowance: %d > %d", queued, maxQueuedInTotal) + } +} + +// Tests that if an account remains idle for a prolonged amount of time, any +// non-executable transactions queued up are dropped to prevent wasting resources +// on shuffling them around. +func TestTransactionQueueTimeLimiting(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old time.Duration) { maxQueuedLifetime = old }(maxQueuedLifetime) + defer func(old time.Duration) { evictionInterval = old }(evictionInterval) + maxQueuedLifetime = time.Second + evictionInterval = time.Second + + // Create a test account and fund it + pool, key := setupTxPool() + account, _ := transaction(0, big.NewInt(0), key).From() + + state, _ := pool.currentState() + state.AddBalance(account, big.NewInt(1000000)) + + // Queue up a batch of transactions + for i := uint64(1); i <= maxQueuedPerAccount; i++ { + if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { + t.Fatalf("tx %d: failed to add transaction: %v", i, err) + } + } + // Wait until at least two expiration cycles hit and make sure the transactions are gone + time.Sleep(2 * evictionInterval) + if len(pool.queue) > 0 { + t.Fatalf("old transactions remained after eviction") } } @@ -481,7 +561,7 @@ func TestTransactionPendingLimiting(t *testing.T) { state.AddBalance(account, big.NewInt(1000000)) // Keep queuing up transactions and make sure all above a limit are dropped - for i := uint64(0); i < maxQueued+5; i++ { + for i := uint64(0); i < maxQueuedPerAccount+5; i++ { if err := pool.Add(transaction(i, big.NewInt(100000), key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } @@ -492,8 +572,8 @@ func TestTransactionPendingLimiting(t *testing.T) { t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0) } } - if len(pool.all) != maxQueued+5 { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueued+5) + if len(pool.all) != int(maxQueuedPerAccount+5) { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), maxQueuedPerAccount+5) } } @@ -509,7 +589,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { state1, _ := pool1.currentState() state1.AddBalance(account1, big.NewInt(1000000)) - for i := uint64(0); i < maxQueued+5; i++ { + for i := uint64(0); i < maxQueuedPerAccount+5; i++ { if err := pool1.Add(transaction(origin+i, big.NewInt(100000), key1)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } @@ -521,7 +601,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { state2.AddBalance(account2, big.NewInt(1000000)) txns := []*types.Transaction{} - for i := uint64(0); i < maxQueued+5; i++ { + for i := uint64(0); i < maxQueuedPerAccount+5; i++ { txns = append(txns, transaction(origin+i, big.NewInt(100000), key2)) } pool2.AddBatch(txns) -- cgit From b4a52513915d5a39ac055fc38cafed70098eb698 Mon Sep 17 00:00:00 2001 From: Péter Szilágyi Date: Thu, 25 Aug 2016 19:04:40 +0300 Subject: core: abstract out a sorted transaction hash map --- core/tx_list.go | 395 ++++++++++++++++++++++++++------------------------- core/tx_list_test.go | 14 +- core/tx_pool.go | 16 ++- core/tx_pool_test.go | 26 ++-- 4 files changed, 229 insertions(+), 222 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 8c69331cc..c3ddf3148 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -45,20 +45,182 @@ func (h *nonceHeap) Pop() interface{} { return x } +// txSortedMap is a nonce->transaction hash map with a heap based index to allow +// iterating over the contents in a nonce-incrementing way. +type txSortedMap struct { + items map[uint64]*types.Transaction // Hash map storing the transaction data + index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) + cache types.Transactions // Cache of the transactions already sorted +} + +// newTxSortedMap creates a new sorted transaction map. +func newTxSortedMap() *txSortedMap { + return &txSortedMap{ + items: make(map[uint64]*types.Transaction), + index: &nonceHeap{}, + } +} + +// Get retrieves the current transactions associated with the given nonce. +func (m *txSortedMap) Get(nonce uint64) *types.Transaction { + return m.items[nonce] +} + +// Put inserts a new transaction into the map, also updating the map's nonce +// index. If a transaction already exists with the same nonce, it's overwritten. +func (m *txSortedMap) Put(tx *types.Transaction) { + nonce := tx.Nonce() + if m.items[nonce] == nil { + heap.Push(m.index, nonce) + } + m.items[nonce], m.cache = tx, nil +} + +// Forward removes all transactions from the map with a nonce lower than the +// provided threshold. Every removed transaction is returned for any post-removal +// maintenance. +func (m *txSortedMap) Forward(threshold uint64) types.Transactions { + var removed types.Transactions + + // Pop off heap items until the threshold is reached + for m.index.Len() > 0 && (*m.index)[0] < threshold { + nonce := heap.Pop(m.index).(uint64) + removed = append(removed, m.items[nonce]) + delete(m.items, nonce) + } + // If we had a cached order, shift the front + if m.cache != nil { + m.cache = m.cache[len(removed):] + } + return removed +} + +// Filter iterates over the list of transactions and removes all of them for which +// the specified function evaluates to true. +func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { + var removed types.Transactions + + // Collect all the transactions to filter out + for nonce, tx := range m.items { + if filter(tx) { + removed = append(removed, tx) + delete(m.items, nonce) + } + } + // If transactions were removed, the heap and cache are ruined + if len(removed) > 0 { + *m.index = make([]uint64, 0, len(m.items)) + for nonce, _ := range m.items { + *m.index = append(*m.index, nonce) + } + heap.Init(m.index) + + m.cache = nil + } + return removed +} + +// Cap places a hard limit on the number of items, returning all transactions +// exceeding that limit. +func (m *txSortedMap) Cap(threshold int) types.Transactions { + // Short circuit if the number of items is under the limit + if len(m.items) <= threshold { + return nil + } + // Otherwise gather and drop the highest nonce'd transactions + var drops types.Transactions + + sort.Sort(*m.index) + for size := len(m.items); size > threshold; size-- { + drops = append(drops, m.items[(*m.index)[size-1]]) + delete(m.items, (*m.index)[size-1]) + } + *m.index = (*m.index)[:threshold] + heap.Init(m.index) + + // If we had a cache, shift the back + if m.cache != nil { + m.cache = m.cache[:len(m.cache)-len(drops)] + } + return drops +} + +// Remove deletes a transaction from the maintained map, returning whether the +// transaction was found. +func (m *txSortedMap) Remove(nonce uint64) bool { + // Short circuit if no transaction is present + _, ok := m.items[nonce] + if !ok { + return false + } + // Otherwise delete the transaction and fix the heap index + for i := 0; i < m.index.Len(); i++ { + if (*m.index)[i] == nonce { + heap.Remove(m.index, i) + break + } + } + delete(m.items, nonce) + m.cache = nil + + return true +} + +// Ready retrieves a sequentially increasing list of transactions starting at the +// provided nonce that is ready for processing. The returned transactions will be +// removed from the list. +// +// Note, all transactions with nonces lower than start will also be returned to +// prevent getting into and invalid state. This is not something that should ever +// happen but better to be self correcting than failing! +func (m *txSortedMap) Ready(start uint64) types.Transactions { + // Short circuit if no transactions are available + if m.index.Len() == 0 || (*m.index)[0] > start { + return nil + } + // Otherwise start accumulating incremental transactions + var ready types.Transactions + for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ { + ready = append(ready, m.items[next]) + delete(m.items, next) + heap.Pop(m.index) + } + m.cache = nil + + return ready +} + +// Len returns the length of the transaction map. +func (m *txSortedMap) Len() int { + return len(m.items) +} + +// Flatten creates a nonce-sorted slice of transactions based on the loosely +// sorted internal representation. The result of the sorting is cached in case +// it's requested again before any modifications are made to the contents. +func (m *txSortedMap) Flatten() types.Transactions { + // If the sorting was not cached yet, create and cache it + if m.cache == nil { + m.cache = make(types.Transactions, 0, len(m.items)) + for _, tx := range m.items { + m.cache = append(m.cache, tx) + } + sort.Sort(types.TxByNonce(m.cache)) + } + // Copy the cache to prevent accidental modifications + txs := make(types.Transactions, len(m.cache)) + copy(txs, m.cache) + return txs +} + // txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- // executable/future queue, with minor behavoiral changes. type txList struct { - strict bool // Whether nonces are strictly continuous or not - items map[uint64]*types.Transaction // Hash map storing the transaction data - cache types.Transactions // Cache of the transactions already sorted - - first uint64 // Nonce of the lowest stored transaction (strict mode) - last uint64 // Nonce of the highest stored transaction (strict mode) - index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) - - costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) + strict bool // Whether nonces are strictly continuous or not + txs *txSortedMap // Heap indexed sorted hash map of the transactions + costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) } // newTxList create a new transaction list for maintaining nonce-indexable fast, @@ -66,9 +228,7 @@ type txList struct { func newTxList(strict bool) *txList { return &txList{ strict: strict, - items: make(map[uint64]*types.Transaction), - first: math.MaxUint64, - index: &nonceHeap{}, + txs: newTxSortedMap(), costcap: new(big.Int), } } @@ -76,36 +236,19 @@ func newTxList(strict bool) *txList { // Add tries to insert a new transaction into the list, returning whether the // transaction was accepted, and if yes, any previous transaction it replaced. // -// In case of strict lists (contiguous nonces) the nonce boundaries are updated -// appropriately with the new transaction. Otherwise (gapped nonces) the heap of -// nonces is expanded with the new transaction. +// If the new transaction is accepted into the list, the lists' cost threshold +// is also potentially updated. func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) { - // If an existing transaction is better, discard new one - nonce := tx.Nonce() - - old, ok := l.items[nonce] - if ok && old.GasPrice().Cmp(tx.GasPrice()) >= 0 { + // If there's an older better transaction, abort + old := l.txs.Get(tx.Nonce()) + if old != nil && old.GasPrice().Cmp(tx.GasPrice()) >= 0 { return false, nil } - // Otherwise insert the transaction and replace any previous one - l.items[nonce] = tx + // Otherwise overwrite the old transaction with the current one + l.txs.Put(tx) if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 { l.costcap = cost } - if l.strict { - // In strict mode, maintain the nonce sequence boundaries - if nonce < l.first { - l.first = nonce - } - if nonce > l.last { - l.last = nonce - } - } else { - // In gapped mode, maintain the nonce heap - heap.Push(l.index, nonce) - } - l.cache = nil - return true, old } @@ -113,31 +256,7 @@ func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) { // provided threshold. Every removed transaction is returned for any post-removal // maintenance. func (l *txList) Forward(threshold uint64) types.Transactions { - var removed types.Transactions - - if l.strict { - // In strict mode, push the lowest nonce forward to the threshold - for l.first < threshold { - if tx, ok := l.items[l.first]; ok { - removed = append(removed, tx) - } - delete(l.items, l.first) - l.first++ - } - if l.first > l.last { - l.last = l.first - } - } else { - // In gapped mode, pop off heap items until the threshold is reached - for l.index.Len() > 0 && (*l.index)[0] < threshold { - nonce := heap.Pop(l.index).(uint64) - removed = append(removed, l.items[nonce]) - delete(l.items, nonce) - } - } - l.cache = nil - - return removed + return l.txs.Forward(threshold) } // Filter removes all transactions from the list with a cost higher than the @@ -155,110 +274,43 @@ func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transacti } l.costcap = new(big.Int).Set(threshold) // Lower the cap to the threshold - // Gather all the transactions needing deletion - var removed types.Transactions - for _, tx := range l.items { - if cost := tx.Cost(); cost.Cmp(threshold) > 0 { - removed = append(removed, tx) - delete(l.items, tx.Nonce()) - } - } - // Readjust the nonce boundaries/indexes and gather invalidate tranactions + // Filter out all the transactions above the account's funds + removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(threshold) > 0 }) + + // If the list was strict, filter anything above the lowest nonce var invalids types.Transactions - if l.strict { - // In strict mode iterate find the first gap and invalidate everything after it - for i := l.first; i <= l.last; i++ { - if _, ok := l.items[i]; !ok { - // Gap found, invalidate all subsequent transactions - for j := i + 1; j <= l.last; j++ { - if tx, ok := l.items[j]; ok { - invalids = append(invalids, tx) - delete(l.items, j) - } - } - // Reduce the highest transaction nonce and return - l.last = i - 1 - break + if l.strict && len(removed) > 0 { + lowest := uint64(math.MaxUint64) + for _, tx := range removed { + if nonce := tx.Nonce(); lowest > nonce { + lowest = nonce } } - } else { - // In gapped mode no transactions are invalid, but the heap is ruined - l.index = &nonceHeap{} - for nonce, _ := range l.items { - *l.index = append(*l.index, nonce) - } - heap.Init(l.index) + invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } - l.cache = nil - return removed, invalids } // Cap places a hard limit on the number of items, returning all transactions // exceeding that limit. func (l *txList) Cap(threshold int) types.Transactions { - // Short circuit if the number of items is under the limit - if len(l.items) < threshold { - return nil - } - // Otherwise gather and drop the highest nonce'd transactions - var drops types.Transactions - - if l.strict { - // In strict mode, just gather top down from last to first - for len(l.items) > threshold { - if tx, ok := l.items[l.last]; ok { - drops = append(drops, tx) - delete(l.items, l.last) - l.last-- - } - } - } else { - // In gapped mode it's expensive: we need to sort and drop like that - sort.Sort(*l.index) - for size := len(l.items); size > threshold; size-- { - drops = append(drops, l.items[(*l.index)[size-1]]) - delete(l.items, (*l.index)[size-1]) - *l.index = (*l.index)[:size-1] - } - heap.Init(l.index) - } - l.cache = nil - - return drops + return l.txs.Cap(threshold) } // Remove deletes a transaction from the maintained list, returning whether the // transaction was found, and also returning any transaction invalidated due to // the deletion (strict mode only). func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { + // Remove the transaction from the set nonce := tx.Nonce() - if _, ok := l.items[nonce]; ok { - // Remove the item and invalidate the sorted cache - delete(l.items, nonce) - l.cache = nil - - // Remove all invalidated transactions (strict mode only!) - var invalids types.Transactions - if l.strict { - invalids = make(types.Transactions, 0, l.last-nonce) - for i := nonce + 1; i <= l.last; i++ { - invalids = append(invalids, l.items[i]) - delete(l.items, i) - } - l.last = nonce - 1 - } else { - // In gapped mode, remove the nonce from the index but honour the heap - for i := 0; i < l.index.Len(); i++ { - if (*l.index)[i] == nonce { - heap.Remove(l.index, i) - break - } - } - } - return true, invalids + if removed := l.txs.Remove(nonce); !removed { + return false, nil } - return false, nil + // In strict mode, filter out non-executable transactions + if l.strict { + return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce }) + } + return true, nil } // Ready retrieves a sequentially increasing list of transactions starting at the @@ -269,63 +321,22 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { // prevent getting into and invalid state. This is not something that should ever // happen but better to be self correcting than failing! func (l *txList) Ready(start uint64) types.Transactions { - var txs types.Transactions - if l.strict { - // In strict mode make sure we have valid transaction, return all contiguous - if l.first > start { - return nil - } - for { - if tx, ok := l.items[l.first]; ok { - txs = append(txs, tx) - delete(l.items, l.first) - l.first++ - continue - } - break - } - } else { - // In gapped mode, check the heap start and return all contiguous - if l.index.Len() == 0 || (*l.index)[0] > start { - return nil - } - next := (*l.index)[0] - for l.index.Len() > 0 && (*l.index)[0] == next { - txs = append(txs, l.items[next]) - delete(l.items, next) - heap.Pop(l.index) - next++ - } - } - l.cache = nil - - return txs + return l.txs.Ready(start) } // Len returns the length of the transaction list. func (l *txList) Len() int { - return len(l.items) + return l.txs.Len() } // Empty returns whether the list of transactions is empty or not. func (l *txList) Empty() bool { - return len(l.items) == 0 + return l.Len() == 0 } // Flatten creates a nonce-sorted slice of transactions based on the loosely // sorted internal representation. The result of the sorting is cached in case // it's requested again before any modifications are made to the contents. func (l *txList) Flatten() types.Transactions { - // If the sorting was not cached yet, create and cache it - if l.cache == nil { - l.cache = make(types.Transactions, 0, len(l.items)) - for _, tx := range l.items { - l.cache = append(l.cache, tx) - } - sort.Sort(types.TxByNonce(l.cache)) - } - // Copy the cache to prevent accidental modifications - txs := make(types.Transactions, len(l.cache)) - copy(txs, l.cache) - return txs + return l.txs.Flatten() } diff --git a/core/tx_list_test.go b/core/tx_list_test.go index ea83ca479..92b211937 100644 --- a/core/tx_list_test.go +++ b/core/tx_list_test.go @@ -41,18 +41,12 @@ func TestStrictTxListAdd(t *testing.T) { list.Add(txs[v]) } // Verify internal state - if list.first != 0 { - t.Errorf("lowest nonce mismatch: have %d, want %d", list.first, 0) - } - if int(list.last) != len(txs)-1 { - t.Errorf("highest nonce mismatch: have %d, want %d", list.last, len(txs)-1) - } - if len(list.items) != len(txs) { - t.Errorf("transaction count mismatch: have %d, want %d", len(list.items), len(txs)) + if len(list.txs.items) != len(txs) { + t.Errorf("transaction count mismatch: have %d, want %d", len(list.txs.items), len(txs)) } for i, tx := range txs { - if list.items[tx.Nonce()] != tx { - t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.items[tx.Nonce()], tx) + if list.txs.items[tx.Nonce()] != tx { + t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.txs.items[tx.Nonce()], tx) } } } diff --git a/core/tx_pool.go b/core/tx_pool.go index 58d304f00..f8b11a7ce 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -154,7 +154,8 @@ func (pool *TxPool) resetState() { // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { - pool.pendingState.SetNonce(addr, list.last+1) + txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway + pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid @@ -366,7 +367,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() - pool.pendingState.SetNonce(addr, list.last+1) + pool.pendingState.SetNonce(addr, tx.Nonce()+1) go pool.eventMux.Post(TxPreEvent{tx}) } @@ -439,19 +440,20 @@ func (pool *TxPool) removeTx(hash common.Hash) { // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { - // If no more transactions are left, remove the list and reset the nonce + // If no more transactions are left, remove the list if pending.Empty() { delete(pool.pending, addr) delete(pool.beats, addr) - - pool.pendingState.SetNonce(addr, tx.Nonce()) } else { - // Otherwise update the nonce and postpone any invalidated transactions - pool.pendingState.SetNonce(addr, pending.last) + // Otherwise postpone any invalidated transactions for _, tx := range invalids { pool.enqueueTx(tx.Hash(), tx) } } + // Update the account nonce if needed + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, tx.Nonce()) + } } } // Transaction is in the future queue diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f08334fa1..4bc5aed38 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -105,7 +105,7 @@ func TestTransactionQueue(t *testing.T) { currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) pool.promoteExecutables() - if _, ok := pool.pending[from].items[tx.Nonce()]; ok { + if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -224,7 +224,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Add the thid transaction and ensure it's not saved (smaller price) @@ -235,7 +235,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Ensure the total transaction count is correct @@ -346,16 +346,16 @@ func TestTransactionDropping(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account].items[tx0.Nonce()]; !ok { + if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) } - if _, ok := pool.pending[account].items[tx1.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok { t.Errorf("out-of-fund pending transaction present: %v", tx1) } - if _, ok := pool.queue[account].items[tx10.Nonce()]; !ok { + if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account].items[tx11.Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok { t.Errorf("out-of-fund queued transaction present: %v", tx11) } if len(pool.all) != 2 { @@ -410,25 +410,25 @@ func TestTransactionPostponing(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account].items[txns[0].Nonce()]; !ok { + if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) } - if _, ok := pool.queue[account].items[txns[0].Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[txns[0].Nonce()]; ok { t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txns[0]) } for i, tx := range txns[1:] { if i%2 == 1 { - if _, ok := pool.pending[account].items[tx.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account].items[tx.Nonce()]; !ok { + if _, ok := pool.queue[account].txs.items[tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) } } else { - if _, ok := pool.pending[account].items[tx.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account].items[tx.Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) } } -- cgit