diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 216 |
1 files changed, 87 insertions, 129 deletions
diff --git a/dex/handler.go b/dex/handler.go index bc932fb28..5348b06f2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -49,6 +49,8 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + metaChanSize = 10240 ) var ( @@ -59,11 +61,6 @@ var ( // not compatible (low protocol version restrictions and high requirements). var errIncompatibleConfig = errors.New("incompatible configuration") -type newNotarySetEvent struct { - Round uint64 - Set map[string]struct{} -} - func errResp(code errCode, format string, v ...interface{}) error { return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) } @@ -75,32 +72,35 @@ type ProtocolManager struct { acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) txpool txPool + nodeTable *nodeTable + gov governance blockchain *core.BlockChain chainconfig *params.ChainConfig maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet - notarySetManager *notarySetManager + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet SubProtocols []p2p.Protocol eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription + metasCh chan newMetasEvent + metasSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer txsyncCh chan *txsync + metasyncCh chan *metasync quitSync chan struct{} noMorePeers chan struct{} - // channels for notarySetLoop - newNotarySetCh chan newNotarySetEvent - newRoundCh chan uint64 - newNotaryNodeInfoCh chan *notaryNodeInfo + // channels for peerSetLoop + crsCh chan core.NewCRSEvent + crsSub event.Subscription srvr p2pServer @@ -111,24 +111,26 @@ type ProtocolManager struct { // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the Ethereum network. -func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager( + config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, + mux *event.TypeMux, txpool txPool, engine consensus.Engine, + blockchain *core.BlockChain, chaindb ethdb.Database, + gov governance) (*ProtocolManager, error) { + tab := newNodeTable() // Create the protocol manager with the base fields manager := &ProtocolManager{ - networkID: networkID, - eventMux: mux, - txpool: txpool, - blockchain: blockchain, - chainconfig: config, - peers: newPeerSet(), - newPeerCh: make(chan *peer), - noMorePeers: make(chan struct{}), - txsyncCh: make(chan *txsync), - quitSync: make(chan struct{}), - newNotarySetCh: make(chan newNotarySetEvent), - newRoundCh: make(chan uint64), - newNotaryNodeInfoCh: make(chan *notaryNodeInfo), - } - manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh) + networkID: networkID, + eventMux: mux, + txpool: txpool, + nodeTable: tab, + gov: gov, + blockchain: blockchain, + chainconfig: config, + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { @@ -216,22 +218,33 @@ func (pm *ProtocolManager) removePeer(id string) { func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) { pm.maxPeers = maxPeers pm.srvr = srvr + pm.peers = newPeerSet(pm.gov, pm.srvr, pm.nodeTable) + + // if our self in node set build the node info // broadcast transactions pm.txsCh = make(chan core.NewTxsEvent, txChanSize) pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) go pm.txBroadcastLoop() + // broadcast node metas + pm.metasCh = make(chan newMetasEvent, metaChanSize) + pm.metasSub = pm.nodeTable.SubscribeNewMetasEvent(pm.metasCh) + go pm.metaBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() - // run the notary set loop - go pm.notarySetLoop() + // run the peer set loop + pm.crsCh = make(chan core.NewCRSEvent) + pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh) + go pm.peerSetLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() + go pm.metasyncLoop() } func (pm *ProtocolManager) Stop() { @@ -301,6 +314,7 @@ func (pm *ProtocolManager) handle(p *peer) error { // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) + pm.syncNodeMetas(p) // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { @@ -699,18 +713,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } pm.txpool.AddRemotes(txs) - case msg.Code == NotaryNodeInfoMsg: - var info notaryNodeInfo - if err := msg.Decode(&info); err != nil { + case msg.Code == MetaMsg: + var metas []*NodeMeta + if err := msg.Decode(&metas); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - - // TODO(sonic): validate signature - p.MarkNotaryNodeInfo(info.Hash()) - // Broadcast to peers - pm.BroadcastNotaryNodeInfo(&info) - - pm.notarySetManager.TryAddInfo(&info) + for i, meta := range metas { + if meta == nil { + return errResp(ErrDecode, "node meta %d is nil", i) + } + p.MarkNodeMeta(meta.Hash()) + } + pm.nodeTable.Add(metas) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -769,11 +783,20 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { } } -// BroadcastNotaryNodeInfo will propagate notary node info to its peers. -func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) { - peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash()) - for _, peer := range peers { - peer.AsyncSendNotaryNodeInfo(info) +// BroadcastMetas will propagate node metas to its peers. +func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { + var metaset = make(map[*peer][]*NodeMeta) + + for _, meta := range metas { + peers := pm.peers.PeersWithoutNodeMeta(meta.Hash()) + for _, peer := range peers { + metaset[peer] = append(metaset[peer], meta) + } + log.Trace("Broadcast meta", "ID", meta.ID, "recipients", len(peers)) + } + + for peer, metas := range metaset { + peer.AsyncSendNodeMetas(metas) } } @@ -801,97 +824,32 @@ func (pm *ProtocolManager) txBroadcastLoop() { } } -// a loop keep building and maintaining peers in notary set. -func (pm *ProtocolManager) notarySetLoop() { - // TODO: - // * pm need to know current round, and whether we are in notary set. - // * (done) able to handle node info in the near future round. - // * check peer limit. - // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?) - - // If we are in notary set and we are synced with network. - // events: - // * new notary set is determined, and we are in notary set. - // (TBD: subscribe the event or polling govornance) - // - advance round - // - build new notary set - // - remove old notary set, remove old notary set from static - - // * current round node info changed. - - self := pm.srvr.Self() - var round uint64 - +func (pm *ProtocolManager) metaBroadcastLoop() { for { select { - case event := <-pm.newNotarySetCh: - // new notary set is determined. - if _, ok := event.Set[self.ID.String()]; ok { - // initialize the new notary set and add it to pm.notarySetManager - s := newNotarySet(event.Round, event.Set) - pm.notarySetManager.Register(event.Round, s) - - // TODO: handle signature - pm.BroadcastNotaryNodeInfo(¬aryNodeInfo{ - ID: self.ID, - IP: self.IP, - UDP: self.UDP, - TCP: self.TCP, - Round: event.Round, - Timestamp: time.Now().Unix(), - }) - } - - case r := <-pm.newRoundCh: - // move to new round. - round = r + case event := <-pm.metasCh: + pm.BroadcastMetas(event.Metas) - // TODO: revisit this to make sure how many previous round's - // notary set we need to keep. - - // rmove notary set before current round, they are useless - for _, s := range pm.notarySetManager.Before(round) { - pm.removeNotarySetFromStatic(s) - } - - // prepare notary set connections for new round. - if pm.isInNotarySet(round + 1) { - // we assume the notary set for round + 1 is added to - // pm.notarySetManager before this step. - if n, ok := pm.notarySetManager.Round(round + 1); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.newNotaryNodeInfoCh: - // some node info in "current round" is updated. - // try to connect them by the new info. - if pm.isInNotarySet(round) { - if n, ok := pm.notarySetManager.Round(round); ok { - pm.addNotarySetToStatic(n) - } - } - - case <-pm.quitSync: + // Err() channel will be closed when unsubscribing. + case <-pm.metasSub.Err(): return } } } -func (pm *ProtocolManager) isInNotarySet(r uint64) bool { - return false -} - -func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) { - for _, node := range n.NodesToAdd() { - pm.srvr.AddNotaryPeer(node) - n.MarkAdded(node.ID.String()) - } -} - -func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) { - for _, node := range n.Nodes() { - pm.srvr.RemoveNotaryPeer(node) +// a loop keep building and maintaining peers in notary set. +// TODO: finish this +func (pm *ProtocolManager) peerSetLoop() { + for { + select { + case event := <-pm.crsCh: + pm.peers.BuildNotaryConn(event.Round) + pm.peers.BuildDKGConn(event.Round) + pm.peers.ForgetNotaryConn(event.Round - 1) + pm.peers.ForgetDKGConn(event.Round - 1) + case <-pm.quitSync: + return + } } } |