From d3be1a271961f13f5bd056d195b790c668552fe1 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 22 Apr 2015 17:56:06 +0200 Subject: eth: moved mined, tx events to protocol-hnd and improved tx propagation Transactions are now propagated to peers from which we have not yet received the transaction. This will significantly reduce the chatter on the network. Moved new mined block handler to the protocol handler and moved transaction handling to protocol handler. --- eth/handler.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) (limited to 'eth/handler.go') diff --git a/eth/handler.go b/eth/handler.go index 622f22132..d466dbfee 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -44,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -77,12 +78,17 @@ type ProtocolManager struct { peers map[string]*peer SubProtocol p2p.Protocol + + eventMux *event.TypeMux + txSub event.Subscription + minedBlockSub event.Subscription } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { manager := &ProtocolManager{ + eventMux: mux, txpool: txpool, chainman: chainman, downloader: downloader, @@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman return manager } +func (pm *ProtocolManager) Start() { + // broadcast transactions + pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() +} + +func (pm *ProtocolManager) Stop() { + pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop +} + func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { td, current, genesis := pm.chainman.Status() @@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) } } // Broadcast block to peer set - // XXX due to the current shit state of the network disable the limit peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) } glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") } + +// BroadcastTx will propagate the block to its connected peers. It will sort +// out which peers do not contain the block in their block set and will do a +// sqrt(peers) to determine the amount of peers we broadcast to. +func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + + // Find peers who don't know anything about the given hash. Peers that + // don't know about the hash will be a candidate for the broadcast loop + var peers []*peer + for _, peer := range pm.peers { + if !peer.txHashes.Has(hash) { + peers = append(peers, peer) + } + } + // Broadcast block to peer set + peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.sendTransaction(tx) + } + glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") +} + +// Mined broadcast loop +func (self *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.minedBlockSub.Chan() { + switch ev := obj.(type) { + case core.NewMinedBlockEvent: + self.BroadcastBlock(ev.Block.Hash(), ev.Block) + } + } +} + +func (self *ProtocolManager) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.BroadcastTx(event.Tx.Hash(), event.Tx) + } +} -- cgit