aboutsummaryrefslogtreecommitdiffstats
path: root/core/tx_pool.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2016-08-17 21:53:15 +0800
committerPéter Szilágyi <peterke@gmail.com>2016-09-02 19:15:40 +0800
commita183ea29f9313cb1d00ed8f73bfbc4ae51e9cb04 (patch)
tree931e3b45f436b4997f7afa42ef8b55a869ebf584 /core/tx_pool.go
parentaffffb39b366321e47784e48c469da9584ceb92c (diff)
downloadgo-tangerine-a183ea29f9313cb1d00ed8f73bfbc4ae51e9cb04.tar.gz
go-tangerine-a183ea29f9313cb1d00ed8f73bfbc4ae51e9cb04.tar.zst
go-tangerine-a183ea29f9313cb1d00ed8f73bfbc4ae51e9cb04.zip
core: add upper bound on the queued transctions
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r--core/tx_pool.go108
1 files changed, 99 insertions, 9 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go
index c4dcceba0..58d304f00 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
+ "sort"
"sync"
"time"
@@ -44,8 +45,11 @@ var (
ErrNegativeValue = errors.New("Negative value")
)
-const (
- maxQueued = 64 // max limit of queued txs per address
+var (
+ maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
+ maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts
+ maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
+ evictionInterval = time.Minute // Time interval to check for evictable transactions
)
type stateFn func() (*state.StateDB, error)
@@ -71,8 +75,10 @@ type TxPool struct {
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
all map[common.Hash]*types.Transaction // All transactions to allow lookups
+ beats map[common.Address]time.Time // Last heartbeat from each known account
- wg sync.WaitGroup // for shutdown sync
+ wg sync.WaitGroup // for shutdown sync
+ quit chan struct{}
homestead bool
}
@@ -83,6 +89,7 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
all: make(map[common.Hash]*types.Transaction),
+ beats: make(map[common.Address]time.Time),
eventMux: eventMux,
currentState: currentStateFn,
gasLimit: gasLimitFn,
@@ -90,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
pendingState: nil,
localTx: newTxSet(),
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
+ quit: make(chan struct{}),
}
- pool.wg.Add(1)
+ pool.wg.Add(2)
go pool.eventLoop()
+ go pool.expirationLoop()
return pool
}
@@ -154,6 +163,7 @@ func (pool *TxPool) resetState() {
func (pool *TxPool) Stop() {
pool.events.Unsubscribe()
+ close(pool.quit)
pool.wg.Wait()
glog.V(logger.Info).Infoln("Transaction pool stopped")
}
@@ -290,7 +300,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
if pool.all[hash] != nil {
return fmt.Errorf("Known transaction: %x", hash[:4])
}
- // Otherwise ensure basic validation passes nd queue it up
+ // Otherwise ensure basic validation passes and queue it up
if err := pool.validateTx(tx); err != nil {
return err
}
@@ -308,7 +318,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
return nil
}
-// enqueueTx inserts a new transction into the non-executable transaction queue.
+// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
@@ -355,6 +365,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
// Set the potentially new pending nonce and notify any subsystems of the new tx
+ pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, list.last+1)
go pool.eventMux.Post(TxPreEvent{tx})
}
@@ -412,8 +423,8 @@ func (pool *TxPool) RemoveBatch(txs types.Transactions) {
}
}
-// removeTx iterates removes a single transaction from the queue, moving all
-// subsequent transactions back to the future queue.
+// removeTx removes a single transaction from the queue, moving all subsequent
+// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
// Fetch the transaction we wish to delete
tx, ok := pool.all[hash]
@@ -431,6 +442,8 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// If no more transactions are left, remove the list and reset the nonce
if pending.Empty() {
delete(pool.pending, addr)
+ delete(pool.beats, addr)
+
pool.pendingState.SetNonce(addr, tx.Nonce())
} else {
// Otherwise update the nonce and postpone any invalidated transactions
@@ -465,6 +478,8 @@ func (pool *TxPool) promoteExecutables() {
return
}
// Iterate over all accounts and promote any executable transactions
+ queued := uint64(0)
+
for addr, list := range pool.queue {
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
@@ -489,17 +504,51 @@ func (pool *TxPool) promoteExecutables() {
pool.promoteTx(addr, tx.Hash(), tx)
}
// Drop all transactions over the allowed limit
- for _, tx := range list.Cap(maxQueued) {
+ for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
if glog.V(logger.Core) {
glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
}
delete(pool.all, tx.Hash())
}
+ queued += uint64(list.Len())
+
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
}
}
+ // If we've queued more transactions than the hard limit, drop oldest ones
+ if queued > maxQueuedInTotal {
+ // Sort all accounts with queued transactions by heartbeat
+ addresses := make(addresssByHeartbeat, 0, len(pool.queue))
+ for addr, _ := range pool.queue {
+ addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
+ }
+ sort.Sort(addresses)
+
+ // Drop transactions until the total is below the limit
+ for drop := queued - maxQueuedInTotal; drop > 0; {
+ addr := addresses[len(addresses)-1]
+ list := pool.queue[addr.address]
+
+ addresses = addresses[:len(addresses)-1]
+
+ // Drop all transactions if they are less than the overflow
+ if size := uint64(list.Len()); size <= drop {
+ for _, tx := range list.Flatten() {
+ pool.removeTx(tx.Hash())
+ }
+ drop -= size
+ continue
+ }
+ // Otherwise drop only last few transactions
+ txs := list.Flatten()
+ for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
+ pool.removeTx(txs[i].Hash())
+ drop--
+ }
+ }
+ }
}
// demoteUnexecutables removes invalid and processed transactions from the pools
@@ -540,10 +589,51 @@ func (pool *TxPool) demoteUnexecutables() {
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
+ delete(pool.beats, addr)
}
}
}
+// expirationLoop is a loop that periodically iterates over all accounts with
+// queued transactions and drop all that have been inactive for a prolonged amount
+// of time.
+func (pool *TxPool) expirationLoop() {
+ defer pool.wg.Done()
+
+ evict := time.NewTicker(evictionInterval)
+ defer evict.Stop()
+
+ for {
+ select {
+ case <-evict.C:
+ pool.mu.Lock()
+ for addr := range pool.queue {
+ if time.Since(pool.beats[addr]) > maxQueuedLifetime {
+ for _, tx := range pool.queue[addr].Flatten() {
+ pool.removeTx(tx.Hash())
+ }
+ }
+ }
+ pool.mu.Unlock()
+
+ case <-pool.quit:
+ return
+ }
+ }
+}
+
+// addressByHeartbeat is an account address tagged with its last activity timestamp.
+type addressByHeartbeat struct {
+ address common.Address
+ heartbeat time.Time
+}
+
+type addresssByHeartbeat []addressByHeartbeat
+
+func (a addresssByHeartbeat) Len() int { return len(a) }
+func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
+func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
// txSet represents a set of transaction hashes in which entries
// are automatically dropped after txSetDuration time
type txSet struct {