aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go164
1 files changed, 150 insertions, 14 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 21609a561..96d20b02b 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -59,6 +59,11 @@ var (
// not compatible (low protocol version restrictions and high requirements).
var errIncompatibleConfig = errors.New("incompatible configuration")
+type newNotarySetEvent struct {
+ Round uint64
+ Set map[string]struct{}
+}
+
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
@@ -74,9 +79,10 @@ type ProtocolManager struct {
chainconfig *params.ChainConfig
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
+ notarySetManager *notarySetManager
SubProtocols []p2p.Protocol
@@ -91,6 +97,13 @@ type ProtocolManager struct {
quitSync chan struct{}
noMorePeers chan struct{}
+ // channels for notarySetLoop
+ newNotarySetCh chan newNotarySetEvent
+ newRoundCh chan uint64
+ newNotaryNodeInfoCh chan *notaryNodeInfo
+
+ srvr p2pServer
+
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
@@ -101,17 +114,22 @@ type ProtocolManager struct {
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
- networkID: networkID,
- eventMux: mux,
- txpool: txpool,
- blockchain: blockchain,
- chainconfig: config,
- peers: newPeerSet(),
- newPeerCh: make(chan *peer),
- noMorePeers: make(chan struct{}),
- txsyncCh: make(chan *txsync),
- quitSync: make(chan struct{}),
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ blockchain: blockchain,
+ chainconfig: config,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ newNotarySetCh: make(chan newNotarySetEvent),
+ newRoundCh: make(chan uint64),
+ newNotaryNodeInfoCh: make(chan *notaryNodeInfo),
}
+ manager.notarySetManager = newNotarySetManager(manager.newNotaryNodeInfoCh)
+
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
@@ -195,8 +213,9 @@ func (pm *ProtocolManager) removePeer(id string) {
}
}
-func (pm *ProtocolManager) Start(maxPeers int) {
+func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
pm.maxPeers = maxPeers
+ pm.srvr = srvr
// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
@@ -207,6 +226,9 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
+ // run the notary set loop
+ go pm.notarySetLoop()
+
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
@@ -677,6 +699,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.txpool.AddRemotes(txs)
+ case msg.Code == NotaryNodeInfoMsg:
+ var info notaryNodeInfo
+ if err := msg.Decode(&info); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ // TODO(sonic): validate signature
+ p.MarkNotaryNodeInfo(info.Hash())
+ // Broadcast to peers
+ pm.BroadcastNotaryNodeInfo(&info)
+
+ pm.notarySetManager.TryAddInfo(&info)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -735,6 +769,14 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
}
}
+// BroadcastNotaryNodeInfo will propagate notary node info to its peers.
+func (pm *ProtocolManager) BroadcastNotaryNodeInfo(info *notaryNodeInfo) {
+ peers := pm.peers.PeersWithoutNotaryNodeInfo(info.Hash())
+ for _, peer := range peers {
+ peer.AsyncSendNotaryNodeInfo(info)
+ }
+}
+
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
@@ -759,6 +801,100 @@ func (pm *ProtocolManager) txBroadcastLoop() {
}
}
+// a loop keep building and maintaining peers in notary set.
+func (pm *ProtocolManager) notarySetLoop() {
+ // TODO:
+ // * pm need to know current round, and whether we are in notary set.
+ // * (done) able to handle node info in the near future round.
+ // * check peer limit.
+ // * revisit channels (newNotarySetCh, newRoundCh, newNotaryNodeInfoCh) (buffered ?)
+
+ // If we are in notary set and we are synced with network.
+ // events:
+ // * new notary set is determined, and we are in notary set.
+ // (TBD: subscribe the event or polling govornance)
+ // - advance round
+ // - build new notary set
+ // - remove old notary set, remove old notary set from static
+
+ // * current round node info changed.
+
+ self := pm.srvr.Self()
+ var round uint64
+
+ for {
+ select {
+ case event := <-pm.newNotarySetCh:
+ // new notary set is determined.
+ if _, ok := event.Set[self.ID.String()]; ok {
+ // initialize the new notary set and add it to pm.notarySetManager
+ s := newNotarySet(event.Round, event.Set)
+ pm.notarySetManager.Register(event.Round, s)
+
+ // TODO: handle signature
+ pm.BroadcastNotaryNodeInfo(&notaryNodeInfo{
+ ID: self.ID,
+ IP: self.IP,
+ UDP: self.UDP,
+ TCP: self.TCP,
+ Round: event.Round,
+ Timestamp: time.Now().Unix(),
+ })
+ }
+
+ case r := <-pm.newRoundCh:
+ // move to new round.
+ round = r
+
+ // TODO: revisit this to make sure how many previous round's
+ // notary set we need to keep.
+
+ // rmove notary set before current round, they are useless
+ for _, s := range pm.notarySetManager.Before(round) {
+ pm.removeNotarySetFromStatic(s)
+ }
+
+ // prepare notary set connections for new round.
+ if pm.isInNotarySet(round + 1) {
+ // we assume the notary set for round + 1 is added to
+ // pm.notarySetManager before this step.
+ if n, ok := pm.notarySetManager.Round(round + 1); ok {
+ pm.addNotarySetToStatic(n)
+ }
+ }
+
+ case <-pm.newNotaryNodeInfoCh:
+ // some node info in "current round" is updated.
+ // try to connect them by the new info.
+ if pm.isInNotarySet(round) {
+ if n, ok := pm.notarySetManager.Round(round); ok {
+ pm.addNotarySetToStatic(n)
+ }
+ }
+
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
+func (pm *ProtocolManager) isInNotarySet(r uint64) bool {
+ return false
+}
+
+func (pm *ProtocolManager) addNotarySetToStatic(n *notarySet) {
+ for _, node := range n.NodesToAdd() {
+ pm.srvr.AddNotaryPeer(node)
+ n.MarkAdded(node.ID.String())
+ }
+}
+
+func (pm *ProtocolManager) removeNotarySetFromStatic(n *notarySet) {
+ for _, node := range n.Nodes() {
+ pm.srvr.RemoveNotaryPeer(node)
+ }
+}
+
// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {