diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-22 23:56:06 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-23 17:50:12 +0800 |
commit | d3be1a271961f13f5bd056d195b790c668552fe1 (patch) | |
tree | a586f4e4d0a4e1110a67cba24588bf2c64d9edf3 /eth/handler.go | |
parent | 888ece0cb2c9d07ae821398aeffb0000ef28fb23 (diff) | |
download | dexon-d3be1a271961f13f5bd056d195b790c668552fe1.tar.gz dexon-d3be1a271961f13f5bd056d195b790c668552fe1.tar.zst dexon-d3be1a271961f13f5bd056d195b790c668552fe1.zip |
eth: moved mined, tx events to protocol-hnd and improved tx propagation
Transactions are now propagated to peers from which we have not yet
received the transaction. This will significantly reduce the chatter on
the network.
Moved new mined block handler to the protocol handler and moved
transaction handling to protocol handler.
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 66 |
1 files changed, 64 insertions, 2 deletions
diff --git a/eth/handler.go b/eth/handler.go index 622f22132..d466dbfee 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -44,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" @@ -77,12 +78,17 @@ type ProtocolManager struct { peers map[string]*peer SubProtocol p2p.Protocol + + eventMux *event.TypeMux + txSub event.Subscription + minedBlockSub event.Subscription } // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { manager := &ProtocolManager{ + eventMux: mux, txpool: txpool, chainman: chainman, downloader: downloader, @@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman return manager } +func (pm *ProtocolManager) Start() { + // broadcast transactions + pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() +} + +func (pm *ProtocolManager) Stop() { + pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop +} + func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { td, current, genesis := pm.chainman.Status() @@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) } } // Broadcast block to peer set - // XXX due to the current shit state of the network disable the limit peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.sendNewBlock(block) } glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers") } + +// BroadcastTx will propagate the block to its connected peers. It will sort +// out which peers do not contain the block in their block set and will do a +// sqrt(peers) to determine the amount of peers we broadcast to. +func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { + pm.pmu.Lock() + defer pm.pmu.Unlock() + + // Find peers who don't know anything about the given hash. Peers that + // don't know about the hash will be a candidate for the broadcast loop + var peers []*peer + for _, peer := range pm.peers { + if !peer.txHashes.Has(hash) { + peers = append(peers, peer) + } + } + // Broadcast block to peer set + peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.sendTransaction(tx) + } + glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") +} + +// Mined broadcast loop +func (self *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.minedBlockSub.Chan() { + switch ev := obj.(type) { + case core.NewMinedBlockEvent: + self.BroadcastBlock(ev.Block.Hash(), ev.Block) + } + } +} + +func (self *ProtocolManager) txBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range self.txSub.Chan() { + event := obj.(core.TxPreEvent) + self.BroadcastTx(event.Tx.Hash(), event.Tx) + } +} |