aboutsummaryrefslogtreecommitdiffstats
path: root/core/tx_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go56
1 files changed, 47 insertions, 9 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 0ad765179..c3915575b 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -103,6 +103,16 @@ var (
underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
)
+// TxStatus is the current status of a transaction as seen py the pool.
+type TxStatus uint
+
+const (
+ TxStatusUnknown TxStatus = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded
+)
+
// blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers.
type blockChain interface {
@@ -640,6 +650,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.journalTx(from, tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
+
+ // We've directly injected a replacement transaction, notify subsystems
+ go pool.txFeed.Send(TxPreEvent{tx})
+
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
@@ -729,6 +743,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
+
go pool.txFeed.Send(TxPreEvent{tx})
}
@@ -749,14 +764,14 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
// AddLocals enqueues a batch of transactions into the pool if they are valid,
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
-func (pool *TxPool) AddLocals(txs []*types.Transaction) error {
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
return pool.addTxs(txs, !pool.config.NoLocals)
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid.
// If the senders are not among the locally tracked ones, full pricing constraints
// will apply.
-func (pool *TxPool) AddRemotes(txs []*types.Transaction) error {
+func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false)
}
@@ -779,7 +794,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
}
// addTxs attempts to queue a batch of transactions if they are valid.
-func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
+func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -788,11 +803,14 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
// addTxsLocked attempts to queue a batch of transactions if they are valid,
// whilst assuming the transaction pool lock is already held.
-func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
- for _, tx := range txs {
- if replace, err := pool.add(tx, local); err == nil {
+ errs := make([]error, len(txs))
+
+ for i, tx := range txs {
+ var replace bool
+ if replace, errs[i] = pool.add(tx, local); errs[i] == nil {
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
@@ -802,12 +820,32 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
addrs := make([]common.Address, 0, len(dirty))
- for addr, _ := range dirty {
+ for addr := range dirty {
addrs = append(addrs, addr)
}
pool.promoteExecutables(addrs)
}
- return nil
+ return errs
+}
+
+// Status returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes.
+func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ status := make([]TxStatus, len(hashes))
+ for i, hash := range hashes {
+ if tx := pool.all[hash]; tx != nil {
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.pending[from].txs.items[tx.Nonce()] != nil {
+ status[i] = TxStatusPending
+ } else {
+ status[i] = TxStatusQueued
+ }
+ }
+ }
+ return status
}
// Get returns a transaction if it is contained in the pool
@@ -869,7 +907,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
- for addr, _ := range pool.queue {
+ for addr := range pool.queue {
accounts = append(accounts, addr)
}
}