From 08befff8f168f0fd59a3ff36a7205ba44fb82540 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 3 Jun 2015 14:06:20 +0200 Subject: core: compute less transaction hashes in TxPool --- core/transaction_pool.go | 144 +++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 72 deletions(-) (limited to 'core/transaction_pool.go') diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 4296c79f6..d8debe1c0 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -61,7 +61,7 @@ type TxPool struct { txs map[common.Hash]*types.Transaction invalidHashes *set.Set - queue map[common.Address]types.Transactions + queue map[common.Address]map[common.Hash]*types.Transaction subscribers []chan TxMsg @@ -71,7 +71,7 @@ type TxPool struct { func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool { txPool := &TxPool{ txs: make(map[common.Hash]*types.Transaction), - queue: make(map[common.Address]types.Transactions), + queue: make(map[common.Address]map[common.Hash]*types.Transaction), queueChan: make(chan *types.Transaction, txPoolQueueSize), quit: make(chan bool), eventMux: eventMux, @@ -157,22 +157,20 @@ func (self *TxPool) add(tx *types.Transaction) error { if err != nil { return err } - - self.queueTx(tx) - - var toname string - if to := tx.To(); to != nil { - toname = common.Bytes2Hex(to[:4]) - } else { - toname = "[NEW_CONTRACT]" - } - // we can ignore the error here because From is - // verified in ValidateTransaction. - f, _ := tx.From() - from := common.Bytes2Hex(f[:4]) + self.queueTx(hash, tx) if glog.V(logger.Debug) { - glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash()) + var toname string + if to := tx.To(); to != nil { + toname = common.Bytes2Hex(to[:4]) + } else { + toname = "[NEW_CONTRACT]" + } + // we can ignore the error here because From is + // verified in ValidateTransaction. + f, _ := tx.From() + from := common.Bytes2Hex(f[:4]) + glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash) } return nil @@ -211,16 +209,12 @@ func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction { if tx, ok := tp.txs[hash]; ok { return tx } - // check queue for _, txs := range tp.queue { - for _, tx := range txs { - if tx.Hash() == hash { - return tx - } + if tx, ok := txs[hash]; ok { + return tx } } - return nil } @@ -234,26 +228,26 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) { txs[i] = tx i++ } - - return + return txs } func (self *TxPool) GetQueuedTransactions() types.Transactions { self.mu.RLock() defer self.mu.RUnlock() - var txs types.Transactions - for _, ts := range self.queue { - txs = append(txs, ts...) + var ret types.Transactions + for _, txs := range self.queue { + for _, tx := range txs { + ret = append(ret, tx) + } } - - return txs + sort.Sort(types.TxByNonce{ret}) + return ret } func (self *TxPool) RemoveTransactions(txs types.Transactions) { self.mu.Lock() defer self.mu.Unlock() - for _, tx := range txs { self.removeTx(tx.Hash()) } @@ -270,14 +264,17 @@ func (pool *TxPool) Stop() { glog.V(logger.Info).Infoln("TX Pool stopped") } -func (self *TxPool) queueTx(tx *types.Transaction) { +func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) { from, _ := tx.From() // already validated - self.queue[from] = append(self.queue[from], tx) + if self.queue[from] == nil { + self.queue[from] = make(map[common.Hash]*types.Transaction) + } + self.queue[from][hash] = tx } -func (pool *TxPool) addTx(tx *types.Transaction) { - if _, ok := pool.txs[tx.Hash()]; !ok { - pool.txs[tx.Hash()] = tx +func (pool *TxPool) addTx(hash common.Hash, tx *types.Transaction) { + if _, ok := pool.txs[hash]; !ok { + pool.txs[hash] = tx // Notify the subscribers. This event is posted in a goroutine // because it's possible that somewhere during the post "Remove transaction" // gets called which will then wait for the global tx pool lock and deadlock. @@ -291,36 +288,33 @@ func (pool *TxPool) checkQueue() { defer pool.mu.Unlock() statedb := pool.currentState() + var addq txQueue for address, txs := range pool.queue { - sort.Sort(types.TxByNonce{txs}) - - var ( - nonce = statedb.GetNonce(address) - start int - ) - // Clean up the transactions first and determine the start of the nonces - for _, tx := range txs { - if tx.Nonce() >= nonce { - break + curnonce := statedb.GetNonce(address) + addq := addq[:0] + for hash, tx := range txs { + if tx.AccountNonce < curnonce { + // Drop queued transactions whose nonce is lower than + // the account nonce because they have been processed. + delete(txs, hash) + } else { + // Collect the remaining transactions for the next pass. + addq = append(addq, txQueueEntry{hash, tx}) } - start++ } - pool.queue[address] = txs[start:] - - // expected nonce - enonce := nonce - for _, tx := range pool.queue[address] { - // If the expected nonce does not match up with the next one - // (i.e. a nonce gap), we stop the loop - if enonce != tx.Nonce() { + // Find the next consecutive nonce range starting at the + // current account nonce. + sort.Sort(addq) + for _, e := range addq { + if e.AccountNonce != curnonce { break } - enonce++ - - pool.addTx(tx) + curnonce++ + delete(txs, e.hash) + pool.addTx(e.hash, e.Transaction) } - // delete the entire queue entry if it's empty. There's no need to keep it - if len(pool.queue[address]) == 0 { + // Delete the entire queue entry if it became empty. + if len(txs) == 0 { delete(pool.queue, address) } } @@ -329,20 +323,16 @@ func (pool *TxPool) checkQueue() { func (pool *TxPool) removeTx(hash common.Hash) { // delete from pending pool delete(pool.txs, hash) - // delete from queue -out: for address, txs := range pool.queue { - for i, tx := range txs { - if tx.Hash() == hash { - if len(txs) == 1 { - // if only one tx, remove entire address entry - delete(pool.queue, address) - } else { - pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...) - } - break out + if _, ok := txs[hash]; ok { + if len(txs) == 1 { + // if only one tx, remove entire address entry. + delete(pool.queue, address) + } else { + delete(txs, hash) } + break } } } @@ -356,8 +346,18 @@ func (pool *TxPool) validatePool() { if glog.V(logger.Info) { glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err) } - - pool.removeTx(hash) + delete(pool.txs, hash) } } } + +type txQueue []txQueueEntry + +type txQueueEntry struct { + hash common.Hash + *types.Transaction +} + +func (q txQueue) Len() int { return len(q) } +func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } +func (q txQueue) Less(i, j int) bool { return q[i].AccountNonce < q[j].AccountNonce } -- cgit