diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-07-05 21:51:55 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2017-07-05 21:51:55 +0800 |
commit | 48ee7f9de7da0455b80ee09f498dbce54127103a (patch) | |
tree | 1e16769e36f34efd5a22283b06e57e551ac948a0 /core/tx_pool.go | |
parent | a633a2d7ea8aadb1d435679449d002de880fab30 (diff) | |
download | go-tangerine-48ee7f9de7da0455b80ee09f498dbce54127103a.tar.gz go-tangerine-48ee7f9de7da0455b80ee09f498dbce54127103a.tar.zst go-tangerine-48ee7f9de7da0455b80ee09f498dbce54127103a.zip |
core, eth, les: polish txpool API around local/remote txs
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 157 |
1 files changed, 86 insertions, 71 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 143689c59..a8018d74f 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -176,12 +176,12 @@ type TxPool struct { func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() - signer := types.NewEIP155Signer(chainconfig.ChainId) + // Create the transaction pool with its initial settings pool := &TxPool{ config: config, chainconfig: chainconfig, - signer: signer, + signer: types.NewEIP155Signer(chainconfig.ChainId), pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), @@ -191,10 +191,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *e gasLimit: gasLimitFn, gasPrice: new(big.Int).SetUint64(config.PriceLimit), pendingState: nil, - locals: newAccountSet(signer), events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}), quit: make(chan struct{}), } + pool.locals = newAccountSet(pool.signer) pool.priced = newTxPricedList(&pool.all) pool.resetState() @@ -237,7 +237,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Unlock() case RemovedTransactionEvent: - pool.AddBatch(ev.Txs) + pool.addTxs(ev.Txs, false) } // Handle stats reporting ticks @@ -371,52 +371,40 @@ func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) { return pending, nil } -// SetLocal marks a transaction as local, skipping gas price -// check against local miner minimum in the future -func (pool *TxPool) SetLocal(tx *types.Transaction) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.locals.add(tx) -} - -// validateTx checks whether a transaction is valid according -// to the consensus rules. -func (pool *TxPool) validateTx(tx *types.Transaction) error { - +// validateTx checks whether a transaction is valid according to the consensus +// rules and adheres to some heuristic limits of the local node (price and size). +func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { + // Heuristic limit, reject transactions over 32KB to prevent DOS attacks + if tx.Size() > 32*1024 { + return ErrOversizedData + } + // Transactions can't be negative. This may never happen using RLP decoded + // transactions but may occur if you create a transaction using the RPC. + if tx.Value().Sign() < 0 { + return ErrNegativeValue + } + // Ensure the transaction doesn't exceed the current block limit gas. + if pool.gasLimit().Cmp(tx.Gas()) < 0 { + return ErrGasLimit + } + // Make sure the transaction is signed properly from, err := types.Sender(pool.signer, tx) if err != nil { return ErrInvalidSender } - - local := pool.locals.containsAddress(from) - // Drop transactions under our own minimal accepted gas price + // Drop non-local transactions under our own minimal accepted gas price + local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } - + // Ensure the transaction adheres to nonce ordering currentState, err := pool.currentState() if err != nil { return err } - - // Last but not least check for nonce errors if currentState.GetNonce(from) > tx.Nonce() { return ErrNonceTooLow } - - // Check the transaction doesn't exceed the current - // block limit gas. - if pool.gasLimit().Cmp(tx.Gas()) < 0 { - return ErrGasLimit - } - - // Transactions can't be negative. This may never happen - // using RLP decoded transactions but may occur if you create - // a transaction using the RPC for example. - if tx.Value().Sign() < 0 { - return ErrNegativeValue - } - // Transactor should have enough funds to cover the costs // cost == V + GP * GL if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { @@ -426,11 +414,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { if tx.Gas().Cmp(intrGas) < 0 { return ErrIntrinsicGas } - - // Heuristic limit, reject transactions over 32KB to prevent DOS attacks - if tx.Size() > 32*1024 { - return ErrOversizedData - } return nil } @@ -438,7 +421,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error { // later pending promotion and execution. If the transaction is a replacement for // an already pending or queued one, it overwrites the previous and returns this // so outer code doesn't uselessly call promote. -func (pool *TxPool) add(tx *types.Transaction) (bool, error) { +// +// If a newly added transaction is marked as local, its sending account will be +// whitelisted, preventing any associated transaction from being dropped out of +// the pool due to pricing constraints. +func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all[hash] != nil { @@ -446,7 +433,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { return false, fmt.Errorf("known transaction: %x", hash) } // If the transaction fails basic validation, discard it - if err := pool.validateTx(tx); err != nil { + if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxCounter.Inc(1) return false, err @@ -488,11 +475,14 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) { log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) return old != nil, nil } - // New transaction isn't replacing a pending one, push into queue + // New transaction isn't replacing a pending one, push into queue and potentially mark local replace, err := pool.enqueueTx(hash, tx) if err != nil { return false, err } + if local { + pool.locals.add(from) + } log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replace, nil } @@ -560,13 +550,41 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T go pool.eventMux.Post(TxPreEvent{tx}) } -// Add queues a single transaction in the pool if it is valid. -func (pool *TxPool) Add(tx *types.Transaction) error { +// AddLocal enqueues a single transaction into the pool if it is valid, marking +// the sender as a local one in the mean time, ensuring it goes around the local +// pricing constraints. +func (pool *TxPool) AddLocal(tx *types.Transaction) error { + return pool.addTx(tx, true) +} + +// AddRemote enqueues a single transaction into the pool if it is valid. If the +// sender is not among the locally tracked ones, full pricing constraints will +// apply. +func (pool *TxPool) AddRemote(tx *types.Transaction) error { + return pool.addTx(tx, false) +} + +// AddLocals enqueues a batch of transactions into the pool if they are valid, +// marking the senders as a local ones in the mean time, ensuring they go around +// the local pricing constraints. +func (pool *TxPool) AddLocals(txs []*types.Transaction) error { + return pool.addTxs(txs, true) +} + +// AddRemotes enqueues a batch of transactions into the pool if they are valid. +// If the senders are not among the locally tracked ones, full pricing constraints +// will apply. +func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { + return pool.addTxs(txs, false) +} + +// addTx enqueues a single transaction into the pool if it is valid. +func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Try to inject the transaction and update any state - replace, err := pool.add(tx) + replace, err := pool.add(tx, local) if err != nil { return err } @@ -582,15 +600,15 @@ func (pool *TxPool) Add(tx *types.Transaction) error { return nil } -// AddBatch attempts to queue a batch of transactions. -func (pool *TxPool) AddBatch(txs []*types.Transaction) error { +// addTxs attempts to queue a batch of transactions if they are valid. +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { pool.mu.Lock() defer pool.mu.Unlock() // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) for _, tx := range txs { - if replace, err := pool.add(tx); err == nil { + if replace, err := pool.add(tx, local); err == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -725,7 +743,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A pool.promoteTx(addr, hash, tx) } // Drop all transactions over the allowed limit - if !pool.locals.containsAddress(addr) { + if !pool.locals.contains(addr) { for _, tx := range list.Cap(int(pool.config.AccountQueue)) { hash := tx.Hash() delete(pool.all, hash) @@ -752,7 +770,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A spammers := prque.New() for addr, list := range pool.pending { // Only evict transactions from high rollers - if !pool.locals.containsAddress(addr) && uint64(list.Len()) > pool.config.AccountSlots { + if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, float32(list.Len())) } } @@ -818,7 +836,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A addresses := make(addresssByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { // Don't drop locals - if !pool.locals.containsAddress(addr) { + if !pool.locals.contains(addr) { addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } } @@ -930,13 +948,15 @@ func (a addresssByHeartbeat) Len() int { return len(a) } func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) } func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// accountSet is simply a map of addresses, and a signer, to be able -// to determine the address from a tx +// accountSet is simply a set of addresses to check for existance, and a signer +// capable of deriving addresses from transactions. type accountSet struct { accounts map[common.Address]struct{} signer types.Signer } +// newAccountSet creates a new address set with an associated signer for sender +// derivations. func newAccountSet(signer types.Signer) *accountSet { return &accountSet{ accounts: make(map[common.Address]struct{}), @@ -944,27 +964,22 @@ func newAccountSet(signer types.Signer) *accountSet { } } -// containsAddress checks if a given address is within the set -func (as *accountSet) containsAddress(address common.Address) bool { - _, exist := as.accounts[address] +// contains checks if a given address is contained within the set. +func (as *accountSet) contains(addr common.Address) bool { + _, exist := as.accounts[addr] return exist } -// contains checks if the sender of a given tx is within the set -func (as *accountSet) contains(tx *types.Transaction) bool { - if address, err := types.Sender(as.signer, tx); err == nil { - return as.containsAddress(address) +// containsTx checks if the sender of a given tx is within the set. If the sender +// cannot be derived, this method returns false. +func (as *accountSet) containsTx(tx *types.Transaction) bool { + if addr, err := types.Sender(as.signer, tx); err == nil { + return as.contains(addr) } return false } -// add a transaction sender to the set -// if sender can't be derived, this is a no-op (no errors returned) -func (as *accountSet) add(tx *types.Transaction) { - if address, err := types.Sender(as.signer, tx); err == nil { - if _, exist := as.accounts[address]; !exist { - as.accounts[address] = struct{}{} - } - } - +// add inserts a new address into the set to track. +func (as *accountSet) add(addr common.Address) { + as.accounts[addr] = struct{}{} } |