diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 230 |
1 files changed, 123 insertions, 107 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 2f3cd1e93..8e2d1b31d 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -99,6 +99,8 @@ type stateFn func() (*state.StateDB, error) // TxPoolConfig are the configuration parameters of the transaction pool. type TxPoolConfig struct { + NoLocals bool // Whether local transaction handling should be disabled + PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -155,7 +157,7 @@ type TxPool struct { gasPrice *big.Int eventMux *event.TypeMux events *event.TypeMuxSubscription - locals *txSet + locals *accountSet signer types.Signer mu sync.RWMutex @@ -191,10 +193,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasLimit: gasLimitFn, gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - locals: newTxSet(), events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), quit: make(chan struct{}), } + pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) pool.resetState() @@ -237,7 +239,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddBatch(ev.Txs) + pool.addTxs(ev.Txs, false) } // Handle stats reporting ticks @@ -371,50 +373,40 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } -// SetLocal marks a transaction as local, skipping gas price -// check against local miner minimum in the future -func (pool *TxPool) SetLocal(tx *types.Transaction) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.locals.add(tx.Hash()) -} - -// validateTx checks whether a transaction is valid according -// to the consensus rules. -func (pool *TxPool) validateTx(tx *types.Transaction) error { - local := pool.locals.contains(tx.Hash()) - // Drop transactions under our own minimal accepted gas price +// validateTx checks whether a transaction is valid according to the consensus +// rules and adheres to some heuristic limits of the local node (price and size). +func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { + // Heuristic limit, reject transactions over 32KB to prevent DOS attacks + if tx.Size() > 32*1024 { + return ErrOversizedData + } + // Transactions can't be negative. This may never happen using RLP decoded + // transactions but may occur if you create a transaction using the RPC. + if tx.Value().Sign() < 0 { + return ErrNegativeValue + } + // Ensure the transaction doesn't exceed the current block limit gas. + if pool.gasLimit().Cmp(tx.Gas()) < 0 { + return ErrGasLimit + } + // Make sure the transaction is signed properly + from, err := types.Sender(pool.signer, tx) + if err != nil { + return ErrInvalidSender + } + // Drop non-local transactions under our own minimal accepted gas price + local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } - + // Ensure the transaction adheres to nonce ordering currentState, err := pool.currentState() if err != nil { return err } - - from, err := types.Sender(pool.signer, tx) - if err != nil { - return ErrInvalidSender - } - // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } - - // Check the transaction doesn't exceed the current - // block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { - return ErrGasLimit - } - - // Transactions can't be negative. This may never happen - // using RLP decoded transactions but may occur if you create - // a transaction using the RPC for example. - if tx.Value().Sign() < 0 { - return ErrNegativeValue - } - // Transactor should have enough funds to cover the costs // cost == V + GP * GL if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { @@ -424,11 +416,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { if tx.Gas().Cmp(intrGas) < 0 { return ErrIntrinsicGas } - - // Heuristic limit, reject transactions over 32KB to prevent DOS attacks - if tx.Size() > 32*1024 { - return ErrOversizedData - } return nil } @@ -436,7 +423,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { // later pending promotion and execution. If the transaction is a replacement for // an already pending or queued one, it overwrites the previous and returns this // so outer code doesn't uselessly call promote. -func (pool *TxPool) add(tx *types.Transaction) (bool, error) { +// +// If a newly added transaction is marked as local, its sending account will be +// whitelisted, preventing any associated transaction from being dropped out of +// the pool due to pricing constraints. +func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all[hash] != nil { @@ -444,7 +435,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { return false, fmt.Errorf("known transaction: %x", hash) } // If the transaction fails basic validation, discard it - if err := pool.validateTx(tx); err != nil { + if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err @@ -486,11 +477,14 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue + // New transaction isn't replacing a pending one, push into queue and potentially mark local replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } + if local { + pool.locals.add(from) + } log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } @@ -558,13 +552,41 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T go pool.eventMux.Post(TxPreEvent{tx}) } -// Add queues a single transaction in the pool if it is valid. -func (pool *TxPool) Add(tx *types.Transaction) error { +// AddLocal enqueues a single transaction into the pool if it is valid, marking +// the sender as a local one in the mean time, ensuring it goes around the local +// pricing constraints. +func (pool *TxPool) AddLocal(tx *types.Transaction) error { + return pool.addTx(tx, !pool.config.NoLocals) +} + +// AddRemote enqueues a single transaction into the pool if it is valid. If the +// sender is not among the locally tracked ones, full pricing constraints will +// apply. +func (pool *TxPool) AddRemote(tx *types.Transaction) error { + return pool.addTx(tx, false) +} + +// AddLocals enqueues a batch of transactions into the pool if they are valid, +// marking the senders as a local ones in the mean time, ensuring they go around +// the local pricing constraints. +func (pool *TxPool) AddLocals(txs []*types.Transaction) error { + return pool.addTxs(txs, !pool.config.NoLocals) +} + +// AddRemotes enqueues a batch of transactions into the pool if they are valid. +// If the senders are not among the locally tracked ones, full pricing constraints +// will apply. +func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { + return pool.addTxs(txs, false) +} + +// addTx enqueues a single transaction into the pool if it is valid. +func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Try to inject the transaction and update any state - replace, err := pool.add(tx) + replace, err := pool.add(tx, local) if err != nil { return err } @@ -580,15 +602,15 @@ func (pool *TxPool) Add(tx *types.Transaction) error { return nil } -// AddBatch attempts to queue a batch of transactions. -func (pool *TxPool) AddBatch(txs []*types.Transaction) error { +// addTxs attempts to queue a batch of transactions if they are valid. +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) for _, tx := range txs { - if replace, err := pool.add(tx); err == nil { + if replace, err := pool.add(tx, local); err == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -694,7 +716,6 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A } } // Iterate over all accounts and promote any executable transactions - queued := uint64(0) for _, addr := range accounts { list := pool.queue[addr] if list == nil { @@ -723,15 +744,15 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit - for _, tx := range list.Cap(int(pool.config.AccountQueue)) { - hash := tx.Hash() - delete(pool.all, hash) - pool.priced.Removed() - queuedRateLimitCounter.Inc(1) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) + if !pool.locals.contains(addr) { + for _, tx := range list.Cap(int(pool.config.AccountQueue)) { + hash := tx.Hash() + delete(pool.all, hash) + pool.priced.Removed() + queuedRateLimitCounter.Inc(1) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) + } } - queued += uint64(list.Len()) - // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) @@ -748,14 +769,8 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers - if uint64(list.Len()) > pool.config.AccountSlots { - // Skip local accounts as pools should maintain backlogs for themselves - for _, tx := range list.txs.items { - if !pool.locals.contains(tx.Hash()) { - spammers.Push(addr, float32(list.Len())) - } - break // Checking on transaction for locality is enough - } + if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { + spammers.Push(addr, float32(list.Len())) } } // Gradually drop transactions from offenders @@ -815,16 +830,22 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending)) } // If we've queued more transactions than the hard limit, drop oldest ones + queued := uint64(0) + for _, list := range pool.queue { + queued += uint64(list.Len()) + } if queued > pool.config.GlobalQueue { // Sort all accounts with queued transactions by heartbeat addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + if !pool.locals.contains(addr) { // don't drop locals + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) + } } sort.Sort(addresses) - // Drop transactions until the total is below the limit - for drop := queued - pool.config.GlobalQueue; drop > 0; { + // Drop transactions until the total is below the limit or only locals remain + for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] @@ -903,6 +924,11 @@ func (pool *TxPool) expirationLoop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { + // Skip local transactions from the eviction mechanism + if pool.locals.contains(addr) { + continue + } + // Any non-locals old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { for _, tx := range pool.queue[addr].Flatten() { pool.removeTx(tx.Hash()) @@ -929,48 +955,38 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// txSet represents a set of transaction hashes in which entries -// are automatically dropped after txSetDuration time -type txSet struct { - txMap map[common.Hash]struct{} - txOrd map[uint64]txOrdType - addPtr, delPtr uint64 -} - -const txSetDuration = time.Hour * 2 - -// txOrdType represents an entry in the time-ordered list of transaction hashes -type txOrdType struct { - hash common.Hash - time time.Time +// accountSet is simply a set of addresses to check for existance, and a signer +// capable of deriving addresses from transactions. +type accountSet struct { + accounts map[common.Address]struct{} + signer types.Signer } -// newTxSet creates a new transaction set -func newTxSet() *txSet { - return &txSet{ - txMap: make(map[common.Hash]struct{}), - txOrd: make(map[uint64]txOrdType), +// newAccountSet creates a new address set with an associated signer for sender +// derivations. +func newAccountSet(signer types.Signer) *accountSet { + return &accountSet{ + accounts: make(map[common.Address]struct{}), + signer: signer, } } -// contains returns true if the set contains the given transaction hash -// (not thread safe, should be called from a locked environment) -func (ts *txSet) contains(hash common.Hash) bool { - _, ok := ts.txMap[hash] - return ok +// contains checks if a given address is contained within the set. +func (as *accountSet) contains(addr common.Address) bool { + _, exist := as.accounts[addr] + return exist } -// add adds a transaction hash to the set, then removes entries older than txSetDuration -// (not thread safe, should be called from a locked environment) -func (ts *txSet) add(hash common.Hash) { - ts.txMap[hash] = struct{}{} - now := time.Now() - ts.txOrd[ts.addPtr] = txOrdType{hash: hash, time: now} - ts.addPtr++ - delBefore := now.Add(-txSetDuration) - for ts.delPtr < ts.addPtr && ts.txOrd[ts.delPtr].time.Before(delBefore) { - delete(ts.txMap, ts.txOrd[ts.delPtr].hash) - delete(ts.txOrd, ts.delPtr) - ts.delPtr++ +// containsTx checks if the sender of a given tx is within the set. If the sender +// cannot be derived, this method returns false. +func (as *accountSet) containsTx(tx *types.Transaction) bool { + if addr, err := types.Sender(as.signer, tx); err == nil { + return as.contains(addr) } + return false +} + +// add inserts a new address into the set to track. +func (as *accountSet) add(addr common.Address) { + as.accounts[addr] = struct{}{} } |