diff options
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 56 |
1 files changed, 47 insertions, 9 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index 0ad765179..c3915575b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -103,6 +103,16 @@ var ( underpricedTxCounter = metrics.NewCounter("txpool/underpriced") ) +// TxStatus is the current status of a transaction as seen py the pool. +type TxStatus uint + +const ( + TxStatusUnknown TxStatus = iota + TxStatusQueued + TxStatusPending + TxStatusIncluded +) + // blockChain provides the state of blockchain and current gas limit to do // some pre checks in tx pool and event subscribers. type blockChain interface { @@ -640,6 +650,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.journalTx(from, tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) + + // We've directly injected a replacement transaction, notify subsystems + go pool.txFeed.Send(TxPreEvent{tx}) + return old != nil, nil } // New transaction isn't replacing a pending one, push into queue @@ -729,6 +743,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() pool.pendingState.SetNonce(addr, tx.Nonce()+1) + go pool.txFeed.Send(TxPreEvent{tx}) } @@ -749,14 +764,14 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // AddLocals enqueues a batch of transactions into the pool if they are valid, // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. -func (pool *TxPool) AddLocals(txs []*types.Transaction) error { +func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { return pool.addTxs(txs, !pool.config.NoLocals) } // AddRemotes enqueues a batch of transactions into the pool if they are valid. // If the senders are not among the locally tracked ones, full pricing constraints // will apply. -func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { +func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { return pool.addTxs(txs, false) } @@ -779,7 +794,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { pool.mu.Lock() defer pool.mu.Unlock() @@ -788,11 +803,14 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { // addTxsLocked attempts to queue a batch of transactions if they are valid, // whilst assuming the transaction pool lock is already held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) - for _, tx := range txs { - if replace, err := pool.add(tx, local); err == nil { + errs := make([]error, len(txs)) + + for i, tx := range txs { + var replace bool + if replace, errs[i] = pool.add(tx, local); errs[i] == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -802,12 +820,32 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { // Only reprocess the internal state if something was actually added if len(dirty) > 0 { addrs := make([]common.Address, 0, len(dirty)) - for addr, _ := range dirty { + for addr := range dirty { addrs = append(addrs, addr) } pool.promoteExecutables(addrs) } - return nil + return errs +} + +// Status returns the status (unknown/pending/queued) of a batch of transactions +// identified by their hashes. +func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { + pool.mu.RLock() + defer pool.mu.RUnlock() + + status := make([]TxStatus, len(hashes)) + for i, hash := range hashes { + if tx := pool.all[hash]; tx != nil { + from, _ := types.Sender(pool.signer, tx) // already validated + if pool.pending[from].txs.items[tx.Nonce()] != nil { + status[i] = TxStatusPending + } else { + status[i] = TxStatusQueued + } + } + } + return status } // Get returns a transaction if it is contained in the pool @@ -869,7 +907,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { // Gather all the accounts potentially needing updates if accounts == nil { accounts = make([]common.Address, 0, len(pool.queue)) - for addr, _ := range pool.queue { + for addr := range pool.queue { accounts = append(accounts, addr) } } |