aboutsummaryrefslogtreecommitdiffstats
path: root/eth/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/handler.go')
-rw-r--r--eth/handler.go66
1 files changed, 64 insertions, 2 deletions
diff --git a/eth/handler.go b/eth/handler.go
index 622f22132..d466dbfee 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -44,6 +44,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
@@ -77,12 +78,17 @@ type ProtocolManager struct {
peers map[string]*peer
SubProtocol p2p.Protocol
+
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ minedBlockSub event.Subscription
}
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
-func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
+func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
manager := &ProtocolManager{
+ eventMux: mux,
txpool: txpool,
chainman: chainman,
downloader: downloader,
@@ -105,6 +111,21 @@ func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman
return manager
}
+func (pm *ProtocolManager) Start() {
+ // broadcast transactions
+ pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
+ go pm.txBroadcastLoop()
+
+ // broadcast mined blocks
+ pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go pm.minedBroadcastLoop()
+}
+
+func (pm *ProtocolManager) Stop() {
+ pm.txSub.Unsubscribe() // quits txBroadcastLoop
+ pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+}
+
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
td, current, genesis := pm.chainman.Status()
@@ -326,10 +347,51 @@ func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block)
}
}
// Broadcast block to peer set
- // XXX due to the current shit state of the network disable the limit
peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendNewBlock(block)
}
glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers")
}
+
+// BroadcastTx will propagate the block to its connected peers. It will sort
+// out which peers do not contain the block in their block set and will do a
+// sqrt(peers) to determine the amount of peers we broadcast to.
+func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+
+ // Find peers who don't know anything about the given hash. Peers that
+ // don't know about the hash will be a candidate for the broadcast loop
+ var peers []*peer
+ for _, peer := range pm.peers {
+ if !peer.txHashes.Has(hash) {
+ peers = append(peers, peer)
+ }
+ }
+ // Broadcast block to peer set
+ peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for _, peer := range peers {
+ peer.sendTransaction(tx)
+ }
+ glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
+}
+
+// Mined broadcast loop
+func (self *ProtocolManager) minedBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.minedBlockSub.Chan() {
+ switch ev := obj.(type) {
+ case core.NewMinedBlockEvent:
+ self.BroadcastBlock(ev.Block.Hash(), ev.Block)
+ }
+ }
+}
+
+func (self *ProtocolManager) txBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range self.txSub.Chan() {
+ event := obj.(core.TxPreEvent)
+ self.BroadcastTx(event.Tx.Hash(), event.Tx)
+ }
+}