aboutsummaryrefslogtreecommitdiffstats
path: root/dex/peer.go
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-11-02 10:40:58 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:52 +0800
commit119eb5889104d870ed608dcd9ccc7581489054b0 (patch)
treea340373d962d9bad8ecc8c14783857e7ad77e45c /dex/peer.go
parent2f936e7546afb84d7397e3dac6b577aea27220c6 (diff)
downloaddexon-119eb5889104d870ed608dcd9ccc7581489054b0.tar.gz
dexon-119eb5889104d870ed608dcd9ccc7581489054b0.tar.zst
dexon-119eb5889104d870ed608dcd9ccc7581489054b0.zip
dex: implement PullBlocks/PullVotes (#1)
Diffstat (limited to 'dex/peer.go')
-rw-r--r--dex/peer.go55
1 files changed, 54 insertions, 1 deletions
diff --git a/dex/peer.go b/dex/peer.go
index c005cec16..2fe8cac08 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -24,6 +24,7 @@ import (
"time"
mapset "github.com/deckarep/golang-set"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg"
@@ -77,6 +78,8 @@ const (
maxQueuedRandomnesses = 16
maxQueuedDKGPrivateShare = 16
maxQueuedDKGParitialSignature = 16
+ maxQueuedPullBlocks = 128
+ maxQueuedPullVotes = 128
handshakeTimeout = 5 * time.Second
@@ -141,6 +144,8 @@ type peer struct {
queuedRandomnesses chan *coreTypes.BlockRandomnessResult
queuedDKGPrivateShares chan *dkgTypes.PrivateShare
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
+ queuedPullBlocks chan coreCommon.Hashes
+ queuedPullVotes chan coreTypes.Position
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -169,7 +174,9 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare),
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
- term: make(chan struct{}),
+ queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
+ queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes),
+ term: make(chan struct{}),
}
}
@@ -232,6 +239,16 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Broadcast DKG partial signature")
+ case hashes := <-p.queuedPullBlocks:
+ if err := p.SendPullBlocks(hashes); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Blocks", "hashes", hashes)
+ case pos := <-p.queuedPullVotes:
+ if err := p.SendPullVotes(pos); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Votes", "position", pos)
case <-p.term:
return
}
@@ -472,6 +489,30 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
}
}
+func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error {
+ return p2p.Send(p.rw, PullBlocksMsg, hashes)
+}
+
+func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
+ select {
+ case p.queuedPullBlocks <- hashes:
+ default:
+ p.Log().Debug("Dropping Pull Blocks")
+ }
+}
+
+func (p *peer) SendPullVotes(pos coreTypes.Position) error {
+ return p2p.Send(p.rw, PullVotesMsg, pos)
+}
+
+func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
+ select {
+ case p.queuedPullVotes <- pos:
+ default:
+ p.Log().Debug("Dropping Pull Votes")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
@@ -693,6 +734,18 @@ func (ps *peerSet) Len() int {
return len(ps.peers)
}
+// Peers retrieves all of the peers.
+func (ps *peerSet) Peers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
+ }
+ return list
+}
+
// PeersWithoutBlock retrieves a list of peers that do not have a given block in
// their set of known hashes.
func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {