diff options
-rw-r--r-- | core/tx_pool.go | 105 | ||||
-rw-r--r-- | core/tx_pool_test.go | 135 | ||||
-rw-r--r-- | light/txpool.go | 2 |
3 files changed, 200 insertions, 42 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 04ffa8a98..2f3cd1e93 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -35,16 +35,41 @@ import ( ) var ( - // Transaction Pool Errors - ErrInvalidSender = errors.New("invalid sender") - ErrNonce = errors.New("nonce too low") - ErrUnderpriced = errors.New("transaction underpriced") + // ErrInvalidSender is returned if the transaction contains an invalid signature. + ErrInvalidSender = errors.New("invalid sender") + + // ErrNonceTooLow is returned if the nonce of a transaction is lower than the + // one present in the local chain. + ErrNonceTooLow = errors.New("nonce too low") + + // ErrUnderpriced is returned if a transaction's gas price is below the minimum + // configured for the transaction pool. + ErrUnderpriced = errors.New("transaction underpriced") + + // ErrReplaceUnderpriced is returned if a transaction is attempted to be replaced + // with a different one without the required price bump. ErrReplaceUnderpriced = errors.New("replacement transaction underpriced") - ErrBalance = errors.New("insufficient balance") - ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") - ErrIntrinsicGas = errors.New("intrinsic gas too low") - ErrGasLimit = errors.New("exceeds block gas limit") - ErrNegativeValue = errors.New("negative value") + + // ErrInsufficientFunds is returned if the total cost of executing a transaction + // is higher than the balance of the user's account. + ErrInsufficientFunds = errors.New("insufficient funds for gas * price + value") + + // ErrIntrinsicGas is returned if the transaction is specified to use less gas + // than required to start the invocation. + ErrIntrinsicGas = errors.New("intrinsic gas too low") + + // ErrGasLimit is returned if a transaction's requested gas limit exceeds the + // maximum allowance of the current block. + ErrGasLimit = errors.New("exceeds block gas limit") + + // ErrNegativeValue is a sanity error to ensure noone is able to specify a + // transaction with a negative value. + ErrNegativeValue = errors.New("negative value") + + // ErrOversizedData is returned if the input data of a transaction is greater + // than some meaningful limit a user might use. This is not a consensus error + // making the transaction invalid, rather a DOS protection. + ErrOversizedData = errors.New("oversized data") ) var ( @@ -54,16 +79,16 @@ var ( var ( // Metrics for the pending pool - pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard") - pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace") - pendingRLCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting - pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds + pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard") + pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace") + pendingRateLimitCounter = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting + pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds") // Dropped due to out-of-funds // Metrics for the queued pool - queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard") - queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace") - queuedRLCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting - queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds + queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard") + queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace") + queuedRateLimitCounter = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting + queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds") // Dropped due to out-of-funds // General tx metrics invalidTxCounter = metrics.NewCounter("txpool/invalid") @@ -374,7 +399,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { } // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { - return ErrNonce + return ErrNonceTooLow } // Check the transaction doesn't exceed the current @@ -395,12 +420,15 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } - intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if tx.Gas().Cmp(intrGas) < 0 { return ErrIntrinsicGas } + // Heuristic limit, reject transactions over 32KB to prevent DOS attacks + if tx.Size() > 32*1024 { + return ErrOversizedData + } return nil } @@ -638,8 +666,9 @@ func (pool *TxPool) removeTx(hash common.Hash) { } // Update the account nonce if needed if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { - pool.pendingState.SetNonce(addr, tx.Nonce()) + pool.pendingState.SetNonce(addr, nonce) } + return } } // Transaction is in the future queue @@ -696,10 +725,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A // Drop all transactions over the allowed limit for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) delete(pool.all, hash) pool.priced.Removed() - queuedRLCounter.Inc(1) + queuedRateLimitCounter.Inc(1) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } queued += uint64(list.Len()) @@ -745,7 +774,18 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { for i := 0; i < len(offenders)-1; i++ { list := pool.pending[offenders[i]] - list.Cap(list.Len() - 1) + for _, tx := range list.Cap(list.Len() - 1) { + // Drop the transaction from the global pools too + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + + // Update the account nonce to the dropped transaction + if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { + pool.pendingState.SetNonce(offenders[i], nonce) + } + log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) + } pending-- } } @@ -756,12 +796,23 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { for _, addr := range offenders { list := pool.pending[addr] - list.Cap(list.Len() - 1) + for _, tx := range list.Cap(list.Len() - 1) { + // Drop the transaction from the global pools too + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + + // Update the account nonce to the dropped transaction + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, nonce) + } + log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) + } pending-- } } } - pendingRLCounter.Inc(int64(pendingBeforeCap - pending)) + pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones if queued > pool.config.GlobalQueue { @@ -785,7 +836,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.removeTx(tx.Hash()) } drop -= size - queuedRLCounter.Inc(int64(size)) + queuedRateLimitCounter.Inc(int64(size)) continue } // Otherwise drop only last few transactions @@ -793,7 +844,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A for i := len(txs) - 1; i >= 0 && drop > 0; i-- { pool.removeTx(txs[i].Hash()) drop-- - queuedRLCounter.Inc(1) + queuedRateLimitCounter.Inc(1) } } } diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 94b07170d..4e28522e9 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -18,6 +18,7 @@ package core import ( "crypto/ecdsa" + "fmt" "math/big" "math/rand" "testing" @@ -52,6 +53,35 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) { return newPool, key } +// validateTxPoolInternals checks various consistency invariants within the pool. +func validateTxPoolInternals(pool *TxPool) error { + pool.mu.RLock() + defer pool.mu.RUnlock() + + // Ensure the total transaction set is consistent with pending + queued + pending, queued := pool.stats() + if total := len(pool.all); total != pending+queued { + return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) + } + if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued { + return fmt.Errorf("total priced transaction count %d != %d pending + %d queued", priced, pending, queued) + } + // Ensure the next nonce to assign is the correct one + for addr, txs := range pool.pending { + // Find the last transaction + var last uint64 + for nonce, _ := range txs.txs.items { + if last < nonce { + last = nonce + } + } + if nonce := pool.pendingState.GetNonce(addr); nonce != last+1 { + return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1) + } + } + return nil +} + func deriveSender(tx *types.Transaction) (common.Address, error) { return types.Sender(types.HomesteadSigner{}, tx) } @@ -150,8 +180,8 @@ func TestInvalidTransactions(t *testing.T) { currentState.SetNonce(from, 1) currentState.AddBalance(from, big.NewInt(0xffffffffffffff)) tx = transaction(0, big.NewInt(100000), key) - if err := pool.Add(tx); err != ErrNonce { - t.Error("expected", ErrNonce) + if err := pool.Add(tx); err != ErrNonceTooLow { + t.Error("expected", ErrNonceTooLow) } tx = transaction(1, big.NewInt(100000), key) @@ -218,20 +248,25 @@ func TestTransactionQueue(t *testing.T) { func TestRemoveTx(t *testing.T) { pool, key := setupTxPool() - tx := transaction(0, big.NewInt(100), key) - from, _ := deriveSender(tx) + addr := crypto.PubkeyToAddress(key.PublicKey) currentState, _ := pool.currentState() - currentState.AddBalance(from, big.NewInt(1)) + currentState.AddBalance(addr, big.NewInt(1)) + + tx1 := transaction(0, big.NewInt(100), key) + tx2 := transaction(2, big.NewInt(100), key) + + pool.promoteTx(addr, tx1.Hash(), tx1) + pool.enqueueTx(tx2.Hash(), tx2) - pool.enqueueTx(tx.Hash(), tx) - pool.promoteTx(from, tx.Hash(), tx) if len(pool.queue) != 1 { t.Error("expected queue to be 1, got", len(pool.queue)) } if len(pool.pending) != 1 { t.Error("expected pending to be 1, got", len(pool.pending)) } - pool.Remove(tx.Hash()) + pool.Remove(tx1.Hash()) + pool.Remove(tx2.Hash()) + if len(pool.queue) > 0 { t.Error("expected queue to be 0, got", len(pool.queue)) } @@ -404,10 +439,10 @@ func TestTransactionDropping(t *testing.T) { ) pool.promoteTx(account, tx0.Hash(), tx0) pool.promoteTx(account, tx1.Hash(), tx1) - pool.promoteTx(account, tx1.Hash(), tx2) + pool.promoteTx(account, tx2.Hash(), tx2) pool.enqueueTx(tx10.Hash(), tx10) pool.enqueueTx(tx11.Hash(), tx11) - pool.enqueueTx(tx11.Hash(), tx12) + pool.enqueueTx(tx12.Hash(), tx12) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -416,8 +451,8 @@ func TestTransactionDropping(t *testing.T) { if pool.queue[account].Len() != 3 { t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) } - if len(pool.all) != 4 { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) + if len(pool.all) != 6 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } pool.resetState() if pool.pending[account].Len() != 3 { @@ -426,8 +461,8 @@ func TestTransactionDropping(t *testing.T) { if pool.queue[account].Len() != 3 { t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3) } - if len(pool.all) != 4 { - t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4) + if len(pool.all) != 6 { + t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6) } // Reduce the balance of the account, and check that invalidated transactions are dropped state.AddBalance(account, big.NewInt(-650)) @@ -730,6 +765,12 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { if len(pool1.all) != len(pool2.all) { t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all)) } + if err := validateTxPoolInternals(pool1); err != nil { + t.Errorf("pool 1 internal state corrupted: %v", err) + } + if err := validateTxPoolInternals(pool2); err != nil { + t.Errorf("pool 2 internal state corrupted: %v", err) + } } // Tests that if the transaction count belonging to multiple accounts go above @@ -776,6 +817,45 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { if pending > int(DefaultTxPoolConfig.GlobalSlots) { t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, DefaultTxPoolConfig.GlobalSlots) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } +} + +// Tests that if transactions start being capped, transasctions are also removed from 'all' +func TestTransactionCapClearsFromAll(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old uint64) { DefaultTxPoolConfig.AccountSlots = old }(DefaultTxPoolConfig.AccountSlots) + defer func(old uint64) { DefaultTxPoolConfig.AccountQueue = old }(DefaultTxPoolConfig.AccountQueue) + defer func(old uint64) { DefaultTxPoolConfig.GlobalSlots = old }(DefaultTxPoolConfig.GlobalSlots) + + DefaultTxPoolConfig.AccountSlots = 2 + DefaultTxPoolConfig.AccountQueue = 2 + DefaultTxPoolConfig.GlobalSlots = 8 + + // Create the pool to test the limit enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, db) + + pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() + + // Create a number of test accounts and fund them + state, _ := pool.currentState() + + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + state.AddBalance(addr, big.NewInt(1000000)) + + txs := types.Transactions{} + for j := 0; j < int(DefaultTxPoolConfig.GlobalSlots)*2; j++ { + txs = append(txs, transaction(uint64(j), big.NewInt(100000), key)) + } + // Import the batch and verify that limits have been enforced + pool.AddBatch(txs) + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that if the transaction count belonging to multiple accounts go above @@ -820,6 +900,9 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), DefaultTxPoolConfig.AccountSlots) } } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that setting the transaction pool gas price to a higher value correctly @@ -867,6 +950,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Reprice the pool and check that underpriced transactions get dropped pool.SetGasPrice(big.NewInt(2)) @@ -877,6 +963,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Check that we can't add the old transactions back if err := pool.Add(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[0])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) @@ -884,6 +973,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced queued transaction error mismatch: have %v, want %v", err, ErrUnderpriced) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // However we can add local underpriced transactions tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(1), keys[2]) @@ -894,6 +986,9 @@ func TestTransactionPoolRepricing(t *testing.T) { if pending, _ = pool.stats(); pending != 3 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that when the pool reaches its global transaction limit, underpriced @@ -945,6 +1040,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 1 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Ensure that adding an underpriced transaction on block limit fails if err := pool.Add(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), keys[1])); err != ErrUnderpriced { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced) @@ -966,6 +1064,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } // Ensure that adding local transactions can push out even higher priced ones tx := pricedTransaction(1, big.NewInt(100000), big.NewInt(0), keys[2]) @@ -980,6 +1081,9 @@ func TestTransactionPoolUnderpricing(t *testing.T) { if queued != 2 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Tests that the pool rejects replacement transactions that don't meet the minimum @@ -1041,6 +1145,9 @@ func TestTransactionReplacement(t *testing.T) { if err := pool.Add(pricedTransaction(2, big.NewInt(100000), big.NewInt(threshold+1), key)); err != nil { t.Fatalf("failed to replace original queued transaction: %v", err) } + if err := validateTxPoolInternals(pool); err != nil { + t.Fatalf("pool internal state corrupted: %v", err) + } } // Benchmarks the speed of validating the contents of the pending queue of the diff --git a/light/txpool.go b/light/txpool.go index 446195806..7276874b8 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -360,7 +360,7 @@ func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error currentState := pool.currentState() if n, err := currentState.GetNonce(ctx, from); err == nil { if n > tx.Nonce() { - return core.ErrNonce + return core.ErrNonceTooLow } } else { return err |