diff options
author | obscuren <geffobscura@gmail.com> | 2015-01-02 19:09:38 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-01-02 19:09:38 +0800 |
commit | 48d2a8b8ee9621810a988e3561e4213749c54da7 (patch) | |
tree | cbce2a2427287116420e074f4a5ecd8d4f35fbbc /core/transaction_pool.go | |
parent | 477a6d426cd798f036df85b15d73935060503a48 (diff) | |
download | go-tangerine-48d2a8b8ee9621810a988e3561e4213749c54da7.tar.gz go-tangerine-48d2a8b8ee9621810a988e3561e4213749c54da7.tar.zst go-tangerine-48d2a8b8ee9621810a988e3561e4213749c54da7.zip |
Refactored tx pool and added extra fields to block
* chain manager sets td on block + td output w/ String
* added tx pool tests for removing/adding/validating
* tx pool now uses a set for txs instead of list.List
Diffstat (limited to 'core/transaction_pool.go')
-rw-r--r-- | core/transaction_pool.go | 130 |
1 files changed, 34 insertions, 96 deletions
diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 1149d4cfb..4f91f3575 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -1,16 +1,13 @@ package core import ( - "bytes" - "container/list" "fmt" "math/big" - "sync" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/state" + "gopkg.in/fatih/set.v0" ) var txplogger = logger.NewLogger("TXP") @@ -26,86 +23,50 @@ const ( minGasPrice = 1000000 ) -var MinGasPrice = big.NewInt(10000000000000) - -func EachTx(pool *list.List, it func(*types.Transaction, *list.Element) bool) { - for e := pool.Front(); e != nil; e = e.Next() { - if it(e.Value.(*types.Transaction), e) { - break - } - } -} - -func FindTx(pool *list.List, finder func(*types.Transaction, *list.Element) bool) *types.Transaction { - for e := pool.Front(); e != nil; e = e.Next() { - if tx, ok := e.Value.(*types.Transaction); ok { - if finder(tx, e) { - return tx - } - } - } - - return nil -} - type TxProcessor interface { ProcessTransaction(tx *types.Transaction) } // The tx pool a thread safe transaction pool handler. In order to // guarantee a non blocking pool we use a queue channel which can be -// independently read without needing access to the actual pool. If the -// pool is being drained or synced for whatever reason the transactions -// will simple queue up and handled when the mutex is freed. +// independently read without needing access to the actual pool. type TxPool struct { - // The mutex for accessing the Tx pool. - mutex sync.Mutex // Queueing channel for reading and writing incoming // transactions to queueChan chan *types.Transaction // Quiting channel quit chan bool // The actual pool - pool *list.List + //pool *list.List + pool *set.Set SecondaryProcessor TxProcessor subscribers []chan TxMsg - chainManager *ChainManager - eventMux *event.TypeMux + stateQuery StateQuery + eventMux *event.TypeMux } -func NewTxPool(chainManager *ChainManager, eventMux *event.TypeMux) *TxPool { +func NewTxPool(stateQuery StateQuery, eventMux *event.TypeMux) *TxPool { return &TxPool{ - pool: list.New(), - queueChan: make(chan *types.Transaction, txPoolQueueSize), - quit: make(chan bool), - chainManager: chainManager, - eventMux: eventMux, + pool: set.New(), + queueChan: make(chan *types.Transaction, txPoolQueueSize), + quit: make(chan bool), + stateQuery: stateQuery, + eventMux: eventMux, } } -// Blocking function. Don't use directly. Use QueueTransaction instead func (pool *TxPool) addTransaction(tx *types.Transaction) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - pool.pool.PushBack(tx) + pool.pool.Add(tx) // Broadcast the transaction to the rest of the peers pool.eventMux.Post(TxPreEvent{tx}) } func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { - // Get the last block so we can retrieve the sender and receiver from - // the merkle trie - block := pool.chainManager.CurrentBlock - // Something has gone horribly wrong if this happens - if block == nil { - return fmt.Errorf("No last block on the block chain") - } - if len(tx.To()) != 0 && len(tx.To()) != 20 { return fmt.Errorf("Invalid recipient. len = %d", len(tx.To())) } @@ -120,7 +81,7 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { if senderAddr == nil { return fmt.Errorf("invalid sender") } - sender := pool.chainManager.State().GetAccount(senderAddr) + sender := pool.stateQuery.GetAccount(senderAddr) totAmount := new(big.Int).Set(tx.Value()) // Make sure there's enough in the sender's account. Having insufficient @@ -129,19 +90,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error { return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From()) } - // Increment the nonce making each tx valid only once to prevent replay - // attacks - return nil } func (self *TxPool) Add(tx *types.Transaction) error { hash := tx.Hash() - foundTx := FindTx(self.pool, func(tx *types.Transaction, e *list.Element) bool { - return bytes.Compare(tx.Hash(), hash) == 0 - }) - - if foundTx != nil { + if self.pool.Has(tx) { return fmt.Errorf("Known transaction (%x)", hash[0:4]) } @@ -161,7 +115,7 @@ func (self *TxPool) Add(tx *types.Transaction) error { } func (self *TxPool) Size() int { - return self.pool.Len() + return self.pool.Size() } func (self *TxPool) AddTransactions(txs []*types.Transaction) { @@ -175,63 +129,47 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) { } func (pool *TxPool) GetTransactions() []*types.Transaction { - pool.mutex.Lock() - defer pool.mutex.Unlock() - - txList := make([]*types.Transaction, pool.pool.Len()) + txList := make([]*types.Transaction, pool.Size()) i := 0 - for e := pool.pool.Front(); e != nil; e = e.Next() { - tx := e.Value.(*types.Transaction) - - txList[i] = tx - + pool.pool.Each(func(v interface{}) bool { + txList[i] = v.(*types.Transaction) i++ - } + + return true + }) return txList } -func (pool *TxPool) RemoveInvalid(state *state.StateDB) { - pool.mutex.Lock() - defer pool.mutex.Unlock() - - for e := pool.pool.Front(); e != nil; e = e.Next() { - tx := e.Value.(*types.Transaction) - sender := state.GetAccount(tx.From()) +func (pool *TxPool) RemoveInvalid(query StateQuery) { + var removedTxs types.Transactions + pool.pool.Each(func(v interface{}) bool { + tx := v.(*types.Transaction) + sender := query.GetAccount(tx.From()) err := pool.ValidateTransaction(tx) if err != nil || sender.Nonce >= tx.Nonce() { - pool.pool.Remove(e) + removedTxs = append(removedTxs, tx) } - } + + return true + }) + pool.RemoveSet(removedTxs) } func (self *TxPool) RemoveSet(txs types.Transactions) { - self.mutex.Lock() - defer self.mutex.Unlock() - for _, tx := range txs { - EachTx(self.pool, func(t *types.Transaction, element *list.Element) bool { - if t == tx { - self.pool.Remove(element) - return true // To stop the loop - } - return false - }) + self.pool.Remove(tx) } } func (pool *TxPool) Flush() []*types.Transaction { txList := pool.GetTransactions() - - // Recreate a new list all together - // XXX Is this the fastest way? - pool.pool = list.New() + pool.pool.Clear() return txList } func (pool *TxPool) Start() { - //go pool.queueHandler() } func (pool *TxPool) Stop() { |