diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 269 |
1 files changed, 195 insertions, 74 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 49bd81e48..a0373ca7d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -36,23 +36,26 @@ import ( var ( // Transaction Pool Errors - ErrInvalidSender = errors.New("Invalid sender") - ErrNonce = errors.New("Nonce too low") - ErrCheap = errors.New("Gas price too low for acceptance") - ErrBalance = errors.New("Insufficient balance") - ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value") - ErrIntrinsicGas = errors.New("Intrinsic gas too low") - ErrGasLimit = errors.New("Exceeds block gas limit") - ErrNegativeValue = errors.New("Negative value") + ErrInvalidSender = errors.New("invalid sender") + ErrNonce = errors.New("nonce too low") + ErrUnderpriced = errors.New("transaction underpriced") + ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") + ErrBalance = errors.New("insufficient balance") + ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") + ErrIntrinsicGas = errors.New("intrinsic gas too low") + ErrGasLimit = errors.New("exceeds block gas limit") + ErrNegativeValue = errors.New("negative value") ) var ( - minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address - maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft) - maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address - maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts - maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued - evictionInterval = time.Minute // Time interval to check for evictable transactions + minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address + maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft) + maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address + maxQueuedTotal = uint64(1024) // Max limit of queued transactions from all accounts + maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued + minPriceBumpPercent = int64(10) // Minimum price bump needed to replace an old transaction + evictionInterval = time.Minute // Time interval to check for evictable transactions + statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats ) var ( @@ -69,7 +72,8 @@ var ( queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds // General tx metrics - invalidTxCounter = metrics.NewCounter("txpool/invalid") + invalidTxCounter = metrics.NewCounter("txpool/invalid") + underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) type stateFn func() (*state.StateDB, error) @@ -86,17 +90,18 @@ type TxPool struct { currentState stateFn // The state function which will allow us to do some pre checks pendingState *state.ManagedState gasLimit func() *big.Int // The current gas limit function callback - minGasPrice *big.Int + gasPrice *big.Int eventMux *event.TypeMux events *event.TypeMuxSubscription - localTx *txSet + locals *txSet signer types.Signer mu sync.RWMutex pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions - all map[common.Hash]*types.Transaction // All transactions to allow lookups beats map[common.Address]time.Time // Last heartbeat from each known account + all map[common.Hash]*types.Transaction // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price wg sync.WaitGroup // for shutdown sync quit chan struct{} @@ -110,18 +115,18 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState signer: types.NewEIP155Signer(config.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), - all: make(map[common.Hash]*types.Transaction), beats: make(map[common.Address]time.Time), + all: make(map[common.Hash]*types.Transaction), eventMux: eventMux, currentState: currentStateFn, gasLimit: gasLimitFn, - minGasPrice: new(big.Int), + gasPrice: big.NewInt(1), pendingState: nil, - localTx: newTxSet(), - events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}), + locals: newTxSet(), + events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), quit: make(chan struct{}), } - + pool.priced = newTxPricedList(&pool.all) pool.resetState() pool.wg.Add(2) @@ -134,27 +139,48 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState func (pool *TxPool) eventLoop() { defer pool.wg.Done() + // Start a ticker and keep track of interesting pool stats to report + var prevPending, prevQueued, prevStales int + + report := time.NewTicker(statsReportInterval) + defer report.Stop() + // Track chain events. When a chain events occurs (new chain canon block) // we need to know the new state. The new state will help us determine // the nonces in the managed state - for ev := range pool.events.Chan() { - switch ev := ev.Data.(type) { - case ChainHeadEvent: - pool.mu.Lock() - if ev.Block != nil { - if pool.config.IsHomestead(ev.Block.Number()) { - pool.homestead = true + for { + select { + // Handle any events fired by the system + case ev, ok := <-pool.events.Chan(): + if !ok { + return + } + switch ev := ev.Data.(type) { + case ChainHeadEvent: + pool.mu.Lock() + if ev.Block != nil { + if pool.config.IsHomestead(ev.Block.Number()) { + pool.homestead = true + } } + pool.resetState() + pool.mu.Unlock() + + case RemovedTransactionEvent: + pool.AddBatch(ev.Txs) } - pool.resetState() - pool.mu.Unlock() - case GasPriceChanged: - pool.mu.Lock() - pool.minGasPrice = ev.Price - pool.mu.Unlock() - case RemovedTransactionEvent: - pool.AddBatch(ev.Txs) + // Handle stats reporting ticks + case <-report.C: + pool.mu.RLock() + pending, queued := pool.stats() + stales := pool.priced.stales + pool.mu.RUnlock() + + if pending != prevPending || queued != prevQueued || stales != prevStales { + log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) + prevPending, prevQueued, prevStales = pending, queued, stales + } } } } @@ -191,6 +217,27 @@ func (pool *TxPool) Stop() { log.Info("Transaction pool stopped") } +// GasPrice returns the current gas price enforced by the transaction pool. +func (pool *TxPool) GasPrice() *big.Int { + pool.mu.RLock() + defer pool.mu.RUnlock() + + return new(big.Int).Set(pool.gasPrice) +} + +// SetGasPrice updates the minimum price required by the transaction pool for a +// new transaction, and drops all transactions below this threshold. +func (pool *TxPool) SetGasPrice(price *big.Int) { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.gasPrice = price + for _, tx := range pool.priced.Cap(price, pool.locals) { + pool.removeTx(tx.Hash()) + } + log.Info("Transaction pool price threshold updated", "price", price) +} + func (pool *TxPool) State() *state.ManagedState { pool.mu.RLock() defer pool.mu.RUnlock() @@ -200,17 +247,25 @@ func (pool *TxPool) State() *state.ManagedState { // Stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. -func (pool *TxPool) Stats() (pending int, queued int) { +func (pool *TxPool) Stats() (int, int) { pool.mu.RLock() defer pool.mu.RUnlock() + return pool.stats() +} + +// stats retrieves the current pool stats, namely the number of pending and the +// number of queued (non-executable) transactions. +func (pool *TxPool) stats() (int, int) { + pending := 0 for _, list := range pool.pending { pending += list.Len() } + queued := 0 for _, list := range pool.queue { queued += list.Len() } - return + return pending, queued } // Content retrieves the data content of the transaction pool, returning all the @@ -260,16 +315,16 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { func (pool *TxPool) SetLocal(tx *types.Transaction) { pool.mu.Lock() defer pool.mu.Unlock() - pool.localTx.add(tx.Hash()) + pool.locals.add(tx.Hash()) } // validateTx checks whether a transaction is valid according // to the consensus rules. func (pool *TxPool) validateTx(tx *types.Transaction) error { - local := pool.localTx.contains(tx.Hash()) + local := pool.locals.contains(tx.Hash()) // Drop transactions under our own minimal accepted gas price - if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 { - return ErrCheap + if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + return ErrUnderpriced } currentState, err := pool.currentState() @@ -314,31 +369,72 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { } // add validates a transaction and inserts it into the non-executable queue for -// later pending promotion and execution. -func (pool *TxPool) add(tx *types.Transaction) error { +// later pending promotion and execution. If the transaction is a replacement for +// an already pending or queued one, it overwrites the previous and returns this +// so outer code doesn't uselessly call promote. +func (pool *TxPool) add(tx *types.Transaction) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all[hash] != nil { log.Trace("Discarding already known transaction", "hash", hash) - return fmt.Errorf("known transaction: %x", hash) + return false, fmt.Errorf("known transaction: %x", hash) } - // Otherwise ensure basic validation passes and queue it up + // If the transaction fails basic validation, discard it if err := pool.validateTx(tx); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) - return err + return false, err + } + // If the transaction pool is full, discard underpriced transactions + if uint64(len(pool.all)) >= maxPendingTotal+maxQueuedTotal { + // If the new transaction is underpriced, don't accept it + if pool.priced.Underpriced(tx, pool.locals) { + log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) + underpricedTxCounter.Inc(1) + return false, ErrUnderpriced + } + // New transaction is better than our worse ones, make room for it + drop := pool.priced.Discard(len(pool.all)-int(maxPendingTotal+maxQueuedTotal-1), pool.locals) + for _, tx := range drop { + log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice()) + underpricedTxCounter.Inc(1) + pool.removeTx(tx.Hash()) + } } - pool.enqueueTx(hash, tx) + // If the transaction is replacing an already pending one, do directly + from, _ := types.Sender(pool.signer, tx) // already validated + if list := pool.pending[from]; list != nil && list.Overlaps(tx) { + // Nonce already pending, check if required price bump is met + inserted, old := list.Add(tx) + if !inserted { + pendingDiscardCounter.Inc(1) + return false, ErrReplaceUnderpriced + } + // New transaction is better, replace old one + if old != nil { + delete(pool.all, old.Hash()) + pool.priced.Removed() + pendingReplaceCounter.Inc(1) + } + pool.all[tx.Hash()] = tx + pool.priced.Put(tx) - // Print a log message if low enough level is set - log.Debug("Pooled new transaction", "hash", hash, "from", log.Lazy{Fn: func() common.Address { from, _ := types.Sender(pool.signer, tx); return from }}, "to", tx.To()) - return nil + log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) + return old != nil, nil + } + // New transaction isn't replacing a pending one, push into queue + replace, err := pool.enqueueTx(hash, tx) + if err != nil { + return false, err + } + log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) + return replace, nil } // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { +func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(pool.signer, tx) // already validated if pool.queue[from] == nil { @@ -346,15 +442,19 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) { } inserted, old := pool.queue[from].Add(tx) if !inserted { + // An older transaction was better, discard this queuedDiscardCounter.Inc(1) - return // An older transaction was better, discard this + return false, ErrReplaceUnderpriced } // Discard any previous transaction and mark this if old != nil { delete(pool.all, old.Hash()) + pool.priced.Removed() queuedReplaceCounter.Inc(1) } pool.all[hash] = tx + pool.priced.Put(tx) + return old != nil, nil } // promoteTx adds a transaction to the pending (processable) list of transactions. @@ -371,16 +471,23 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T if !inserted { // An older transaction was better, discard this delete(pool.all, hash) + pool.priced.Removed() + pendingDiscardCounter.Inc(1) return } // Otherwise discard any previous transaction and mark this if old != nil { delete(pool.all, old.Hash()) + pool.priced.Removed() + pendingReplaceCounter.Inc(1) } - pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests) - + // Failsafe to work around direct pending inserts (tests) + if pool.all[hash] == nil { + pool.all[hash] = tx + pool.priced.Put(tx) + } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) @@ -392,16 +499,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error { pool.mu.Lock() defer pool.mu.Unlock() - if err := pool.add(tx); err != nil { + // Try to inject the transaction and update any state + replace, err := pool.add(tx) + if err != nil { return err } - state, err := pool.currentState() if err != nil { return err } - pool.promoteExecutables(state) - + // If we added a new transaction, run promotion checks and return + if !replace { + pool.promoteExecutables(state) + } return nil } @@ -411,10 +521,13 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error { defer pool.mu.Unlock() // Add the batch of transaction, tracking the accepted ones - added := 0 + replaced, added := true, 0 for _, tx := range txs { - if err := pool.add(tx); err == nil { + if replace, err := pool.add(tx); err == nil { added++ + if !replace { + replaced = false + } } } // Only reprocess the internal state if something was actually added @@ -423,7 +536,9 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error { if err != nil { return err } - pool.promoteExecutables(state) + if !replaced { + pool.promoteExecutables(state) + } } return nil } @@ -467,6 +582,7 @@ func (pool *TxPool) removeTx(hash common.Hash) { // Remove it from the list of known transactions delete(pool.all, hash) + pool.priced.Removed() // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { @@ -506,28 +622,31 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(state.GetNonce(addr)) { hash := tx.Hash() - log.Debug("Removed old queued transaction", "hash", hash) + log.Trace("Removed old queued transaction", "hash", hash) delete(pool.all, hash) + pool.priced.Removed() } // Drop all transactions that are too costly (low balance) drops, _ := list.Filter(state.GetBalance(addr)) for _, tx := range drops { hash := tx.Hash() - log.Debug("Removed unpayable queued transaction", "hash", hash) + log.Trace("Removed unpayable queued transaction", "hash", hash) delete(pool.all, hash) + pool.priced.Removed() queuedNofundsCounter.Inc(1) } // Gather all executable transactions and promote them for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) { hash := tx.Hash() - log.Debug("Promoting queued transaction", "hash", hash) + log.Trace("Promoting queued transaction", "hash", hash) pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit for _, tx := range list.Cap(int(maxQueuedPerAccount)) { hash := tx.Hash() - log.Debug("Removed cap-exceeding queued transaction", "hash", hash) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) delete(pool.all, hash) + pool.priced.Removed() queuedRLCounter.Inc(1) } queued += uint64(list.Len()) @@ -551,7 +670,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { if uint64(list.Len()) > minPendingPerAccount { // Skip local accounts as pools should maintain backlogs for themselves for _, tx := range list.txs.items { - if !pool.localTx.contains(tx.Hash()) { + if !pool.locals.contains(tx.Hash()) { spammers.Push(addr, float32(list.Len())) } break // Checking on transaction for locality is enough @@ -593,7 +712,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones - if queued > maxQueuedInTotal { + if queued > maxQueuedTotal { // Sort all accounts with queued transactions by heartbeat addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { @@ -602,7 +721,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) { sort.Sort(addresses) // Drop transactions until the total is below the limit - for drop := queued - maxQueuedInTotal; drop > 0; { + for drop := queued - maxQueuedTotal; drop > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] @@ -639,20 +758,22 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(nonce) { hash := tx.Hash() - log.Debug("Removed old pending transaction", "hash", hash) + log.Trace("Removed old pending transaction", "hash", hash) delete(pool.all, hash) + pool.priced.Removed() } // Drop all transactions that are too costly (low balance), and queue any invalids back for later drops, invalids := list.Filter(state.GetBalance(addr)) for _, tx := range drops { hash := tx.Hash() - log.Debug("Removed unpayable pending transaction", "hash", hash) + log.Trace("Removed unpayable pending transaction", "hash", hash) delete(pool.all, hash) + pool.priced.Removed() pendingNofundsCounter.Inc(1) } for _, tx := range invalids { hash := tx.Hash() - log.Debug("Demoting pending transaction", "hash", hash) + log.Trace("Demoting pending transaction", "hash", hash) pool.enqueueTx(hash, tx) } // Delete the entire queue entry if it became empty. |