aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-17 18:02:52 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-12 12:19:09 +0800
commit8dd647934c2836bd3a1b6a5ba197bfd4f886b720 (patch)
tree16230401ea176fcf66a16e9b19902a8762e76163 /dex/peer.go
parent71c6a58419e1b68b49b6c7d307acee305474d248 (diff)
downloaddexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.gz
dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.tar.zst
dexon-8dd647934c2836bd3a1b6a5ba197bfd4f886b720.zip
dex: polish network related function
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go177
1 files changed, 120 insertions, 57 deletions
diff --git a/dex/peer.go b/dex/peer.go
index 1279a190b..db68ea590 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -46,6 +46,13 @@ const (
maxKnownMetas = 32768 // Maximum metas hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ maxKnownLatticeBLocks = 2048
+ maxKnownVotes = 2048
+ maxKnownAgreements = 10240
+ maxKnownRandomnesses = 10240
+ maxKnownDKGPrivateShare = 1024 // this related to DKG Size
+ maxKnownDKGPartialSignature = 1024 // this related to DKG Size
+
// maxQueuedTxs is the maximum number of transaction lists to queue up before
// dropping broadcasts. This is a sensitive number as a transaction list might
// contain a single transaction, or thousands.
@@ -63,6 +70,13 @@ const (
// above some healthy uncle limit, so use that.
maxQueuedAnns = 4
+ maxQueuedLatticeBlocks = 16
+ maxQueuedVotes = 128
+ maxQueuedAgreements = 16
+ maxQueuedRandomnesses = 16
+ maxQueuedDKGPrivateShare = 16
+ maxQueuedDKGParitialSignature = 16
+
handshakeTimeout = 5 * time.Second
groupNodeNum = 3
@@ -107,43 +121,53 @@ type peer struct {
td *big.Int
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownMetas mapset.Set // Set of node metas known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- knownVotes mapset.Set
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
- queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- queuedLatticeBlock chan *coreTypes.Block
- queuedVote chan *coreTypes.Vote
- queuedAgreement chan *coreTypes.AgreementResult
- queuedRandomness chan *coreTypes.BlockRandomnessResult
- queuedDKGPrivateShare chan *coreTypes.DKGPrivateShare
- queuedDKGPartialSignature chan *coreTypes.DKGPartialSignature
- term chan struct{} // Termination channel to stop the broadcaster
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownMetas mapset.Set // Set of node metas known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ knownLatticeBlocks mapset.Set
+ knownVotes mapset.Set
+ knownAgreements mapset.Set
+ knownRandomnesses mapset.Set
+ knownDKGPrivateShares mapset.Set
+ knownDKGPartialSignatures mapset.Set
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedMetas chan []*NodeMeta // Queue of node metas to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ queuedLatticeBlocks chan *coreTypes.Block
+ queuedVotes chan *coreTypes.Vote
+ queuedAgreements chan *coreTypes.AgreementResult
+ queuedRandomnesses chan *coreTypes.BlockRandomnessResult
+ queuedDKGPrivateShares chan *coreTypes.DKGPrivateShare
+ queuedDKGPartialSignatures chan *coreTypes.DKGPartialSignature
+ term chan struct{} // Termination channel to stop the broadcaster
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- id: p.ID().String(),
- knownTxs: mapset.NewSet(),
- knownMetas: mapset.NewSet(),
- knownBlocks: mapset.NewSet(),
- knownVotes: mapset.NewSet(),
- queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
- queuedProps: make(chan *propEvent, maxQueuedProps),
- queuedAnns: make(chan *types.Block, maxQueuedAnns),
- queuedLatticeBlock: make(chan *coreTypes.Block, 16),
- queuedVote: make(chan *coreTypes.Vote, 16),
- queuedAgreement: make(chan *coreTypes.AgreementResult, 16),
- queuedRandomness: make(chan *coreTypes.BlockRandomnessResult, 16),
- queuedDKGPrivateShare: make(chan *coreTypes.DKGPrivateShare, 16),
- queuedDKGPartialSignature: make(chan *coreTypes.DKGPartialSignature, 16),
+ Peer: p,
+ rw: rw,
+ version: version,
+ id: p.ID().String(),
+ knownTxs: mapset.NewSet(),
+ knownMetas: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ knownLatticeBlocks: mapset.NewSet(),
+ knownVotes: mapset.NewSet(),
+ knownAgreements: mapset.NewSet(),
+ knownRandomnesses: mapset.NewSet(),
+ knownDKGPrivateShares: mapset.NewSet(),
+ knownDKGPartialSignatures: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedMetas: make(chan []*NodeMeta, maxQueuedMetas),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ queuedLatticeBlocks: make(chan *coreTypes.Block, maxQueuedLatticeBlocks),
+ queuedVotes: make(chan *coreTypes.Vote, maxQueuedVotes),
+ queuedAgreements: make(chan *coreTypes.AgreementResult, maxQueuedAgreements),
+ queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
+ queuedDKGPrivateShares: make(chan *coreTypes.DKGPrivateShare, maxQueuedDKGPrivateShare),
+ queuedDKGPartialSignatures: make(chan *coreTypes.DKGPartialSignature, maxQueuedDKGParitialSignature),
term: make(chan struct{}),
}
}
@@ -177,32 +201,32 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
- case block := <-p.queuedLatticeBlock:
+ case block := <-p.queuedLatticeBlocks:
if err := p.SendLatticeBlock(block); err != nil {
return
}
p.Log().Trace("Broadcast lattice block")
- case vote := <-p.queuedVote:
+ case vote := <-p.queuedVotes:
if err := p.SendVote(vote); err != nil {
return
}
p.Log().Trace("Broadcast vote", "vote", vote.String(), "hash", rlpHash(vote))
- case agreement := <-p.queuedAgreement:
+ case agreement := <-p.queuedAgreements:
if err := p.SendAgreement(agreement); err != nil {
return
}
p.Log().Trace("Broadcast agreement")
- case randomness := <-p.queuedRandomness:
+ case randomness := <-p.queuedRandomnesses:
if err := p.SendRandomness(randomness); err != nil {
return
}
p.Log().Trace("Broadcast randomness")
- case privateShare := <-p.queuedDKGPrivateShare:
+ case privateShare := <-p.queuedDKGPrivateShares:
if err := p.SendDKGPrivateShare(privateShare); err != nil {
return
}
p.Log().Trace("Broadcast DKG private share")
- case psig := <-p.queuedDKGPartialSignature:
+ case psig := <-p.queuedDKGPartialSignatures:
if err := p.SendDKGPartialSignature(psig); err != nil {
return
}
@@ -364,72 +388,88 @@ func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
}
func (p *peer) SendLatticeBlock(block *coreTypes.Block) error {
+ r := toRLPLatticeBlock(block)
+ p.knownLatticeBlocks.Add(rlpHash(r))
return p2p.Send(p.rw, LatticeBlockMsg, toRLPLatticeBlock(block))
}
func (p *peer) AsyncSendLatticeBlock(block *coreTypes.Block) {
select {
- case p.queuedLatticeBlock <- block:
+ case p.queuedLatticeBlocks <- block:
+ r := toRLPLatticeBlock(block)
+ p.knownLatticeBlocks.Add(rlpHash(r))
default:
p.Log().Debug("Dropping lattice block propagation")
}
}
func (p *peer) SendVote(vote *coreTypes.Vote) error {
+ p.knownVotes.Add(rlpHash(vote))
return p2p.Send(p.rw, VoteMsg, vote)
}
func (p *peer) AsyncSendVote(vote *coreTypes.Vote) {
select {
- case p.queuedVote <- vote:
+ case p.queuedVotes <- vote:
+ p.knownVotes.Add(rlpHash(vote))
default:
p.Log().Debug("Dropping vote propagation")
}
}
func (p *peer) SendAgreement(agreement *coreTypes.AgreementResult) error {
+ p.knownAgreements.Add(rlpHash(agreement))
return p2p.Send(p.rw, AgreementMsg, agreement)
}
func (p *peer) AsyncSendAgreement(agreement *coreTypes.AgreementResult) {
select {
- case p.queuedAgreement <- agreement:
+ case p.queuedAgreements <- agreement:
+ p.knownAgreements.Add(rlpHash(agreement))
default:
p.Log().Debug("Dropping agreement result")
}
}
func (p *peer) SendRandomness(randomness *coreTypes.BlockRandomnessResult) error {
+ p.knownRandomnesses.Add(rlpHash(randomness))
return p2p.Send(p.rw, RandomnessMsg, randomness)
}
func (p *peer) AsyncSendRandomness(randomness *coreTypes.BlockRandomnessResult) {
select {
- case p.queuedRandomness <- randomness:
+ case p.queuedRandomnesses <- randomness:
+ p.knownRandomnesses.Add(rlpHash(randomness))
default:
p.Log().Debug("Dropping randomness result")
}
}
func (p *peer) SendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) error {
+ r := toRLPDKGPrivateShare(privateShare)
+ p.knownDKGPrivateShares.Add(rlpHash(r))
return p2p.Send(p.rw, DKGPrivateShareMsg, toRLPDKGPrivateShare(privateShare))
}
func (p *peer) AsyncSendDKGPrivateShare(privateShare *coreTypes.DKGPrivateShare) {
select {
- case p.queuedDKGPrivateShare <- privateShare:
+ case p.queuedDKGPrivateShares <- privateShare:
+ r := toRLPDKGPrivateShare(privateShare)
+ p.knownDKGPrivateShares.Add(rlpHash(r))
default:
p.Log().Debug("Dropping DKG private share")
}
}
func (p *peer) SendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) error {
+ p.knownDKGPartialSignatures.Add(rlpHash(psig))
return p2p.Send(p.rw, DKGPartialSignatureMsg, psig)
}
func (p *peer) AsyncSendDKGPartialSignature(psig *coreTypes.DKGPartialSignature) {
select {
- case p.queuedDKGPartialSignature <- psig:
+ case p.queuedDKGPartialSignatures <- psig:
+ p.knownDKGPartialSignatures.Add(rlpHash(psig))
default:
p.Log().Debug("Dropping DKG partial signature")
}
@@ -727,27 +767,50 @@ func (ps *peerSet) PeersWithoutNodeMeta(hash common.Hash) []*peer {
return list
}
-// TODO(sonic): finish the following dummy function.
+func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownLatticeBlocks.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
func (ps *peerSet) PeersWithoutAgreement(hash common.Hash) []*peer {
- return ps.allPeers()
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownAgreements.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
}
func (ps *peerSet) PeersWithoutRandomness(hash common.Hash) []*peer {
- return ps.allPeers()
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownRandomnesses.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
}
func (ps *peerSet) PeersWithoutDKGPartialSignature(hash common.Hash) []*peer {
- return ps.allPeers()
-}
-
-func (ps *peerSet) PeersWithoutLatticeBlock(hash common.Hash) []*peer {
- return ps.allPeers()
-}
-
-func (ps *peerSet) allPeers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
- list = append(list, p)
+ if !p.knownDKGPartialSignatures.Contains(hash) {
+ list = append(list, p)
+ }
}
return list
}