aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeffrey Wilcke <jeffrey@ethereum.org>2015-04-23 17:59:56 +0800
committerJeffrey Wilcke <jeffrey@ethereum.org>2015-04-23 17:59:56 +0800
commit2f8809df404b83824ffadd61ff766cbf9e4c5419 (patch)
treed3c168060e3efffb945079be1d221d14b4ec79d1
parent2fe54ab233c0cd1bf09b49085477c961dcc1980f (diff)
parent7f14fbd57936cf74627572da4a06585d35161ea9 (diff)
downloaddexon-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.gz
dexon-2f8809df404b83824ffadd61ff766cbf9e4c5419.tar.zst
dexon-2f8809df404b83824ffadd61ff766cbf9e4c5419.zip
Merge pull request #769 from obscuren/develop
core: transaction queue
-rw-r--r--common/natspec/natspec_e2e_test.go2
-rw-r--r--core/block_processor.go2
-rw-r--r--core/chain_makers.go4
-rw-r--r--core/chain_manager.go2
-rw-r--r--core/error.go2
-rw-r--r--core/transaction_pool.go142
-rw-r--r--core/transaction_pool_test.go54
-rw-r--r--eth/backend.go36
-rw-r--r--eth/handler.go66
-rw-r--r--eth/peer.go6
-rw-r--r--miner/worker.go7
-rw-r--r--xeth/xeth.go4
12 files changed, 261 insertions, 66 deletions
diff --git a/common/natspec/natspec_e2e_test.go b/common/natspec/natspec_e2e_test.go
index 147abe162..6bdaec8a1 100644
--- a/common/natspec/natspec_e2e_test.go
+++ b/common/natspec/natspec_e2e_test.go
@@ -220,7 +220,7 @@ func (self *testFrontend) applyTxs() {
block := self.ethereum.ChainManager().NewBlock(cb)
coinbase := self.stateDb.GetStateObject(cb)
coinbase.SetGasPool(big.NewInt(10000000))
- txs := self.ethereum.TxPool().GetTransactions()
+ txs := self.ethereum.TxPool().GetQueuedTransactions()
for i := 0; i < len(txs); i++ {
for _, tx := range txs {
diff --git a/core/block_processor.go b/core/block_processor.go
index f33f0d433..af47069ad 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -258,7 +258,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
state.Sync()
// Remove transactions from the pool
- sm.txpool.RemoveSet(block.Transactions())
+ sm.txpool.RemoveTransactions(block.Transactions())
// This puts transactions in a extra db for rpc
for i, tx := range block.Transactions() {
diff --git a/core/chain_makers.go b/core/chain_makers.go
index 250671ef8..9b4911fba 100644
--- a/core/chain_makers.go
+++ b/core/chain_makers.go
@@ -108,7 +108,9 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db common.Dat
// Create a new chain manager starting from given block
// Effectively a fork factory
func newChainManager(block *types.Block, eventMux *event.TypeMux, db common.Database) *ChainManager {
- bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux}
+ genesis := GenesisBlock(db)
+ bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: genesis, eventMux: eventMux}
+ bc.txState = state.ManageState(state.New(genesis.Root(), db))
bc.futureBlocks = NewBlockCache(1000)
if block == nil {
bc.Reset()
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 1df56b27f..47f84b80a 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -576,7 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
})
self.setTransState(state.New(block.Root(), self.stateDb))
- self.setTxState(state.New(block.Root(), self.stateDb))
+ self.txState.SetState(state.New(block.Root(), self.stateDb))
queue[i] = ChainEvent{block, logs}
queueEvent.canonicalCount++
diff --git a/core/error.go b/core/error.go
index 0642948cd..40db99ecd 100644
--- a/core/error.go
+++ b/core/error.go
@@ -81,7 +81,7 @@ func (err *NonceErr) Error() string {
}
func NonceError(is, exp uint64) *NonceErr {
- return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce (%d / %d)", is, exp), Is: is, Exp: exp}
+ return &NonceErr{Message: fmt.Sprintf("Transaction w/ invalid nonce. tx=%d state=%d)", is, exp), Is: is, Exp: exp}
}
func IsNonceErr(err error) bool {
diff --git a/core/transaction_pool.go b/core/transaction_pool.go
index eaddcfa09..392e17856 100644
--- a/core/transaction_pool.go
+++ b/core/transaction_pool.go
@@ -4,7 +4,9 @@ import (
"errors"
"fmt"
"math/big"
+ "sort"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
@@ -17,7 +19,7 @@ import (
var (
ErrInvalidSender = errors.New("Invalid sender")
- ErrImpossibleNonce = errors.New("Impossible nonce")
+ ErrNonce = errors.New("Nonce too low")
ErrNonExistentAccount = errors.New("Account does not exist")
ErrInsufficientFunds = errors.New("Insufficient funds")
ErrIntrinsicGas = errors.New("Intrinsic gas too low")
@@ -54,20 +56,43 @@ type TxPool struct {
txs map[common.Hash]*types.Transaction
invalidHashes *set.Set
+ queue map[common.Address]types.Transactions
+
subscribers []chan TxMsg
eventMux *event.TypeMux
}
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
- return &TxPool{
+ txPool := &TxPool{
txs: make(map[common.Hash]*types.Transaction),
+ queue: make(map[common.Address]types.Transactions),
queueChan: make(chan *types.Transaction, txPoolQueueSize),
quit: make(chan bool),
eventMux: eventMux,
invalidHashes: set.New(),
currentState: currentStateFn,
}
+ return txPool
+}
+
+func (pool *TxPool) Start() {
+ // Queue timer will tick so we can attempt to move items from the queue to the
+ // main transaction pool.
+ queueTimer := time.NewTicker(300 * time.Millisecond)
+ // Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
+ removalTimer := time.NewTicker(1 * time.Second)
+done:
+ for {
+ select {
+ case <-queueTimer.C:
+ pool.checkQueue()
+ case <-removalTimer.C:
+ pool.validatePool()
+ case <-pool.quit:
+ break done
+ }
+ }
}
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
@@ -100,16 +125,12 @@ func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
}
if pool.currentState().GetNonce(from) > tx.Nonce() {
- return ErrImpossibleNonce
+ return ErrNonce
}
return nil
}
-func (self *TxPool) addTx(tx *types.Transaction) {
- self.txs[tx.Hash()] = tx
-}
-
func (self *TxPool) add(tx *types.Transaction) error {
hash := tx.Hash()
@@ -127,7 +148,7 @@ func (self *TxPool) add(tx *types.Transaction) error {
return err
}
- self.addTx(tx)
+ self.queueTx(tx)
var toname string
if to := tx.To(); to != nil {
@@ -144,9 +165,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
}
- // Notify the subscribers
- go self.eventMux.Post(TxPreEvent{tx})
-
return nil
}
@@ -189,34 +207,108 @@ func (self *TxPool) GetTransactions() (txs types.Transactions) {
return
}
-func (self *TxPool) RemoveSet(txs types.Transactions) {
- self.mu.Lock()
- defer self.mu.Unlock()
- for _, tx := range txs {
- delete(self.txs, tx.Hash())
+func (self *TxPool) GetQueuedTransactions() types.Transactions {
+ self.mu.RLock()
+ defer self.mu.RUnlock()
+
+ var txs types.Transactions
+ for _, ts := range self.queue {
+ txs = append(txs, ts...)
}
+
+ return txs
}
-func (self *TxPool) InvalidateSet(hashes *set.Set) {
+func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
- hashes.Each(func(v interface{}) bool {
- delete(self.txs, v.(common.Hash))
- return true
- })
- self.invalidHashes.Merge(hashes)
+ for _, tx := range txs {
+ delete(self.txs, tx.Hash())
+ }
}
func (pool *TxPool) Flush() {
pool.txs = make(map[common.Hash]*types.Transaction)
}
-func (pool *TxPool) Start() {
-}
-
func (pool *TxPool) Stop() {
pool.Flush()
+ close(pool.quit)
glog.V(logger.Info).Infoln("TX Pool stopped")
}
+
+func (self *TxPool) queueTx(tx *types.Transaction) {
+ from, _ := tx.From()
+ self.queue[from] = append(self.queue[from], tx)
+}
+
+func (pool *TxPool) addTx(tx *types.Transaction) {
+ if _, ok := pool.txs[tx.Hash()]; !ok {
+ pool.txs[tx.Hash()] = tx
+ // Notify the subscribers. This event is posted in a goroutine
+ // because it's possible that somewhere during the post "Remove transaction"
+ // gets called which will then wait for the global tx pool lock and deadlock.
+ go pool.eventMux.Post(TxPreEvent{tx})
+ }
+}
+
+// check queue will attempt to insert
+func (pool *TxPool) checkQueue() {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ statedb := pool.currentState()
+ for address, txs := range pool.queue {
+ sort.Sort(types.TxByNonce{txs})
+
+ var (
+ nonce = statedb.GetNonce(address)
+ start int
+ )
+ // Clean up the transactions first and determine the start of the nonces
+ for _, tx := range txs {
+ if tx.Nonce() >= nonce {
+ break
+ }
+ start++
+ }
+ pool.queue[address] = txs[start:]
+
+ // expected nonce
+ enonce := nonce
+ for _, tx := range pool.queue[address] {
+ // If the expected nonce does not match up with the next one
+ // (i.e. a nonce gap), we stop the loop
+ if enonce != tx.Nonce() {
+ break
+ }
+ enonce++
+
+ pool.addTx(tx)
+ }
+ //pool.queue[address] = txs[i:]
+ // delete the entire queue entry if it's empty. There's no need to keep it
+ if len(pool.queue[address]) == 0 {
+ delete(pool.queue, address)
+ }
+ }
+}
+
+func (pool *TxPool) validatePool() {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+
+ statedb := pool.currentState()
+ for hash, tx := range pool.txs {
+ from, _ := tx.From()
+ if nonce := statedb.GetNonce(from); nonce > tx.Nonce() {
+ if glog.V(logger.Debug) {
+ glog.Infof("removed tx (%x) from pool due to nonce error. state=%d tx=%d\n", hash[:4], nonce, tx.Nonce())
+ }
+
+ delete(pool.txs, hash)
+ }
+ }
+}
diff --git a/core/transaction_pool_test.go b/core/transaction_pool_test.go
index b7486adb3..0e049139e 100644
--- a/core/transaction_pool_test.go
+++ b/core/transaction_pool_test.go
@@ -56,7 +56,57 @@ func TestInvalidTransactions(t *testing.T) {
tx.SignECDSA(key)
err = pool.Add(tx)
- if err != ErrImpossibleNonce {
- t.Error("expected", ErrImpossibleNonce)
+ if err != ErrNonce {
+ t.Error("expected", ErrNonce)
+ }
+}
+
+func TestTransactionQueue(t *testing.T) {
+ pool, key := setupTxPool()
+ tx := transaction()
+ tx.SignECDSA(key)
+ from, _ := tx.From()
+ pool.currentState().AddBalance(from, big.NewInt(1))
+ pool.queueTx(tx)
+
+ pool.checkQueue()
+ if len(pool.txs) != 1 {
+ t.Error("expected valid txs to be 1 is", len(pool.txs))
+ }
+
+ tx = transaction()
+ tx.SignECDSA(key)
+ from, _ = tx.From()
+ pool.currentState().SetNonce(from, 10)
+ tx.SetNonce(1)
+ pool.queueTx(tx)
+ pool.checkQueue()
+ if _, ok := pool.txs[tx.Hash()]; ok {
+ t.Error("expected transaction to be in tx pool")
+ }
+
+ if len(pool.queue[from]) != 0 {
+ t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
+ }
+
+ pool, key = setupTxPool()
+ tx1, tx2, tx3 := transaction(), transaction(), transaction()
+ tx2.SetNonce(10)
+ tx3.SetNonce(11)
+ tx1.SignECDSA(key)
+ tx2.SignECDSA(key)
+ tx3.SignECDSA(key)
+ pool.queueTx(tx1)
+ pool.queueTx(tx2)
+ pool.queueTx(tx3)
+ from, _ = tx1.From()
+ pool.checkQueue()
+
+ if len(pool.txs) != 1 {
+ t.Error("expected tx pool to be 1 =")
+ }
+
+ if len(pool.queue[from]) != 3 {
+ t.Error("expected transaction queue to be empty. is", len(pool.queue[from]))
}
}
diff --git a/eth/backend.go b/eth/backend.go
index 88456e448..646a4eaf2 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -3,7 +3,6 @@ package eth
import (
"crypto/ecdsa"
"fmt"
- "math"
"path"
"strings"
@@ -136,11 +135,10 @@ type Ethereum struct {
protocolManager *ProtocolManager
downloader *downloader.Downloader
- net *p2p.Server
- eventMux *event.TypeMux
- txSub event.Subscription
- minedBlockSub event.Subscription
- miner *miner.Miner
+ net *p2p.Server
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ miner *miner.Miner
// logger logger.LogSystem
@@ -222,7 +220,7 @@ func New(config *Config) (*Ethereum, error) {
eth.whisper = whisper.New()
eth.shhVersionId = int(eth.whisper.Version())
eth.miner = miner.New(eth, eth.pow, config.MinerThreads)
- eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.downloader)
+ eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
netprv, err := config.nodeKey()
if err != nil {
@@ -379,7 +377,8 @@ func (s *Ethereum) Start() error {
}
// Start services
- s.txPool.Start()
+ go s.txPool.Start()
+ s.protocolManager.Start()
if s.whisper != nil {
s.whisper.Start()
@@ -389,10 +388,6 @@ func (s *Ethereum) Start() error {
s.txSub = s.eventMux.Subscribe(core.TxPreEvent{})
go s.txBroadcastLoop()
- // broadcast mined blocks
- s.minedBlockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
- go s.minedBroadcastLoop()
-
glog.V(logger.Info).Infoln("Server started")
return nil
}
@@ -422,9 +417,9 @@ func (s *Ethereum) Stop() {
defer s.stateDb.Close()
defer s.extraDb.Close()
- s.txSub.Unsubscribe() // quits txBroadcastLoop
- s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.protocolManager.Stop()
s.txPool.Stop()
s.eventMux.Stop()
if s.whisper != nil {
@@ -440,13 +435,10 @@ func (s *Ethereum) WaitForShutdown() {
<-s.shutdownChan
}
-// now tx broadcasting is taken out of txPool
-// handled here via subscription, efficiency?
func (self *Ethereum) txBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.txSub.Chan() {
event := obj.(core.TxPreEvent)
- self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx})
self.syncAccounts(event.Tx)
}
}
@@ -465,16 +457,6 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) {
}
}
-func (self *Ethereum) minedBroadcastLoop() {
- // automatically stops if unsubscribe
- for obj := range self.minedBlockSub.Chan() {
- switch ev := obj.(type) {
- case core.NewMinedBlockEvent:
- self.protocolManager.BroadcastBlock(ev.Block.Hash(), ev.Block)
- }
- }
-}
-
func saveProtocolVersion(db common.Database, protov int) {
d, _ := db.Get([]byte("ProtocolVersion"))
protocolVersion := common.NewValue(d).Uint()
diff --git a/eth/handler.go b/eth/handler.go
index 622f22132..d466dbfee 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -44,6 +44,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
@@ -77,12 +78,17 @@ type ProtocolManager struct {
peers map[string]*peer
SubProtocol p2p.Protocol
+
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ minedBlockSub event.Subscription
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
+func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
manager := &ProtocolManager{
+ eventMux: mux,
txpool: txpool,
chainman: chainman,
downloader: downloader,
@@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman
return manager
}
+func (pm *ProtocolManager) Start() {
+ // broadcast transactions
+ pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ go pm.txBroadcastLoop()
+
+ // broadcast mined blocks
+ pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go pm.minedBroadcastLoop()
+}
+
+func (pm *ProtocolManager) Stop() {
+ pm.txSub.Unsubscribe() // quits txBroadcastLoop
+ pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+}
+
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
td, current, genesis := pm.chainman.Status()
@@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
}
}
// Broadcast block to peer set
- // XXX due to the current shit state of the network disable the limit
peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendNewBlock(block)
}
glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers")
}
+
+// BroadcastTx will propagate the block to its connected peers. It will sort
+// out which peers do not contain the block in their block set and will do a
+// sqrt(peers) to determine the amount of peers we broadcast to.
+func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+
+ // Find peers who don't know anything about the given hash. Peers that
+ // don't know about the hash will be a candidate for the broadcast loop
+ var peers []*peer
+ for _, peer := range pm.peers {
+ if !peer.txHashes.Has(hash) {
+ peers = append(peers, peer)
+ }
+ }
+ // Broadcast block to peer set
+ peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for _, peer := range peers {
+ peer.sendTransaction(tx)
+ }
+ glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
+}
+
+// Mined broadcast loop
+func (self *ProtocolManager) minedBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.minedBlockSub.Chan() {
+ switch ev := obj.(type) {
+ case core.NewMinedBlockEvent:
+ self.BroadcastBlock(ev.Block.Hash(), ev.Block)
+ }
+ }
+}
+
+func (self *ProtocolManager) txBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.TxPreEvent)
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ }
+}
diff --git a/eth/peer.go b/eth/peer.go
index 972880845..ec0c4b1f3 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -86,6 +86,12 @@ func (p *peer) sendNewBlock(block *types.Block) error {
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td})
}
+func (p *peer) sendTransaction(tx *types.Transaction) error {
+ p.txHashes.Add(tx.Hash())
+
+ return p2p.Send(p.rw, TxMsg, []*types.Transaction{tx})
+}
+
func (p *peer) requestHashes(from common.Hash) error {
glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4])
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes})
diff --git a/miner/worker.go b/miner/worker.go
index dc1f04d87..19ede3c93 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -258,7 +258,7 @@ func (self *worker) commitNewWork() {
tcount = 0
ignoredTransactors = set.New()
)
- //gasLimit:
+
for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from, _ := tx.From()
@@ -290,13 +290,12 @@ func (self *worker) commitNewWork() {
// ignore the transactor so no nonce errors will be thrown for this account
// next time the worker is run, they'll be picked up again.
ignoredTransactors.Add(from)
- //glog.V(logger.Debug).Infof("Gas limit reached for block. %d TXs included in this block\n", i)
- //break gasLimit
+
+ glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
default:
tcount++
}
}
- //self.eth.TxPool().InvalidateSet(remove)
var (
uncles []*types.Header
diff --git a/xeth/xeth.go b/xeth/xeth.go
index afcb33e4c..251b070e4 100644
--- a/xeth/xeth.go
+++ b/xeth/xeth.go
@@ -682,9 +682,11 @@ func (self *XEth) Transact(fromStr, toStr, valueStr, gasStr, gasPriceStr, codeSt
if contractCreation {
addr := core.AddressFromMessage(tx)
- glog.V(logger.Info).Infof("Contract addr %x\n", addr)
+ glog.V(logger.Info).Infof("Tx(%x) created: %x\n", tx.Hash(), addr)
return core.AddressFromMessage(tx).Hex(), nil
+ } else {
+ glog.V(logger.Info).Infof("Tx(%x) to: %x\n", tx.Hash(), tx.To())
}
return tx.Hash().Hex(), nil
}