diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 37 |
1 files changed, 32 insertions, 5 deletions
diff --git a/dex/handler.go b/dex/handler.go index 5753a7cd8..51167c9cb 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -72,6 +72,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + finalizedBlockChanSize = 128 + metaChanSize = 10240 maxPullPeers = 3 @@ -133,6 +135,10 @@ type ProtocolManager struct { // Dexcon isBlockProposer bool + app dexconApp + + finalizedBlockCh chan core.NewFinalizedBlockEvent + finalizedBlockSub event.Subscription } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -141,7 +147,7 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - isBlockProposer bool, gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ @@ -160,6 +166,7 @@ func NewProtocolManager( quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, + app: app, } // Figure out whether to allow fast sync or not @@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + if pm.isBlockProposer { + // broadcast finalized blocks + pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent, + finalizedBlockChanSize) + pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent( + pm.finalizedBlockCh) + go pm.finalizedBlockBroadcastLoop() + } + // broadcast node metas pm.metasCh = make(chan newMetasEvent, metaChanSize) pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) @@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { + for { + select { + case event := <-pm.finalizedBlockCh: + pm.BroadcastBlock(event.Block, true) + pm.BroadcastBlock(event.Block, false) + + // Err() channel will be closed when unsubscribing. + case <-pm.finalizedBlockSub.Err(): + return + } + } +} + func (pm *ProtocolManager) metaBroadcastLoop() { for { select { @@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case event := <-pm.chainHeadCh: - pm.BroadcastBlock(event.Block, true) // First propagate block to peers - pm.BroadcastBlock(event.Block, false) // Only then announce to the rest - + case <-pm.chainHeadCh: if !pm.isBlockProposer { break } |