diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 119 |
1 files changed, 44 insertions, 75 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 5fdc91e65..0f008ddc0 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -103,6 +103,16 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) +// TxStatus is the current status of a transaction as seen py the pool. +type TxStatus uint + +const ( + TxStatusUnknown TxStatus = iota + TxStatusQueued + TxStatusPending + TxStatusIncluded +) + // blockChain provides the state of blockchain and current gas limit to do // some pre checks in tx pool and event subscribers. type blockChain interface { @@ -192,22 +202,17 @@ type TxPool struct { locals *accountSet // Set of local transaction to exepmt from evicion rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - all map[common.Hash]txLookupRec // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + all map[common.Hash]*types.Transaction // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price wg sync.WaitGroup // for shutdown sync homestead bool } -type txLookupRec struct { - tx *types.Transaction - pending bool -} - // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { @@ -223,7 +228,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), - all: make(map[common.Hash]txLookupRec), + all: make(map[common.Hash]*types.Transaction), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } @@ -599,7 +604,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() - if _, ok := pool.all[hash]; ok { + if pool.all[hash] != nil { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } @@ -640,7 +645,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.priced.Removed() pendingReplaceCounter.Inc(1) } - pool.all[tx.Hash()] = txLookupRec{tx, false} + pool.all[tx.Hash()] = tx pool.priced.Put(tx) pool.journalTx(from, tx) @@ -687,7 +692,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er pool.priced.Removed() queuedReplaceCounter.Inc(1) } - pool.all[hash] = txLookupRec{tx, false} + pool.all[hash] = tx pool.priced.Put(tx) return old != nil, nil } @@ -730,13 +735,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pendingReplaceCounter.Inc(1) } - if pool.all[hash].tx == nil { - // Failsafe to work around direct pending inserts (tests) - pool.all[hash] = txLookupRec{tx, true} + // Failsafe to work around direct pending inserts (tests) + if pool.all[hash] == nil { + pool.all[hash] = tx pool.priced.Put(tx) - } else { - // set pending flag to true - pool.all[hash] = txLookupRec{tx, true} } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() @@ -762,17 +764,15 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // AddLocals enqueues a batch of transactions into the pool if they are valid, // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. -func (pool *TxPool) AddLocals(txs []*types.Transaction) error { - pool.addTxs(txs, !pool.config.NoLocals) - return nil +func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { + return pool.addTxs(txs, !pool.config.NoLocals) } // AddRemotes enqueues a batch of transactions into the pool if they are valid. // If the senders are not among the locally tracked ones, full pricing constraints // will apply. -func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { - pool.addTxs(txs, false) - return nil +func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { + return pool.addTxs(txs, false) } // addTx enqueues a single transaction into the pool if it is valid. @@ -806,10 +806,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) - txErr := make([]error, len(txs)) + errs := make([]error, len(txs)) + for i, tx := range txs { var replace bool - if replace, txErr[i] = pool.add(tx, local); txErr[i] == nil { + if replace, errs[i] = pool.add(tx, local); errs[i] == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -824,54 +825,23 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { } pool.promoteExecutables(addrs) } - return txErr -} - -// TxStatusData is returned by AddOrGetTxStatus for each transaction -type TxStatusData struct { - Status uint - Data []byte + return errs } -const ( - TxStatusUnknown = iota - TxStatusQueued - TxStatusPending - TxStatusIncluded // Data contains a TxChainPos struct - TxStatusError // Data contains the error string -) - -// AddOrGetTxStatus returns the status (unknown/pending/queued) of a batch of transactions -// identified by their hashes in txHashes. Optionally the transactions themselves can be -// passed too in txs, in which case the function will try adding the previously unknown ones -// to the pool. If a new transaction cannot be added, TxStatusError is returned. Adding already -// known transactions will return their previous status. -// If txs is specified, txHashes is still required and has to match the transactions in txs. - -// Note: TxStatusIncluded is never returned by this function since the pool does not track -// mined transactions. Included status can be checked by the caller (as it happens in the -// LES protocol manager) -func (pool *TxPool) AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []TxStatusData { - status := make([]TxStatusData, len(txHashes)) - if txs != nil { - if len(txs) != len(txHashes) { - panic(nil) - } - txErr := pool.addTxs(txs, false) - for i, err := range txErr { - if err != nil { - status[i] = TxStatusData{TxStatusError, ([]byte)(err.Error())} - } - } - } +// Status returns the status (unknown/pending/queued) of a batch of transactions +// identified by their hashes. +func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { + pool.mu.RLock() + defer pool.mu.RUnlock() - for i, hash := range txHashes { - r, ok := pool.all[hash] - if ok { - if r.pending { - status[i] = TxStatusData{TxStatusPending, nil} + status := make([]TxStatus, len(hashes)) + for i, hash := range hashes { + if tx := pool.all[hash]; tx != nil { + from, _ := types.Sender(pool.signer, tx) // already validated + if pool.pending[from].txs.items[tx.Nonce()] != nil { + status[i] = TxStatusPending } else { - status[i] = TxStatusData{TxStatusQueued, nil} + status[i] = TxStatusQueued } } } @@ -884,18 +854,17 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { pool.mu.RLock() defer pool.mu.RUnlock() - return pool.all[hash].tx + return pool.all[hash] } // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete - txl, ok := pool.all[hash] + tx, ok := pool.all[hash] if !ok { return } - tx := txl.tx addr, _ := types.Sender(pool.signer, tx) // already validated during insertion // Remove it from the list of known transactions |