diff options
author | Miya Chen <miyatlchen@gmail.com> | 2017-08-18 18:58:36 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-08-18 18:58:36 +0800 |
commit | bf1e2631281e1e439533f2abcf1e99a7b2f9552a (patch) | |
tree | a8b86720edf085a6531e7042ef33f36a993540d5 /eth/handler.go | |
parent | a4da8416eec6a00c358b6a612d21e7cdf859d588 (diff) | |
download | dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.gz dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.tar.zst dexon-bf1e2631281e1e439533f2abcf1e99a7b2f9552a.zip |
core, light: send chain events using event.Feed (#14865)
Diffstat (limited to 'eth/handler.go')
-rw-r--r-- | eth/handler.go | 23 |
1 files changed, 17 insertions, 6 deletions
diff --git a/eth/handler.go b/eth/handler.go index e6a9c86d7..9d230a4ad 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -45,6 +45,10 @@ import ( const ( softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to TxPreEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 ) var ( @@ -78,7 +82,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub *event.TypeMuxSubscription + txCh chan core.TxPreEvent + txSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -200,7 +205,8 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start() { // broadcast transactions - pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{}) + pm.txCh = make(chan core.TxPreEvent, txChanSize) + pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) @@ -724,10 +730,15 @@ func (self *ProtocolManager) minedBroadcastLoop() { } func (self *ProtocolManager) txBroadcastLoop() { - // automatically stops if unsubscribe - for obj := range self.txSub.Chan() { - event := obj.Data.(core.TxPreEvent) - self.BroadcastTx(event.Tx.Hash(), event.Tx) + for { + select { + case event := <-self.txCh: + self.BroadcastTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.txSub.Err(): + return + } } } |