diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-25 20:37:11 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:49 +0800 |
commit | 8335eac4a488716c396f114cbe7522919b97e224 (patch) | |
tree | c680248b9a7346e63ddde403689c2a484a43c4b4 /dex/handler.go | |
parent | 6f442cd7793daad014aa0d55b3b7320392c22f02 (diff) | |
download | dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.gz dexon-8335eac4a488716c396f114cbe7522919b97e224.tar.zst dexon-8335eac4a488716c396f114cbe7522919b97e224.zip |
dex: redesign p2p network topology
- Let p2p server support direct connection and group connection.
- Introduce node meta table to maintain IP of all nodes in node set,
in memory and let nodes in the network can sync this table.
- Let peerSet able to manage direct connections to notary set and dkg set.
The mechanism to refresh the network topology when configuration round
change is not done yet.
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 216 |
1 files changed, 87 insertions, 129 deletions
diff --git a/dex/handler.go b/dex/handler.go index bc932fb28..5348b06f2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -49,6 +49,8 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + metaChanSize = 10240 ) var ( @@ -59,11 +61,6 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") -type newNotarySetEvent struct { - Round uint64 - Set map[string]struct{} -} - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -75,32 +72,35 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool + nodeTable *nodeTable + gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet - notarySetManager *notarySetManager + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync + metasyncCh chan *metasync quitSync chan struct{} noMorePeers chan struct{} - // channels for notarySetLoop - newNotarySetCh chan newNotarySetEvent - newRoundCh chan uint64 - newNotaryNodeInfoCh chan *notaryNodeInfo + // channels for peerSetLoop + crsCh chan core.NewCRSEvent + crsSub event.Subscription srvr p2pServer @@ -111,24 +111,26 @@ type ProtocolManager struct { // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the Ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager( + config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, + mux *event.TypeMux, txpool txPool, engine consensus.Engine, + blockchain *core.BlockChain, chaindb ethdb.Database, + gov governance) (*ProtocolManager, error) { + tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), - newNotarySetCh: make(chan newNotarySetEvent), - newRoundCh: make(chan uint64), - newNotaryNodeInfoCh: make(chan *notaryNodeInfo), - } - manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr + pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + + // if our self in node set build the node info // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + // broadcast node metas + pm.metasCh = make(chan newMetasEvent, metaChanSize) + pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) + go pm.metaBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // run the notary set loop - go pm.notarySetLoop() + // run the peer set loop + pm.crsCh = make(chan core.NewCRSEvent) + pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + go pm.peerSetLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() + go pm.metasyncLoop() } func (pm *ProtocolManager) Stop() { @@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) + pm.syncNodeMetas(p) // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { @@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) - case msg.Code == NotaryNodeInfoMsg: - var info notaryNodeInfo - if err := msg.Decode(&info); err != nil { + case msg.Code == MetaMsg: + var metas []*NodeMeta + if err := msg.Decode(&metas); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - - // TODO(sonic): validate signature - p.MarkNotaryNodeInfo(info.Hash()) - // Broadcast to peers - pm.BroadcastNotaryNodeInfo(&info) - - pm.notarySetManager.TryAddInfo(&info) + for i, meta := range metas { + if meta == nil { + return errResp(ErrDecode, "node meta %d is nil", i) + } + p.MarkNodeMeta(meta.Hash()) + } + pm.nodeTable.Add(metas) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastNotaryNodeInfo will propagate notary node info to its peers. -func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { - peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) - for _, peer := range peers { - peer.AsyncSendNotaryNodeInfo(info) +// BroadcastMetas will propagate node metas to its peers. +func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { + var metaset = make(map[*peer][]*NodeMeta) + + for _, meta := range metas { + peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, peer := range peers { + metaset[peer] = append(metaset[peer], meta) + } + log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + } + + for peer, metas := range metaset { + peer.AsyncSendNodeMetas(metas) } } @@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } -// a loop keep building and maintaining peers in notary set. -func (pm *ProtocolManager) notarySetLoop() { - // TODO: - // * pm need to know current round, and whether we are in notary set. - // * (done) able to handle node info in the near future round. - // * check peer limit. - // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) - - // If we are in notary set and we are synced with network. - // events: - // * new notary set is determined, and we are in notary set. - // (TBD: subscribe the event or polling govornance) - // - advance round - // - build new notary set - // - remove old notary set, remove old notary set from static - - // * current round node info changed. - - self := pm.srvr.Self() - var round uint64 - +func (pm *ProtocolManager) metaBroadcastLoop() { for { select { - case event := <-pm.newNotarySetCh: - // new notary set is determined. - if _, ok := event.Set[self.ID.String()]; ok { - // initialize the new notary set and add it to pm.notarySetManager - s := newNotarySet(event.Round, event.Set) - pm.notarySetManager.Register(event.Round, s) - - // TODO: handle signature - pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ - ID: self.ID, - IP: self.IP, - UDP: self.UDP, - TCP: self.TCP, - Round: event.Round, - Timestamp: time.Now().Unix(), - }) - } - - case r := <-pm.newRoundCh: - // move to new round. - round = r + case event := <-pm.metasCh: + pm.BroadcastMetas(event.Metas) - // TODO: revisit this to make sure how many previous round's - // notary set we need to keep. - - // rmove notary set before current round, they are useless - for _, s := range pm.notarySetManager.Before(round) { - pm.removeNotarySetFromStatic(s) - } - - // prepare notary set connections for new round. - if pm.isInNotarySet(round + 1) { - // we assume the notary set for round + 1 is added to - // pm.notarySetManager before this step. - if n, ok := pm.notarySetManager.Round(round + 1); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.newNotaryNodeInfoCh: - // some node info in "current round" is updated. - // try to connect them by the new info. - if pm.isInNotarySet(round) { - if n, ok := pm.notarySetManager.Round(round); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.quitSync: + // Err() channel will be closed when unsubscribing. + case <-pm.metasSub.Err(): return } } } -func (pm *ProtocolManager) isInNotarySet(r uint64) bool { - return false -} - -func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { - for _, node := range n.NodesToAdd() { - pm.srvr.AddNotaryPeer(node) - n.MarkAdded(node.ID.String()) - } -} - -func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { - for _, node := range n.Nodes() { - pm.srvr.RemoveNotaryPeer(node) +// a loop keep building and maintaining peers in notary set. +// TODO: finish this +func (pm *ProtocolManager) peerSetLoop() { + for { + select { + case event := <-pm.crsCh: + pm.peers.BuildNotaryConn(event.Round) + pm.peers.BuildDKGConn(event.Round) + pm.peers.ForgetNotaryConn(event.Round - 1) + pm.peers.ForgetDKGConn(event.Round - 1) + case <-pm.quitSync: + return + } } } |