aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/events.go4
-rw-r--r--core/tx_list.go166
-rw-r--r--core/tx_pool.go269
-rw-r--r--core/tx_pool_test.go261
4 files changed, 599 insertions, 101 deletions
diff --git a/core/events.go b/core/events.go
index 106b52c80..ce1f5aebc 100644
--- a/core/events.go
+++ b/core/events.go
@@ -17,8 +17,6 @@
package core
import (
- "math/big"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
@@ -67,8 +65,6 @@ type ChainUncleEvent struct {
type ChainHeadEvent struct{ Block *types.Block }
-type GasPriceChanged struct{ Price *big.Int }
-
// Mining operation events
type StartMining struct{}
type TopMining struct{}
diff --git a/core/tx_list.go b/core/tx_list.go
index 535cb9dd6..eb380da0b 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -22,7 +22,9 @@ import (
"math/big"
"sort"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
)
// nonceHeap is a heap.Interface implementation over 64bit unsigned integers for
@@ -53,11 +55,11 @@ type txSortedMap struct {
cache types.Transactions // Cache of the transactions already sorted
}
-// newTxSortedMap creates a new sorted transaction map.
+// newTxSortedMap creates a new nonce-sorted transaction map.
func newTxSortedMap() *txSortedMap {
return &txSortedMap{
items: make(map[uint64]*types.Transaction),
- index: &nonceHeap{},
+ index: new(nonceHeap),
}
}
@@ -233,6 +235,12 @@ func newTxList(strict bool) *txList {
}
}
+// Overlaps returns whether the transaction specified has the same nonce as one
+// already contained within the list.
+func (l *txList) Overlaps(tx *types.Transaction) bool {
+ return l.txs.Get(tx.Nonce()) != nil
+}
+
// Add tries to insert a new transaction into the list, returning whether the
// transaction was accepted, and if yes, any previous transaction it replaced.
//
@@ -241,8 +249,11 @@ func newTxList(strict bool) *txList {
func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) {
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
- if old != nil && old.GasPrice().Cmp(tx.GasPrice()) >= 0 {
- return false, nil
+ if old != nil {
+ threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+minPriceBumpPercent)), big.NewInt(100))
+ if threshold.Cmp(tx.GasPrice()) >= 0 {
+ return false, nil
+ }
}
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
@@ -340,3 +351,150 @@ func (l *txList) Empty() bool {
func (l *txList) Flatten() types.Transactions {
return l.txs.Flatten()
}
+
+// priceHeap is a heap.Interface implementation over transactions for retrieving
+// price-sorted transactions to discard when the pool fills up.
+type priceHeap []*types.Transaction
+
+func (h priceHeap) Len() int { return len(h) }
+func (h priceHeap) Less(i, j int) bool { return h[i].GasPrice().Cmp(h[j].GasPrice()) < 0 }
+func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *priceHeap) Push(x interface{}) {
+ *h = append(*h, x.(*types.Transaction))
+}
+
+func (h *priceHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+// txPricedList is a price-sorted heap to allow operating on transactions pool
+// contents in a price-incrementing way.
+type txPricedList struct {
+ all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
+ items *priceHeap // Heap of prices of all the stored transactions
+ stales int // Number of stale price points to (re-heap trigger)
+}
+
+// newTxPricedList creates a new price-sorted transaction heap.
+func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
+ return &txPricedList{
+ all: all,
+ items: new(priceHeap),
+ }
+}
+
+// Put inserts a new transaction into the heap.
+func (l *txPricedList) Put(tx *types.Transaction) {
+ heap.Push(l.items, tx)
+}
+
+// Removed notifies the prices transaction list that an old transaction dropped
+// from the pool. The list will just keep a counter of stale objects and update
+// the heap if a large enough ratio of transactions go stale.
+func (l *txPricedList) Removed() {
+ // Bump the stale counter, but exit if still too low (< 25%)
+ l.stales++
+ if l.stales <= len(*l.items)/4 {
+ return
+ }
+ // Seems we've reached a critical number of stale transactions, reheap
+ reheap := make(priceHeap, 0, len(*l.all))
+
+ l.stales, l.items = 0, &reheap
+ for _, tx := range *l.all {
+ *l.items = append(*l.items, tx)
+ }
+ heap.Init(l.items)
+}
+
+// Discard finds all the transactions below the given price threshold, drops them
+// from the priced list and returs them for further removal from the entire pool.
+func (l *txPricedList) Cap(threshold *big.Int, local *txSet) types.Transactions {
+ drop := make(types.Transactions, 0, 128) // Remote underpriced transactions to drop
+ save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
+
+ for len(*l.items) > 0 {
+ // Discard stale transactions if found during cleanup
+ tx := heap.Pop(l.items).(*types.Transaction)
+
+ hash := tx.Hash()
+ if _, ok := (*l.all)[hash]; !ok {
+ l.stales--
+ continue
+ }
+ // Stop the discards if we've reached the threshold
+ if tx.GasPrice().Cmp(threshold) >= 0 {
+ break
+ }
+ // Non stale transaction found, discard unless local
+ if local.contains(hash) {
+ save = append(save, tx)
+ } else {
+ drop = append(drop, tx)
+ }
+ }
+ for _, tx := range save {
+ heap.Push(l.items, tx)
+ }
+ return drop
+}
+
+// Underpriced checks whether a transaction is cheaper than (or as cheap as) the
+// lowest priced transaction currently being tracked.
+func (l *txPricedList) Underpriced(tx *types.Transaction, local *txSet) bool {
+ // Local transactions cannot be underpriced
+ if local.contains(tx.Hash()) {
+ return false
+ }
+ // Discard stale price points if found at the heap start
+ for len(*l.items) > 0 {
+ head := []*types.Transaction(*l.items)[0]
+ if _, ok := (*l.all)[head.Hash()]; !ok {
+ l.stales--
+ heap.Pop(l.items)
+ continue
+ }
+ break
+ }
+ // Check if the transaction is underpriced or not
+ if len(*l.items) == 0 {
+ log.Error("Pricing query for empty pool") // This cannot happen, print to catch programming errors
+ return false
+ }
+ cheapest := []*types.Transaction(*l.items)[0]
+ return cheapest.GasPrice().Cmp(tx.GasPrice()) >= 0
+}
+
+// Discard finds a number of most underpriced transactions, removes them from the
+// priced list and returs them for further removal from the entire pool.
+func (l *txPricedList) Discard(count int, local *txSet) types.Transactions {
+ drop := make(types.Transactions, 0, count) // Remote underpriced transactions to drop
+ save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep
+
+ for len(*l.items) > 0 && count > 0 {
+ // Discard stale transactions if found during cleanup
+ tx := heap.Pop(l.items).(*types.Transaction)
+
+ hash := tx.Hash()
+ if _, ok := (*l.all)[hash]; !ok {
+ l.stales--
+ continue
+ }
+ // Non stale transaction found, discard unless local
+ if local.contains(hash) {
+ save = append(save, tx)
+ } else {
+ drop = append(drop, tx)
+ count--
+ }
+ }
+ for _, tx := range save {
+ heap.Push(l.items, tx)
+ }
+ return drop
+}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 49bd81e48..a0373ca7d 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -36,23 +36,26 @@ import (
var (
// Transaction Pool Errors
- ErrInvalidSender = errors.New("Invalid sender")
- ErrNonce = errors.New("Nonce too low")
- ErrCheap = errors.New("Gas price too low for acceptance")
- ErrBalance = errors.New("Insufficient balance")
- ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
- ErrIntrinsicGas = errors.New("Intrinsic gas too low")
- ErrGasLimit = errors.New("Exceeds block gas limit")
- ErrNegativeValue = errors.New("Negative value")
+ ErrInvalidSender = errors.New("invalid sender")
+ ErrNonce = errors.New("nonce too low")
+ ErrUnderpriced = errors.New("transaction underpriced")
+ ErrReplaceUnderpriced = errors.New("replacement transaction underpriced")
+ ErrBalance = errors.New("insufficient balance")
+ ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value")
+ ErrIntrinsicGas = errors.New("intrinsic gas too low")
+ ErrGasLimit = errors.New("exceeds block gas limit")
+ ErrNegativeValue = errors.New("negative value")
)
var (
- minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
- maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
- maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
- maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts
- maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
- evictionInterval = time.Minute // Time interval to check for evictable transactions
+ minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
+ maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
+ maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
+ maxQueuedTotal = uint64(1024) // Max limit of queued transactions from all accounts
+ maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
+ minPriceBumpPercent = int64(10) // Minimum price bump needed to replace an old transaction
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
+ statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
)
var (
@@ -69,7 +72,8 @@ var (
queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds
// General tx metrics
- invalidTxCounter = metrics.NewCounter("txpool/invalid")
+ invalidTxCounter = metrics.NewCounter("txpool/invalid")
+ underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
)
type stateFn func() (*state.StateDB, error)
@@ -86,17 +90,18 @@ type TxPool struct {
currentState stateFn // The state function which will allow us to do some pre checks
pendingState *state.ManagedState
gasLimit func() *big.Int // The current gas limit function callback
- minGasPrice *big.Int
+ gasPrice *big.Int
eventMux *event.TypeMux
events *event.TypeMuxSubscription
- localTx *txSet
+ locals *txSet
signer types.Signer
mu sync.RWMutex
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
- all map[common.Hash]*types.Transaction // All transactions to allow lookups
beats map[common.Address]time.Time // Last heartbeat from each known account
+ all map[common.Hash]*types.Transaction // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
quit chan struct{}
@@ -110,18 +115,18 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState
signer: types.NewEIP155Signer(config.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
- all: make(map[common.Hash]*types.Transaction),
beats: make(map[common.Address]time.Time),
+ all: make(map[common.Hash]*types.Transaction),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
- minGasPrice: new(big.Int),
+ gasPrice: big.NewInt(1),
pendingState: nil,
- localTx: newTxSet(),
- events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
+ locals: newTxSet(),
+ events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
quit: make(chan struct{}),
}
-
+ pool.priced = newTxPricedList(&pool.all)
pool.resetState()
pool.wg.Add(2)
@@ -134,27 +139,48 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState
func (pool *TxPool) eventLoop() {
defer pool.wg.Done()
+ // Start a ticker and keep track of interesting pool stats to report
+ var prevPending, prevQueued, prevStales int
+
+ report := time.NewTicker(statsReportInterval)
+ defer report.Stop()
+
// Track chain events. When a chain events occurs (new chain canon block)
// we need to know the new state. The new state will help us determine
// the nonces in the managed state
- for ev := range pool.events.Chan() {
- switch ev := ev.Data.(type) {
- case ChainHeadEvent:
- pool.mu.Lock()
- if ev.Block != nil {
- if pool.config.IsHomestead(ev.Block.Number()) {
- pool.homestead = true
+ for {
+ select {
+ // Handle any events fired by the system
+ case ev, ok := <-pool.events.Chan():
+ if !ok {
+ return
+ }
+ switch ev := ev.Data.(type) {
+ case ChainHeadEvent:
+ pool.mu.Lock()
+ if ev.Block != nil {
+ if pool.config.IsHomestead(ev.Block.Number()) {
+ pool.homestead = true
+ }
}
+ pool.resetState()
+ pool.mu.Unlock()
+
+ case RemovedTransactionEvent:
+ pool.AddBatch(ev.Txs)
}
- pool.resetState()
- pool.mu.Unlock()
- case GasPriceChanged:
- pool.mu.Lock()
- pool.minGasPrice = ev.Price
- pool.mu.Unlock()
- case RemovedTransactionEvent:
- pool.AddBatch(ev.Txs)
+ // Handle stats reporting ticks
+ case <-report.C:
+ pool.mu.RLock()
+ pending, queued := pool.stats()
+ stales := pool.priced.stales
+ pool.mu.RUnlock()
+
+ if pending != prevPending || queued != prevQueued || stales != prevStales {
+ log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales)
+ prevPending, prevQueued, prevStales = pending, queued, stales
+ }
}
}
}
@@ -191,6 +217,27 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}
+// GasPrice returns the current gas price enforced by the transaction pool.
+func (pool *TxPool) GasPrice() *big.Int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return new(big.Int).Set(pool.gasPrice)
+}
+
+// SetGasPrice updates the minimum price required by the transaction pool for a
+// new transaction, and drops all transactions below this threshold.
+func (pool *TxPool) SetGasPrice(price *big.Int) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ pool.gasPrice = price
+ for _, tx := range pool.priced.Cap(price, pool.locals) {
+ pool.removeTx(tx.Hash())
+ }
+ log.Info("Transaction pool price threshold updated", "price", price)
+}
+
func (pool *TxPool) State() *state.ManagedState {
pool.mu.RLock()
defer pool.mu.RUnlock()
@@ -200,17 +247,25 @@ func (pool *TxPool) State() *state.ManagedState {
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
-func (pool *TxPool) Stats() (pending int, queued int) {
+func (pool *TxPool) Stats() (int, int) {
pool.mu.RLock()
defer pool.mu.RUnlock()
+ return pool.stats()
+}
+
+// stats retrieves the current pool stats, namely the number of pending and the
+// number of queued (non-executable) transactions.
+func (pool *TxPool) stats() (int, int) {
+ pending := 0
for _, list := range pool.pending {
pending += list.Len()
}
+ queued := 0
for _, list := range pool.queue {
queued += list.Len()
}
- return
+ return pending, queued
}
// Content retrieves the data content of the transaction pool, returning all the
@@ -260,16 +315,16 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
func (pool *TxPool) SetLocal(tx *types.Transaction) {
pool.mu.Lock()
defer pool.mu.Unlock()
- pool.localTx.add(tx.Hash())
+ pool.locals.add(tx.Hash())
}
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
- local := pool.localTx.contains(tx.Hash())
+ local := pool.locals.contains(tx.Hash())
// Drop transactions under our own minimal accepted gas price
- if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
- return ErrCheap
+ if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
+ return ErrUnderpriced
}
currentState, err := pool.currentState()
@@ -314,31 +369,72 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
}
// add validates a transaction and inserts it into the non-executable queue for
-// later pending promotion and execution.
-func (pool *TxPool) add(tx *types.Transaction) error {
+// later pending promotion and execution. If the transaction is a replacement for
+// an already pending or queued one, it overwrites the previous and returns this
+// so outer code doesn't uselessly call promote.
+func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all[hash] != nil {
log.Trace("Discarding already known transaction", "hash", hash)
- return fmt.Errorf("known transaction: %x", hash)
+ return false, fmt.Errorf("known transaction: %x", hash)
}
- // Otherwise ensure basic validation passes and queue it up
+ // If the transaction fails basic validation, discard it
if err := pool.validateTx(tx); err != nil {
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
invalidTxCounter.Inc(1)
- return err
+ return false, err
+ }
+ // If the transaction pool is full, discard underpriced transactions
+ if uint64(len(pool.all)) >= maxPendingTotal+maxQueuedTotal {
+ // If the new transaction is underpriced, don't accept it
+ if pool.priced.Underpriced(tx, pool.locals) {
+ log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
+ underpricedTxCounter.Inc(1)
+ return false, ErrUnderpriced
+ }
+ // New transaction is better than our worse ones, make room for it
+ drop := pool.priced.Discard(len(pool.all)-int(maxPendingTotal+maxQueuedTotal-1), pool.locals)
+ for _, tx := range drop {
+ log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
+ underpricedTxCounter.Inc(1)
+ pool.removeTx(tx.Hash())
+ }
}
- pool.enqueueTx(hash, tx)
+ // If the transaction is replacing an already pending one, do directly
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
+ // Nonce already pending, check if required price bump is met
+ inserted, old := list.Add(tx)
+ if !inserted {
+ pendingDiscardCounter.Inc(1)
+ return false, ErrReplaceUnderpriced
+ }
+ // New transaction is better, replace old one
+ if old != nil {
+ delete(pool.all, old.Hash())
+ pool.priced.Removed()
+ pendingReplaceCounter.Inc(1)
+ }
+ pool.all[tx.Hash()] = tx
+ pool.priced.Put(tx)
- // Print a log message if low enough level is set
- log.Debug("Pooled new transaction", "hash", hash, "from", log.Lazy{Fn: func() common.Address { from, _ := types.Sender(pool.signer, tx); return from }}, "to", tx.To())
- return nil
+ log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+ return old != nil, nil
+ }
+ // New transaction isn't replacing a pending one, push into queue
+ replace, err := pool.enqueueTx(hash, tx)
+ if err != nil {
+ return false, err
+ }
+ log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
+ return replace, nil
}
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
-func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
+func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
@@ -346,15 +442,19 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
}
inserted, old := pool.queue[from].Add(tx)
if !inserted {
+ // An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
- return // An older transaction was better, discard this
+ return false, ErrReplaceUnderpriced
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
+ pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
pool.all[hash] = tx
+ pool.priced.Put(tx)
+ return old != nil, nil
}
// promoteTx adds a transaction to the pending (processable) list of transactions.
@@ -371,16 +471,23 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
+ pool.priced.Removed()
+
pendingDiscardCounter.Inc(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
+ pool.priced.Removed()
+
pendingReplaceCounter.Inc(1)
}
- pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
-
+ // Failsafe to work around direct pending inserts (tests)
+ if pool.all[hash] == nil {
+ pool.all[hash] = tx
+ pool.priced.Put(tx)
+ }
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
@@ -392,16 +499,19 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
pool.mu.Lock()
defer pool.mu.Unlock()
- if err := pool.add(tx); err != nil {
+ // Try to inject the transaction and update any state
+ replace, err := pool.add(tx)
+ if err != nil {
return err
}
-
state, err := pool.currentState()
if err != nil {
return err
}
- pool.promoteExecutables(state)
-
+ // If we added a new transaction, run promotion checks and return
+ if !replace {
+ pool.promoteExecutables(state)
+ }
return nil
}
@@ -411,10 +521,13 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
defer pool.mu.Unlock()
// Add the batch of transaction, tracking the accepted ones
- added := 0
+ replaced, added := true, 0
for _, tx := range txs {
- if err := pool.add(tx); err == nil {
+ if replace, err := pool.add(tx); err == nil {
added++
+ if !replace {
+ replaced = false
+ }
}
}
// Only reprocess the internal state if something was actually added
@@ -423,7 +536,9 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
if err != nil {
return err
}
- pool.promoteExecutables(state)
+ if !replaced {
+ pool.promoteExecutables(state)
+ }
}
return nil
}
@@ -467,6 +582,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// Remove it from the list of known transactions
delete(pool.all, hash)
+ pool.priced.Removed()
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
@@ -506,28 +622,31 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
hash := tx.Hash()
- log.Debug("Removed old queued transaction", "hash", hash)
+ log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash)
+ pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance)
drops, _ := list.Filter(state.GetBalance(addr))
for _, tx := range drops {
hash := tx.Hash()
- log.Debug("Removed unpayable queued transaction", "hash", hash)
+ log.Trace("Removed unpayable queued transaction", "hash", hash)
delete(pool.all, hash)
+ pool.priced.Removed()
queuedNofundsCounter.Inc(1)
}
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
hash := tx.Hash()
- log.Debug("Promoting queued transaction", "hash", hash)
+ log.Trace("Promoting queued transaction", "hash", hash)
pool.promoteTx(addr, hash, tx)
}
// Drop all transactions over the allowed limit
for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
hash := tx.Hash()
- log.Debug("Removed cap-exceeding queued transaction", "hash", hash)
+ log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
delete(pool.all, hash)
+ pool.priced.Removed()
queuedRLCounter.Inc(1)
}
queued += uint64(list.Len())
@@ -551,7 +670,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
if uint64(list.Len()) > minPendingPerAccount {
// Skip local accounts as pools should maintain backlogs for themselves
for _, tx := range list.txs.items {
- if !pool.localTx.contains(tx.Hash()) {
+ if !pool.locals.contains(tx.Hash()) {
spammers.Push(addr, float32(list.Len()))
}
break // Checking on transaction for locality is enough
@@ -593,7 +712,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
- if queued > maxQueuedInTotal {
+ if queued > maxQueuedTotal {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
@@ -602,7 +721,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
sort.Sort(addresses)
// Drop transactions until the total is below the limit
- for drop := queued - maxQueuedInTotal; drop > 0; {
+ for drop := queued - maxQueuedTotal; drop > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
@@ -639,20 +758,22 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) {
hash := tx.Hash()
- log.Debug("Removed old pending transaction", "hash", hash)
+ log.Trace("Removed old pending transaction", "hash", hash)
delete(pool.all, hash)
+ pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
drops, invalids := list.Filter(state.GetBalance(addr))
for _, tx := range drops {
hash := tx.Hash()
- log.Debug("Removed unpayable pending transaction", "hash", hash)
+ log.Trace("Removed unpayable pending transaction", "hash", hash)
delete(pool.all, hash)
+ pool.priced.Removed()
pendingNofundsCounter.Inc(1)
}
for _, tx := range invalids {
hash := tx.Hash()
- log.Debug("Demoting pending transaction", "hash", hash)
+ log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
// Delete the entire queue entry if it became empty.
diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go
index 765577933..e773daa2c 100644
--- a/core/tx_pool_test.go
+++ b/core/tx_pool_test.go
@@ -33,7 +33,11 @@ import (
)
func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
- tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{}, big.NewInt(100), gaslimit, big.NewInt(1), nil), types.HomesteadSigner{}, key)
+ return pricedTransaction(nonce, gaslimit, big.NewInt(1), key)
+}
+
+func pricedTransaction(nonce uint64, gaslimit, gasprice *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
+ tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{}, big.NewInt(100), gaslimit, gasprice, nil), types.HomesteadSigner{}, key)
return tx
}
@@ -151,9 +155,9 @@ func TestInvalidTransactions(t *testing.T) {
}
tx = transaction(1, big.NewInt(100000), key)
- pool.minGasPrice = big.NewInt(1000)
- if err := pool.Add(tx); err != ErrCheap {
- t.Error("expected", ErrCheap, "got", err)
+ pool.gasPrice = big.NewInt(1000)
+ if err := pool.Add(tx); err != ErrUnderpriced {
+ t.Error("expected", ErrUnderpriced, "got", err)
}
pool.SetLocal(tx)
@@ -262,14 +266,14 @@ func TestTransactionChainFork(t *testing.T) {
resetState()
tx := transaction(0, big.NewInt(100000), key)
- if err := pool.add(tx); err != nil {
+ if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
pool.RemoveBatch([]*types.Transaction{tx})
// reset the pool's internal state
resetState()
- if err := pool.add(tx); err != nil {
+ if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
}
@@ -293,11 +297,11 @@ func TestTransactionDoubleNonce(t *testing.T) {
tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), big.NewInt(1000000), big.NewInt(1), nil), signer, key)
// Add the first two transaction, ensure higher priced stays only
- if err := pool.add(tx1); err != nil {
- t.Error("didn't expect error", err)
+ if replace, err := pool.add(tx1); err != nil || replace {
+ t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace)
}
- if err := pool.add(tx2); err != nil {
- t.Error("didn't expect error", err)
+ if replace, err := pool.add(tx2); err != nil || !replace {
+ t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
state, _ := pool.currentState()
pool.promoteExecutables(state)
@@ -308,9 +312,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
// Add the thid transaction and ensure it's not saved (smaller price)
- if err := pool.add(tx3); err != nil {
- t.Error("didn't expect error", err)
- }
+ pool.add(tx3)
pool.promoteExecutables(state)
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
@@ -330,7 +332,7 @@ func TestMissingNonce(t *testing.T) {
currentState, _ := pool.currentState()
currentState.AddBalance(addr, big.NewInt(100000000000000))
tx := transaction(1, big.NewInt(100000), key)
- if err := pool.add(tx); err != nil {
+ if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
if len(pool.pending) != 0 {
@@ -557,8 +559,8 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
// some threshold, the higher transactions are dropped to prevent DOS attacks.
func TestTransactionQueueGlobalLimiting(t *testing.T) {
// Reduce the queue limits to shorten test time
- defer func(old uint64) { maxQueuedInTotal = old }(maxQueuedInTotal)
- maxQueuedInTotal = maxQueuedPerAccount * 3
+ defer func(old uint64) { maxQueuedTotal = old }(maxQueuedTotal)
+ maxQueuedTotal = maxQueuedPerAccount * 3
// Create the pool to test the limit enforcement with
db, _ := ethdb.NewMemDatabase()
@@ -578,7 +580,7 @@ func TestTransactionQueueGlobalLimiting(t *testing.T) {
// Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64)
- txs := make(types.Transactions, 0, 3*maxQueuedInTotal)
+ txs := make(types.Transactions, 0, 3*maxQueuedTotal)
for len(txs) < cap(txs) {
key := keys[rand.Intn(len(keys))]
addr := crypto.PubkeyToAddress(key.PublicKey)
@@ -596,8 +598,8 @@ func TestTransactionQueueGlobalLimiting(t *testing.T) {
}
queued += list.Len()
}
- if queued > int(maxQueuedInTotal) {
- t.Fatalf("total transactions overflow allowance: %d > %d", queued, maxQueuedInTotal)
+ if queued > int(maxQueuedTotal) {
+ t.Fatalf("total transactions overflow allowance: %d > %d", queued, maxQueuedTotal)
}
}
@@ -791,6 +793,227 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
}
}
+// Tests that setting the transaction pool gas price to a higher value correctly
+// discards everything cheaper than that and moves any gapped transactions back
+// from the pending pool to the queue.
+//
+// Note, local transactions are never allowed to be dropped.
+func TestTransactionPoolRepricing(t *testing.T) {
+ // Create the pool to test the pricing enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a number of test accounts and fund them
+ state, _ := pool.currentState()
+
+ keys := make([]*ecdsa.PrivateKey, 3)
+ for i := 0; i < len(keys); i++ {
+ keys[i], _ = crypto.GenerateKey()
+ state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+ }
+ // Generate and queue a batch of transactions, both pending and queued
+ txs := types.Transactions{}
+
+ txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(2), keys[0]))
+ txs = append(txs, pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0]))
+ txs = append(txs, pricedTransaction(2, big.NewInt(100000), big.NewInt(2), keys[0]))
+
+ txs = append(txs, pricedTransaction(1, big.NewInt(100000), big.NewInt(2), keys[1]))
+ txs = append(txs, pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1]))
+ txs = append(txs, pricedTransaction(3, big.NewInt(100000), big.NewInt(2), keys[1]))
+
+ txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2]))
+ pool.SetLocal(txs[len(txs)-1]) // prevent this one from ever being dropped
+
+ // Import the batch and that both pending and queued transactions match up
+ pool.AddBatch(txs)
+
+ pending, queued := pool.stats()
+ if pending != 4 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
+ }
+ if queued != 3 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
+ }
+ // Reprice the pool and check that underpriced transactions get dropped
+ pool.SetGasPrice(big.NewInt(2))
+
+ pending, queued = pool.stats()
+ if pending != 2 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
+ }
+ if queued != 3 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
+ }
+ // Check that we can't add the old transactions back
+ if err := pool.Add(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced {
+ t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
+ t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
+ }
+ // However we can add local underpriced transactions
+ tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[2])
+
+ pool.SetLocal(tx) // prevent this one from ever being dropped
+ if err := pool.Add(tx); err != nil {
+ t.Fatalf("failed to add underpriced local transaction: %v", err)
+ }
+ if pending, _ = pool.stats(); pending != 3 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
+ }
+}
+
+// Tests that when the pool reaches its global transaction limit, underpriced
+// transactions are gradually shifted out for more expensive ones and any gapped
+// pending transactions are moved into te queue.
+//
+// Note, local transactions are never allowed to be dropped.
+func TestTransactionPoolUnderpricing(t *testing.T) {
+ // Reduce the queue limits to shorten test time
+ defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal)
+ maxPendingTotal = 2
+
+ defer func(old uint64) { maxQueuedTotal = old }(maxQueuedTotal)
+ maxQueuedTotal = 2
+
+ // Create the pool to test the pricing enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a number of test accounts and fund them
+ state, _ := pool.currentState()
+
+ keys := make([]*ecdsa.PrivateKey, 3)
+ for i := 0; i < len(keys); i++ {
+ keys[i], _ = crypto.GenerateKey()
+ state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
+ }
+ // Generate and queue a batch of transactions, both pending and queued
+ txs := types.Transactions{}
+
+ txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[0]))
+ txs = append(txs, pricedTransaction(1, big.NewInt(100000), big.NewInt(2), keys[0]))
+
+ txs = append(txs, pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[1]))
+
+ txs = append(txs, pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[2]))
+ pool.SetLocal(txs[len(txs)-1]) // prevent this one from ever being dropped
+
+ // Import the batch and that both pending and queued transactions match up
+ pool.AddBatch(txs)
+
+ pending, queued := pool.stats()
+ if pending != 3 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
+ }
+ if queued != 1 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
+ }
+ // Ensure that adding an underpriced transaction on block limit fails
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced {
+ t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
+ }
+ // Ensure that adding high priced transactions drops cheap ones, but not own
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(3), keys[1])); err != nil {
+ t.Fatalf("failed to add well priced transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(4), keys[1])); err != nil {
+ t.Fatalf("failed to add well priced transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(3, big.NewInt(100000), big.NewInt(5), keys[1])); err != nil {
+ t.Fatalf("failed to add well priced transaction: %v", err)
+ }
+ pending, queued = pool.stats()
+ if pending != 2 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
+ }
+ if queued != 2 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+ }
+ // Ensure that adding local transactions can push out even higher priced ones
+ tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(0), keys[2])
+
+ pool.SetLocal(tx) // prevent this one from ever being dropped
+ if err := pool.Add(tx); err != nil {
+ t.Fatalf("failed to add underpriced local transaction: %v", err)
+ }
+ pending, queued = pool.stats()
+ if pending != 2 {
+ t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
+ }
+ if queued != 2 {
+ t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
+ }
+}
+
+// Tests that the pool rejects replacement transactions that don't meet the minimum
+// price bump required.
+func TestTransactionReplacement(t *testing.T) {
+ // Create the pool to test the pricing enforcement with
+ db, _ := ethdb.NewMemDatabase()
+ statedb, _ := state.New(common.Hash{}, db)
+
+ pool := NewTxPool(params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
+ pool.resetState()
+
+ // Create a a test account to add transactions with
+ key, _ := crypto.GenerateKey()
+
+ state, _ := pool.currentState()
+ state.AddBalance(crypto.PubkeyToAddress(key.PublicKey), big.NewInt(1000000000))
+
+ // Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
+ price := int64(100)
+ threshold := (price * (100 + minPriceBumpPercent)) / 100
+
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), key)); err != nil {
+ t.Fatalf("failed to add original cheap pending transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced {
+ t.Fatalf("original cheap pending transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced)
+ }
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(2), key)); err != nil {
+ t.Fatalf("failed to replace original cheap pending transaction: %v", err)
+ }
+
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(price), key)); err != nil {
+ t.Fatalf("failed to add original proper pending transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced {
+ t.Fatalf("original proper pending transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced)
+ }
+ if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
+ t.Fatalf("failed to replace original proper pending transaction: %v", err)
+ }
+ // Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), key)); err != nil {
+ t.Fatalf("failed to add original queued transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100001), big.NewInt(1), key)); err != ErrReplaceUnderpriced {
+ t.Fatalf("original queued transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(2), key)); err != nil {
+ t.Fatalf("failed to replace original queued transaction: %v", err)
+ }
+
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(price), key)); err != nil {
+ t.Fatalf("failed to add original queued transaction: %v", err)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100001), big.NewInt(threshold), key)); err != ErrReplaceUnderpriced {
+ t.Fatalf("original queued transaction replacement error mismatch: have %v, want %v", err, ErrReplaceUnderpriced)
+ }
+ if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil {
+ t.Fatalf("failed to replace original queued transaction: %v", err)
+ }
+}
+
// Benchmarks the speed of validating the contents of the pending queue of the
// transaction pool.
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }