diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/dex/handler.go b/dex/handler.go index 67cbe8a63..e013b9722 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -26,6 +26,9 @@ import ( "sync/atomic" "time" + "github.com/dexon-foundation/dexon-consensus-core/core/crypto" + coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types" + "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/consensus" "github.com/dexon-foundation/dexon/core" @@ -97,6 +100,9 @@ type ProtocolManager struct { crsCh chan core.NewCRSEvent crsSub event.Subscription + // channels for dexon consensus core + receiveCh chan interface{} + srvr p2pServer // wait group is used for graceful shutdowns during downloading @@ -125,6 +131,7 @@ func NewProtocolManager( noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), + receiveCh: make(chan interface{}, 1024), } // Figure out whether to allow fast sync or not @@ -267,6 +274,10 @@ func (pm *ProtocolManager) Stop() { log.Info("Ethereum protocol stopped") } +func (pm *ProtocolManager) ReceiveChan() <-chan interface{} { + return pm.receiveCh +} + func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { return newPeer(pv, p, newMeteredMsgWriter(rw)) } @@ -666,6 +677,47 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.MarkNodeMeta(meta.Hash()) } pm.nodeTable.Add(metas) + case msg.Code == LatticeBlockMsg: + var rb rlpLatticeBlock + if err := msg.Decode(&rb); err != nil { + fmt.Println("decode lattice block error", err) + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- fromRLPLatticeBlock(&rb) + case msg.Code == VoteMsg: + var vote coreTypes.Vote + if err := msg.Decode(&vote); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &vote + case msg.Code == AgreementMsg: + // DKG set is receiver + var agreement coreTypes.AgreementResult + if err := msg.Decode(&agreement); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &agreement + case msg.Code == RandomnessMsg: + // Broadcast this to all peer + var randomness coreTypes.BlockRandomnessResult + if err := msg.Decode(&randomness); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &randomness + case msg.Code == DKGPrivateShareMsg: + // Do not relay this msg + var rps rlpDKGPrivateShare + if err := msg.Decode(&rps); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- fromRLPDKGPrivateShare(&rps) + case msg.Code == DKGPartialSignatureMsg: + // broadcast in DKG set + var psig coreTypes.DKGPartialSignature + if err := msg.Decode(&psig); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + pm.receiveCh <- &psig default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -741,6 +793,68 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) { } } +func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { + label := peerLabel{ + set: notaryset, + chainID: vote.Position.ChainID, + round: vote.Position.Round, + } + + for _, peer := range pm.peers.PeersWithoutVote(rlpHash(vote), label) { + peer.AsyncSendVote(vote) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { + hash := rlpHash(toRLPLatticeBlock(block)) + for _, peer := range pm.peers.PeersWithoutLatticeBlock(hash) { + peer.AsyncSendLatticeBlock(block) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) SendDKGPrivateShare( + pub crypto.PublicKey, privateShare *coreTypes.DKGPrivateShare) { + id := discover.MustBytesID(pub.Bytes()[1:]) + if p := pm.peers.Peer(id.String()); p != nil { + p.AsyncSendDKGPrivateShare(privateShare) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastDKGPrivateShare( + privateShare *coreTypes.DKGPrivateShare) { + for _, peer := range pm.peers.allPeers() { + peer.AsyncSendDKGPrivateShare(privateShare) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastAgreementResult( + agreement *coreTypes.AgreementResult) { + for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) { + peer.AsyncSendAgreement(agreement) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastRandomnessResult( + randomness *coreTypes.BlockRandomnessResult) { + // random pick n peers + for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { + peer.AsyncSendRandomness(randomness) + } +} + +// TODO(sonic): try to reduce traffic +func (pm *ProtocolManager) BroadcastDKGPartialSignature( + psig *coreTypes.DKGPartialSignature) { + for _, peer := range pm.peers.PeersWithoutDKGPartialSignature(rlpHash(psig)) { + peer.AsyncSendDKGPartialSignature(psig) + } +} + // Mined broadcast loop func (pm *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe @@ -781,6 +895,8 @@ func (pm *ProtocolManager) metaBroadcastLoop() { // a loop keep building and maintaining peers in notary set. // TODO: finish this func (pm *ProtocolManager) peerSetLoop() { + + log.Debug("start peer set loop") for { select { case event := <-pm.crsCh: |