aboutsummaryrefslogtreecommitdiffstats
path: root/dex/handler.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-27 20:47:32 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:59 +0800
commit0c63646ca8b06bb527737cd6e2a7fe58f169efff (patch)
treeb0666613c2a3cb84d53b60597bfef5ec45548c3a /dex/handler.go
parent91981becf98b988470810aa1c26d86de2d294e29 (diff)
downloaddexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.gz
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.tar.zst
dexon-0c63646ca8b06bb527737cd6e2a7fe58f169efff.zip
core: merge notarySet and DKGSet (#265)
* vendor: sync to latest core * core: merge notarySet and dkgSet * dex: optimize network traffic for finalized block
Diffstat (limited to 'dex/handler.go')
-rw-r--r--dex/handler.go120
1 files changed, 58 insertions, 62 deletions
diff --git a/dex/handler.go b/dex/handler.go
index 20df41709..8971ad500 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -87,6 +87,9 @@ const (
maxPullVotePeers = 1
pullVoteRateLimit = 10 * time.Second
+
+ maxAgreementResultBroadcast = 3
+ maxFinalizedBlockBroadcast = 3
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -888,19 +891,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
- case msg.Code == RandomnessMsg:
- if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
- break
- }
- // Broadcast this to all peer
- var randomnesses []*coreTypes.BlockRandomnessResult
- if err := msg.Decode(&randomnesses); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- for _, randomness := range randomnesses {
- p.MarkRandomness(rlpHash(randomness))
- pm.receiveCh <- randomness
- }
case msg.Code == DKGPrivateShareMsg:
if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
break
@@ -949,20 +939,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&pos); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ if block := pm.cache.finalizedBlock(pos); block != nil {
+ log.Debug("Push finalized block as votes", "block", block)
+ return p.SendCoreBlocks([]*coreTypes.Block{block})
+ }
votes := pm.cache.votes(pos)
log.Debug("Push votes", "votes", votes)
return p.SendVotes(votes)
- case msg.Code == PullRandomnessMsg:
- if atomic.LoadInt32(&pm.receiveCoreMessage) == 0 {
- break
- }
- var hashes coreCommon.Hashes
- if err := msg.Decode(&hashes); err != nil {
- return errResp(ErrDecode, "msg %v: %v", msg, err)
- }
- randomnesses := pm.cache.randomness(hashes)
- log.Debug("Push randomness", "randomness", randomnesses)
- return p.SendRandomnesses(randomnesses)
case msg.Code == GetGovStateMsg:
var hash common.Hash
if err := msg.Decode(&hash); err != nil {
@@ -1098,6 +1081,31 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) {
}
}
+// BroadcastFinalizedBlock broadcasts the finalized core block to some of its peers.
+func (pm *ProtocolManager) BroadcastFinalizedBlock(block *coreTypes.Block) {
+ if len(block.Finalization.Randomness) == 0 {
+ log.Warn("Ignore broadcast finalized block without randomness", "block", block)
+ return
+ }
+ pm.cache.addFinalizedBlock(block)
+
+ // send to notary nodes first (direct)
+ label := peerLabel{
+ set: notaryset,
+ round: block.Position.Round,
+ }
+ peers := pm.peers.PeersWithLabel(label)
+ count := maxFinalizedBlockBroadcast
+ for _, peer := range peers {
+ if count <= 0 {
+ break
+ } else {
+ count--
+ peer.AsyncSendCoreBlocks([]*coreTypes.Block{block})
+ }
+ }
+}
+
// BroadcastCoreBlock broadcasts the core block to all its peers.
func (pm *ProtocolManager) BroadcastCoreBlock(block *coreTypes.Block) {
pm.cache.addBlock(block)
@@ -1122,39 +1130,32 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
func (pm *ProtocolManager) BroadcastAgreementResult(
agreement *coreTypes.AgreementResult) {
- // send to dkg nodes first (direct)
- label := peerLabel{
- set: dkgset,
- round: agreement.Position.Round,
- }
- for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownAgreements.Contains(rlpHash(agreement)) {
- peer.AsyncSendAgreement(agreement)
- }
+ block := pm.cache.blocks(coreCommon.Hashes{agreement.BlockHash})
+ if len(block) != 0 {
+ block[0].Finalization.Height = agreement.FinalizationHeight
+ block[0].Finalization.Randomness = agreement.Randomness
+ pm.cache.addFinalizedBlock(block[0])
}
- for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
- peer.AsyncSendAgreement(agreement)
- }
-}
-
-func (pm *ProtocolManager) BroadcastRandomnessResult(
- randomness *coreTypes.BlockRandomnessResult) {
- pm.cache.addRandomness(randomness)
// send to notary nodes first (direct)
label := peerLabel{
set: notaryset,
- round: randomness.Position.Round,
+ round: agreement.Position.Round,
}
- randomnesses := []*coreTypes.BlockRandomnessResult{randomness}
- for _, peer := range pm.peers.PeersWithLabel(label) {
- if !peer.knownRandomnesses.Contains(rlpHash(randomness)) {
- peer.AsyncSendRandomnesses(randomnesses)
+ peers := pm.peers.PeersWithLabel(label)
+ count := maxAgreementResultBroadcast
+ agrHash := rlpHash(agreement)
+ for _, peer := range peers {
+ if count <= 0 {
+ peer.MarkAgreement(agrHash)
+ } else if !peer.knownAgreements.Contains(agrHash) {
+ count--
+ peer.AsyncSendAgreement(agreement)
}
}
- for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) {
- peer.AsyncSendRandomnesses(randomnesses)
+ for _, peer := range pm.peers.PeersWithoutAgreement(rlpHash(agreement)) {
+ peer.AsyncSendAgreement(agreement)
}
}
@@ -1177,7 +1178,7 @@ func (pm *ProtocolManager) SendDKGPrivateShare(
func (pm *ProtocolManager) BroadcastDKGPrivateShare(
privateShare *dkgTypes.PrivateShare) {
- label := peerLabel{set: dkgset, round: privateShare.Round}
+ label := peerLabel{set: notaryset, round: privateShare.Round}
for _, peer := range pm.peers.PeersWithLabel(label) {
if !peer.knownDKGPrivateShares.Contains(rlpHash(privateShare)) {
peer.AsyncSendDKGPrivateShare(privateShare)
@@ -1187,7 +1188,7 @@ func (pm *ProtocolManager) BroadcastDKGPrivateShare(
func (pm *ProtocolManager) BroadcastDKGPartialSignature(
psig *dkgTypes.PartialSignature) {
- label := peerLabel{set: dkgset, round: psig.Round}
+ label := peerLabel{set: notaryset, round: psig.Round}
for _, peer := range pm.peers.PeersWithLabel(label) {
peer.AsyncSendDKGPartialSignature(psig)
}
@@ -1218,17 +1219,6 @@ func (pm *ProtocolManager) BroadcastPullVotes(
}
}
-func (pm *ProtocolManager) BroadcastPullRandomness(
- hashes coreCommon.Hashes) {
- // TODO(jimmy-dexon): pull from dkg set only.
- for idx, peer := range pm.peers.Peers() {
- if idx >= maxPullPeers {
- break
- }
- peer.AsyncSendPullRandomness(hashes)
- }
-}
-
func (pm *ProtocolManager) txBroadcastLoop() {
queueSizeMax := common.StorageSize(100 * 1024) // 100 KB
currentSize := common.StorageSize(0)
@@ -1321,9 +1311,15 @@ func (pm *ProtocolManager) peerSetLoop() {
for i := round; i <= dexCore.DKGDelayRound; i++ {
pm.peers.BuildConnection(i)
}
+ round = dexCore.DKGDelayRound
} else {
pm.peers.BuildConnection(round)
}
+ CRSRound := pm.gov.CRSRound()
+ if CRSRound > round {
+ pm.peers.BuildConnection(CRSRound)
+ round = CRSRound
+ }
for {
select {
@@ -1340,7 +1336,7 @@ func (pm *ProtocolManager) peerSetLoop() {
}
log.Debug("ProtocolManager: new round", "round", newRound)
- if newRound == round {
+ if newRound <= round {
break
}