From d4183037825fe7a86e2a6653adb5e97ee0d6bbf8 Mon Sep 17 00:00:00 2001 From: Sonic Date: Wed, 5 Dec 2018 15:17:38 +0800 Subject: core, dex: polish sync (#75) - Broadcasting blocks at chain head event is not correct when the full node is not running in block proposer mode. Introduce NewFinalizedBlockEvent, this event is post by the full node which runs in block proposer mode when a block is witnessed and resulting in some blocks are considered finalized. - Non block proposer node will still broadcast blocks at the following moment (same as ethereum): 1. a sync with a peer is terminated successfully 2. a block passes the fetcher's header check during inserting blocks 3. a block is successfully inserted by fetcher - Don't trigger a sync when we are not behind other peers more than acceptable distance. Fetcher is able to cover this. --- dex/handler.go | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) (limited to 'dex/handler.go') diff --git a/dex/handler.go b/dex/handler.go index 5753a7cd8..51167c9cb 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -72,6 +72,8 @@ const ( // The number is referenced from the size of tx pool. txChanSize = 4096 + finalizedBlockChanSize = 128 + metaChanSize = 10240 maxPullPeers = 3 @@ -133,6 +135,10 @@ type ProtocolManager struct { // Dexcon isBlockProposer bool + app dexconApp + + finalizedBlockCh chan core.NewFinalizedBlockEvent + finalizedBlockSub event.Subscription } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -141,7 +147,7 @@ func NewProtocolManager( config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, - isBlockProposer bool, gov governance) (*ProtocolManager, error) { + isBlockProposer bool, gov governance, app dexconApp) (*ProtocolManager, error) { tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ @@ -160,6 +166,7 @@ func NewProtocolManager( quitSync: make(chan struct{}), receiveCh: make(chan interface{}, 1024), isBlockProposer: isBlockProposer, + app: app, } // Figure out whether to allow fast sync or not @@ -255,6 +262,15 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + if pm.isBlockProposer { + // broadcast finalized blocks + pm.finalizedBlockCh = make(chan core.NewFinalizedBlockEvent, + finalizedBlockChanSize) + pm.finalizedBlockSub = pm.app.SubscribeNewFinalizedBlockEvent( + pm.finalizedBlockCh) + go pm.finalizedBlockBroadcastLoop() + } + // broadcast node metas pm.metasCh = make(chan newMetasEvent, metaChanSize) pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) @@ -1128,6 +1144,20 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } +func (pm *ProtocolManager) finalizedBlockBroadcastLoop() { + for { + select { + case event := <-pm.finalizedBlockCh: + pm.BroadcastBlock(event.Block, true) + pm.BroadcastBlock(event.Block, false) + + // Err() channel will be closed when unsubscribing. + case <-pm.finalizedBlockSub.Err(): + return + } + } +} + func (pm *ProtocolManager) metaBroadcastLoop() { for { select { @@ -1154,10 +1184,7 @@ func (pm *ProtocolManager) peerSetLoop() { for { select { - case event := <-pm.chainHeadCh: - pm.BroadcastBlock(event.Block, true) // First propagate block to peers - pm.BroadcastBlock(event.Block, false) // Only then announce to the rest - + case <-pm.chainHeadCh: if !pm.isBlockProposer { break } -- cgit