diff options
Diffstat (limited to 'dex/handler.go')
-rw-r--r-- | dex/handler.go | 84 |
1 files changed, 38 insertions, 46 deletions
diff --git a/dex/handler.go b/dex/handler.go index 71962b865..f56c6f5dc 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -40,7 +40,6 @@ import ( "fmt" "math" "math/rand" - "net" "sync" "sync/atomic" "time" @@ -794,24 +793,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if !pm.isBlockProposer { break } - var block coreTypes.Block - if err := msg.Decode(&block); err != nil { + var blocks []*coreTypes.Block + if err := msg.Decode(&blocks); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - pm.cache.addBlock(&block) - pm.receiveCh <- &block + for _, block := range blocks { + pm.cache.addBlock(block) + pm.receiveCh <- block + } case msg.Code == VoteMsg: if !pm.isBlockProposer { break } - var vote coreTypes.Vote - if err := msg.Decode(&vote); err != nil { + var votes []*coreTypes.Vote + if err := msg.Decode(&votes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - if vote.Type >= coreTypes.VotePreCom { - pm.cache.addVote(&vote) + for _, vote := range votes { + if vote.Type >= coreTypes.VotePreCom { + pm.cache.addVote(vote) + } + pm.receiveCh <- vote } - pm.receiveCh <- &vote case msg.Code == AgreementMsg: if !pm.isBlockProposer { break @@ -821,17 +824,21 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&agreement); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + p.MarkAgreement(rlpHash(agreement)) pm.receiveCh <- &agreement case msg.Code == RandomnessMsg: if !pm.isBlockProposer { break } // Broadcast this to all peer - var randomness coreTypes.BlockRandomnessResult - if err := msg.Decode(&randomness); err != nil { + var randomnesses []*coreTypes.BlockRandomnessResult + if err := msg.Decode(&randomnesses); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - pm.receiveCh <- &randomness + for _, randomness := range randomnesses { + p.MarkRandomness(rlpHash(randomness)) + pm.receiveCh <- randomness + } case msg.Code == DKGPrivateShareMsg: if !pm.isBlockProposer { break @@ -841,6 +848,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&ps); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + p.MarkDKGPrivateShares(rlpHash(ps)) pm.receiveCh <- &ps case msg.Code == DKGPartialSignatureMsg: if !pm.isBlockProposer { @@ -862,11 +870,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } blocks := pm.cache.blocks(hashes) log.Debug("Push blocks", "blocks", blocks) - for _, block := range blocks { - if err := p.SendLatticeBlock(block); err != nil { - return err - } - } + return p.SendLatticeBlocks(blocks) case msg.Code == PullVotesMsg: if !pm.isBlockProposer { break @@ -885,11 +889,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } votes := pm.cache.votes(pos) log.Debug("Push votes", "votes", votes) - for _, vote := range votes { - if err := p.SendVote(vote); err != nil { - return err - } - } + return p.SendVotes(votes) case msg.Code == PullRandomnessMsg: if !pm.isBlockProposer { break @@ -898,13 +898,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&hashes); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - randomness := pm.cache.randomness(hashes) - log.Debug("Push randomness", "randomness", randomness) - for _, randomness := range randomness { - if err := p.SendRandomness(randomness); err != nil { - return err - } - } + randomnesses := pm.cache.randomness(hashes) + log.Debug("Push randomness", "randomness", randomnesses) + return p.SendRandomnesses(randomnesses) case msg.Code == GetGovStateMsg: var hash common.Hash if err := msg.Decode(&hash); err != nil { @@ -995,12 +991,11 @@ func (pm *ProtocolManager) BroadcastRecords(records []*enr.Record) { } } -// TODO(sonic): block size is big, try not to send to all peers -// to reduce traffic +// BroadcastLatticeBlock broadcasts the lattice block to all its peers. func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) { pm.cache.addBlock(block) - for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) { - peer.AsyncSendLatticeBlock(block) + for _, peer := range pm.peers.Peers() { + peer.AsyncSendLatticeBlocks([]*coreTypes.Block{block}) } } @@ -1014,11 +1009,8 @@ func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) { chainID: vote.Position.ChainID, round: vote.Position.Round, } - h := rlpHash(vote) for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownVotes.Contains(h) { - peer.AsyncSendVote(vote) - } + peer.AsyncSendVotes([]*coreTypes.Vote{vote}) } } @@ -1050,15 +1042,16 @@ func (pm *ProtocolManager) BroadcastRandomnessResult( chainID: randomness.Position.ChainID, round: randomness.Position.Round, } + randomnesses := []*coreTypes.BlockRandomnessResult{randomness} for _, peer := range pm.peers.PeersWithLabel(label) { if !peer.knownRandomnesses.Contains(rlpHash(randomness)) { - peer.AsyncSendRandomness(randomness) + peer.AsyncSendRandomnesses(randomnesses) } } // TODO(sonic): send to some of other nodes (gossip) for _, peer := range pm.peers.PeersWithoutRandomness(rlpHash(randomness)) { - peer.AsyncSendRandomness(randomness) + peer.AsyncSendRandomnesses(randomnesses) } } @@ -1069,12 +1062,13 @@ func (pm *ProtocolManager) SendDKGPrivateShare( if err != nil { panic(err) } - n := enode.NewV4(pk, net.IP{}, 0, 0) - if p := pm.peers.Peer(n.ID().String()); p != nil { + id := enode.PubkeyToIDV4(pk) + + if p := pm.peers.Peer(id.String()); p != nil { p.AsyncSendDKGPrivateShare(privateShare) } else { - log.Error("Failed to send DKG private share", "publicKey", n.ID().String()) + log.Error("Failed to send DKG private share", "publicKey", id.String()) } } @@ -1092,9 +1086,7 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature( psig *dkgTypes.PartialSignature) { label := peerLabel{set: dkgset, round: psig.Round} for _, peer := range pm.peers.PeersWithLabel(label) { - if !peer.knownDKGPartialSignatures.Contains(rlpHash(psig)) { - peer.AsyncSendDKGPartialSignature(psig) - } + peer.AsyncSendDKGPartialSignature(psig) } } |