aboutsummaryrefslogtreecommitdiffstats
path: root/core/transaction_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r--core/transaction_pool.go371
1 files changed, 197 insertions, 174 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index c896488d1..a2f970195 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -6,7 +6,6 @@ import (
"math/big"
"sort"
"sync"
- "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -14,10 +13,10 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
- "gopkg.in/fatih/set.v0"
)
var (
+ // Transaction Pool Errors
ErrInvalidSender = errors.New("Invalid sender")
ErrNonce = errors.New("Nonce too low")
ErrBalance = errors.New("Insufficient balance")
@@ -28,118 +27,141 @@ var (
ErrNegativeValue = errors.New("Negative value")
)
-const txPoolQueueSize = 50
-
-type TxPoolHook chan *types.Transaction
-type TxMsg struct{ Tx *types.Transaction }
-
type stateFn func() *state.StateDB
-const (
- minGasPrice = 1000000
-)
-
-type TxProcessor interface {
- ProcessTransaction(tx *types.Transaction)
-}
-
-// The tx pool a thread safe transaction pool handler. In order to
-// guarantee a non blocking pool we use a queue channel which can be
-// independently read without needing access to the actual pool.
+// TxPool contains all currently known transactions. Transactions
+// enter the pool when they are received from the network or submitted
+// locally. They exit the pool when they are included in the blockchain.
+//
+// The pool separates processable transactions (which can be applied to the
+// current state) and future transactions. Transactions move between those
+// two states over time as they are received and processed.
type TxPool struct {
- mu sync.RWMutex
- // Queueing channel for reading and writing incoming
- // transactions to
- queueChan chan *types.Transaction
- // Quiting channel
- quit chan bool
- // The state function which will allow us to do some pre checkes
- currentState stateFn
- // The current gas limit function callback
- gasLimit func() *big.Int
- // The actual pool
- txs map[common.Hash]*types.Transaction
- invalidHashes *set.Set
-
- queue map[common.Address]types.Transactions
-
- subscribers []chan TxMsg
-
- eventMux *event.TypeMux
+ quit chan bool // Quiting channel
+ currentState stateFn // The state function which will allow us to do some pre checkes
+ pendingState *state.ManagedState
+ gasLimit func() *big.Int // The current gas limit function callback
+ eventMux *event.TypeMux
+ events event.Subscription
+
+ mu sync.RWMutex
+ pending map[common.Hash]*types.Transaction // processable transactions
+ queue map[common.Address]map[common.Hash]*types.Transaction
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
- txPool := &TxPool{
- txs: make(map[common.Hash]*types.Transaction),
- queue: make(map[common.Address]types.Transactions),
- queueChan: make(chan *types.Transaction, txPoolQueueSize),
- quit: make(chan bool),
- eventMux: eventMux,
- invalidHashes: set.New(),
- currentState: currentStateFn,
- gasLimit: gasLimitFn,
+ return &TxPool{
+ pending: make(map[common.Hash]*types.Transaction),
+ queue: make(map[common.Address]map[common.Hash]*types.Transaction),
+ quit: make(chan bool),
+ eventMux: eventMux,
+ currentState: currentStateFn,
+ gasLimit: gasLimitFn,
+ pendingState: state.ManageState(currentStateFn()),
}
- return txPool
}
func (pool *TxPool) Start() {
- // Queue timer will tick so we can attempt to move items from the queue to the
- // main transaction pool.
- queueTimer := time.NewTicker(300 * time.Millisecond)
- // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
- removalTimer := time.NewTicker(1 * time.Second)
-done:
- for {
- select {
- case <-queueTimer.C:
- pool.checkQueue()
- case <-removalTimer.C:
- pool.validatePool()
- case <-pool.quit:
- break done
+ // Track chain events. When a chain events occurs (new chain canon block)
+ // we need to know the new state. The new state will help us determine
+ // the nonces in the managed state
+ pool.events = pool.eventMux.Subscribe(ChainEvent{})
+ for _ = range pool.events.Chan() {
+ pool.mu.Lock()
+
+ pool.resetState()
+
+ pool.mu.Unlock()
+ }
+}
+
+func (pool *TxPool) resetState() {
+ pool.pendingState = state.ManageState(pool.currentState())
+
+ // validate the pool of pending transactions, this will remove
+ // any transactions that have been included in the block or
+ // have been invalidated because of another transaction (e.g.
+ // higher gas price)
+ pool.validatePool()
+
+ // Loop over the pending transactions and base the nonce of the new
+ // pending transaction set.
+ for _, tx := range pool.pending {
+ if addr, err := tx.From(); err == nil {
+ // Set the nonce. Transaction nonce can never be lower
+ // than the state nonce; validatePool took care of that.
+ pool.pendingState.SetNonce(addr, tx.Nonce())
}
}
+
+ // Check the queue and move transactions over to the pending if possible
+ // or remove those that have become invalid
+ pool.checkQueue()
+}
+
+func (pool *TxPool) Stop() {
+ pool.pending = make(map[common.Hash]*types.Transaction)
+ close(pool.quit)
+ pool.events.Unsubscribe()
+ glog.V(logger.Info).Infoln("TX Pool stopped")
}
-func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
+func (pool *TxPool) State() *state.ManagedState {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+
+ return pool.pendingState
+}
+
+// validateTx checks whether a transaction is valid according
+// to the consensus rules.
+func (pool *TxPool) validateTx(tx *types.Transaction) error {
// Validate sender
var (
from common.Address
err error
)
+ // Validate the transaction sender and it's sig. Throw
+ // if the from fields is invalid.
if from, err = tx.From(); err != nil {
return ErrInvalidSender
}
- // Validate curve param
- v, _, _ := tx.Curve()
- if v > 28 || v < 27 {
- return fmt.Errorf("tx.v != (28 || 27) => %v", v)
- }
-
+ // Make sure the account exist. Non existant accounts
+ // haven't got funds and well therefor never pass.
if !pool.currentState().HasAccount(from) {
return ErrNonExistentAccount
}
+ // Check the transaction doesn't exceed the current
+ // block limit gas.
if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
return ErrGasLimit
}
+ // Transactions can't be negative. This may never happen
+ // using RLP decoded transactions but may occur if you create
+ // a transaction using the RPC for example.
if tx.Amount.Cmp(common.Big0) < 0 {
return ErrNegativeValue
}
+ // Transactor should have enough funds to cover the costs
+ // cost == V + GP * GL
total := new(big.Int).Mul(tx.Price, tx.GasLimit)
total.Add(total, tx.Value())
if pool.currentState().GetBalance(from).Cmp(total) < 0 {
return ErrInsufficientFunds
}
+ // Should supply enough intrinsic gas
if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
return ErrIntrinsicGas
}
+ // Last but not least check for nonce errors (intensive
+ // operation, saved for last)
if pool.currentState().GetNonce(from) > tx.Nonce() {
return ErrNonce
}
@@ -156,38 +178,36 @@ func (self *TxPool) add(tx *types.Transaction) error {
return fmt.Errorf("Invalid transaction (%x)", hash[:4])
}
*/
- if self.txs[hash] != nil {
+ if self.pending[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4])
}
- err := self.ValidateTransaction(tx)
+ err := self.validateTx(tx)
if err != nil {
return err
}
-
- self.queueTx(tx)
-
- var toname string
- if to := tx.To(); to != nil {
- toname = common.Bytes2Hex(to[:4])
- } else {
- toname = "[NEW_CONTRACT]"
- }
- // we can ignore the error here because From is
- // verified in ValidateTransaction.
- f, _ := tx.From()
- from := common.Bytes2Hex(f[:4])
+ self.queueTx(hash, tx)
if glog.V(logger.Debug) {
- glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
+ var toname string
+ if to := tx.To(); to != nil {
+ toname = common.Bytes2Hex(to[:4])
+ } else {
+ toname = "[NEW_CONTRACT]"
+ }
+ // we can ignore the error here because From is
+ // verified in ValidateTransaction.
+ f, _ := tx.From()
+ from := common.Bytes2Hex(f[:4])
+ glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
}
- return nil
-}
+ // check and validate the queueue
+ self.checkQueue()
-func (self *TxPool) Size() int {
- return len(self.txs)
+ return nil
}
+// Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock()
defer self.mu.Unlock()
@@ -195,6 +215,7 @@ func (self *TxPool) Add(tx *types.Transaction) error {
return self.add(tx)
}
+// AddTransactions attempts to queue all valid transactions in txs.
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
self.mu.Lock()
defer self.mu.Unlock()
@@ -209,81 +230,81 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
}
}
-// GetTransaction allows you to check the pending and queued transaction in the
-// transaction pool.
-// It has two stategies, first check the pool (map) then check the queue
+// GetTransaction returns a transaction if it is contained in the pool
+// and nil otherwise.
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// check the txs first
- if tx, ok := tp.txs[hash]; ok {
+ if tx, ok := tp.pending[hash]; ok {
return tx
}
-
// check queue
for _, txs := range tp.queue {
- for _, tx := range txs {
- if tx.Hash() == hash {
- return tx
- }
+ if tx, ok := txs[hash]; ok {
+ return tx
}
}
-
return nil
}
+// GetTransactions returns all currently processable transactions.
+// The returned slice may be modified by the caller.
func (self *TxPool) GetTransactions() (txs types.Transactions) {
- self.mu.RLock()
- defer self.mu.RUnlock()
+ self.mu.Lock()
+ defer self.mu.Unlock()
+
+ // check queue first
+ self.checkQueue()
+ // invalidate any txs
+ self.validatePool()
- txs = make(types.Transactions, self.Size())
+ txs = make(types.Transactions, len(self.pending))
i := 0
- for _, tx := range self.txs {
+ for _, tx := range self.pending {
txs[i] = tx
i++
}
-
- return
+ return txs
}
+// GetQueuedTransactions returns all non-processable transactions.
func (self *TxPool) GetQueuedTransactions() types.Transactions {
self.mu.RLock()
defer self.mu.RUnlock()
- var txs types.Transactions
- for _, ts := range self.queue {
- txs = append(txs, ts...)
+ var ret types.Transactions
+ for _, txs := range self.queue {
+ for _, tx := range txs {
+ ret = append(ret, tx)
+ }
}
-
- return txs
+ sort.Sort(types.TxByNonce{ret})
+ return ret
}
+// RemoveTransactions removes all given transactions from the pool.
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
-
for _, tx := range txs {
self.removeTx(tx.Hash())
}
}
-func (pool *TxPool) Flush() {
- pool.txs = make(map[common.Hash]*types.Transaction)
-}
-
-func (pool *TxPool) Stop() {
- pool.Flush()
- close(pool.quit)
-
- glog.V(logger.Info).Infoln("TX Pool stopped")
+func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
+ from, _ := tx.From() // already validated
+ if self.queue[from] == nil {
+ self.queue[from] = make(map[common.Hash]*types.Transaction)
+ }
+ self.queue[from][hash] = tx
}
-func (self *TxPool) queueTx(tx *types.Transaction) {
- from, _ := tx.From()
- self.queue[from] = append(self.queue[from], tx)
-}
+func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
+ if _, ok := pool.pending[hash]; !ok {
+ pool.pending[hash] = tx
-func (pool *TxPool) addTx(tx *types.Transaction) {
- if _, ok := pool.txs[tx.Hash()]; !ok {
- pool.txs[tx.Hash()] = tx
+ // Increment the nonce on the pending state. This can only happen if
+ // the nonce is +1 to the previous one.
+ pool.pendingState.SetNonce(addr, tx.AccountNonce+1)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
@@ -291,42 +312,39 @@ func (pool *TxPool) addTx(tx *types.Transaction) {
}
}
-// check queue will attempt to insert
+// checkQueue moves transactions that have become processable to main pool.
func (pool *TxPool) checkQueue() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
+ state := pool.pendingState
- statedb := pool.currentState()
+ var addq txQueue
for address, txs := range pool.queue {
- sort.Sort(types.TxByNonce{txs})
-
- var (
- nonce = statedb.GetNonce(address)
- start int
- )
- // Clean up the transactions first and determine the start of the nonces
- for _, tx := range txs {
- if tx.Nonce() >= nonce {
- break
+ // guessed nonce is the nonce currently kept by the tx pool (pending state)
+ guessedNonce := state.GetNonce(address)
+ // true nonce is the nonce known by the last state
+ trueNonce := pool.currentState().GetNonce(address)
+ addq := addq[:0]
+ for hash, tx := range txs {
+ if tx.AccountNonce < trueNonce {
+ // Drop queued transactions whose nonce is lower than
+ // the account nonce because they have been processed.
+ delete(txs, hash)
+ } else {
+ // Collect the remaining transactions for the next pass.
+ addq = append(addq, txQueueEntry{hash, address, tx})
}
- start++
}
- pool.queue[address] = txs[start:]
-
- // expected nonce
- enonce := nonce
- for _, tx := range pool.queue[address] {
- // If the expected nonce does not match up with the next one
- // (i.e. a nonce gap), we stop the loop
- if enonce != tx.Nonce() {
+ // Find the next consecutive nonce range starting at the
+ // current account nonce.
+ sort.Sort(addq)
+ for _, e := range addq {
+ if e.AccountNonce > guessedNonce {
break
}
- enonce++
-
- pool.addTx(tx)
+ delete(txs, e.hash)
+ pool.addTx(e.hash, address, e.Transaction)
}
- // delete the entire queue entry if it's empty. There's no need to keep it
- if len(pool.queue[address]) == 0 {
+ // Delete the entire queue entry if it became empty.
+ if len(txs) == 0 {
delete(pool.queue, address)
}
}
@@ -334,36 +352,41 @@ func (pool *TxPool) checkQueue() {
func (pool *TxPool) removeTx(hash common.Hash) {
// delete from pending pool
- delete(pool.txs, hash)
-
+ delete(pool.pending, hash)
// delete from queue
-out:
for address, txs := range pool.queue {
- for i, tx := range txs {
- if tx.Hash() == hash {
- if len(txs) == 1 {
- // if only one tx, remove entire address entry
- delete(pool.queue, address)
- } else {
- pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
- }
- break out
+ if _, ok := txs[hash]; ok {
+ if len(txs) == 1 {
+ // if only one tx, remove entire address entry.
+ delete(pool.queue, address)
+ } else {
+ delete(txs, hash)
}
+ break
}
}
}
+// validatePool removes invalid and processed transactions from the main pool.
func (pool *TxPool) validatePool() {
- pool.mu.Lock()
- defer pool.mu.Unlock()
-
- for hash, tx := range pool.txs {
- if err := pool.ValidateTransaction(tx); err != nil {
- if glog.V(logger.Info) {
+ for hash, tx := range pool.pending {
+ if err := pool.validateTx(tx); err != nil {
+ if glog.V(logger.Core) {
glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
}
-
- pool.removeTx(hash)
+ delete(pool.pending, hash)
}
}
}
+
+type txQueue []txQueueEntry
+
+type txQueueEntry struct {
+ hash common.Hash
+ addr common.Address
+ *types.Transaction
+}
+
+func (q txQueue) Len() int { return len(q) }
+func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
+func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce }