aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJimmy Hu <jimmy.hu@dexon.org>2018-11-02 10:40:58 +0800
committerWei-Ning Huang <w@byzantine-lab.io>2019-06-12 17:27:17 +0800
commitace78f3a2804f2f4953f3ad9b945a4dbea221ac7 (patch)
treeb796257f29d4a38c0c71b41e748e3c00641e8b19
parent05cdd58111500b1b40caa9feffbea6c58f277a48 (diff)
downloadgo-tangerine-ace78f3a2804f2f4953f3ad9b945a4dbea221ac7.tar.gz
go-tangerine-ace78f3a2804f2f4953f3ad9b945a4dbea221ac7.tar.zst
go-tangerine-ace78f3a2804f2f4953f3ad9b945a4dbea221ac7.zip
dex: implement PullBlocks/PullVotes (#1)
-rw-r--r--dex/cache.go115
-rw-r--r--dex/cache_test.go144
-rw-r--r--dex/handler.go80
-rw-r--r--dex/network.go2
-rw-r--r--dex/peer.go55
-rw-r--r--dex/protocol.go21
6 files changed, 415 insertions, 2 deletions
diff --git a/dex/cache.go b/dex/cache.go
new file mode 100644
index 000000000..f373a642e
--- /dev/null
+++ b/dex/cache.go
@@ -0,0 +1,115 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package dex
+
+import (
+ "sync"
+
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+type voteKey struct {
+ ProposerID coreTypes.NodeID
+ Type coreTypes.VoteType
+ BlockHash coreCommon.Hash
+ Period uint64
+ Position coreTypes.Position
+}
+
+func voteToKey(vote *coreTypes.Vote) voteKey {
+ return voteKey{
+ ProposerID: vote.ProposerID,
+ Type: vote.Type,
+ BlockHash: vote.BlockHash,
+ Period: vote.Period,
+ Position: vote.Position,
+ }
+}
+
+type cache struct {
+ lock sync.RWMutex
+ blockCache map[coreCommon.Hash]*coreTypes.Block
+ voteCache map[coreTypes.Position]map[voteKey]*coreTypes.Vote
+ votePosition []coreTypes.Position
+ voteSize int
+ size int
+}
+
+func newCache(size int) *cache {
+ return &cache{
+ blockCache: make(map[coreCommon.Hash]*coreTypes.Block),
+ voteCache: make(map[coreTypes.Position]map[voteKey]*coreTypes.Vote),
+ size: size,
+ }
+}
+
+func (c *cache) addVote(vote *coreTypes.Vote) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if c.voteSize >= c.size {
+ pos := c.votePosition[0]
+ c.voteSize -= len(c.voteCache[pos])
+ delete(c.voteCache, pos)
+ }
+ if _, exist := c.voteCache[vote.Position]; !exist {
+ c.votePosition = append(c.votePosition, vote.Position)
+ c.voteCache[vote.Position] = make(map[voteKey]*coreTypes.Vote)
+ }
+ key := voteToKey(vote)
+ if _, exist := c.voteCache[vote.Position][key]; exist {
+ return
+ }
+ c.voteCache[vote.Position][key] = vote
+ c.voteSize++
+}
+
+func (c *cache) votes(pos coreTypes.Position) []*coreTypes.Vote {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ votes := make([]*coreTypes.Vote, 0, len(c.voteCache[pos]))
+ for _, vote := range c.voteCache[pos] {
+ votes = append(votes, vote)
+ }
+ return votes
+}
+
+func (c *cache) addBlock(block *coreTypes.Block) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if len(c.blockCache) >= c.size {
+ // Randomly delete one entry.
+ for k := range c.blockCache {
+ delete(c.blockCache, k)
+ break
+ }
+ }
+ c.blockCache[block.Hash] = block
+}
+
+func (c *cache) blocks(hashes coreCommon.Hashes) []*coreTypes.Block {
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ cacheBlocks := make([]*coreTypes.Block, 0, len(hashes))
+ for _, hash := range hashes {
+ if block, exist := c.blockCache[hash]; exist {
+ cacheBlocks = append(cacheBlocks, block)
+ }
+ }
+ return cacheBlocks
+}
diff --git a/dex/cache_test.go b/dex/cache_test.go
new file mode 100644
index 000000000..96e07aa17
--- /dev/null
+++ b/dex/cache_test.go
@@ -0,0 +1,144 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package dex
+
+import (
+ "testing"
+
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
+ coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
+)
+
+func TestCacheVote(t *testing.T) {
+ cache := newCache(3)
+ pos0 := coreTypes.Position{
+ Height: uint64(0),
+ }
+ pos1 := coreTypes.Position{
+ Height: uint64(1),
+ }
+ vote1 := &coreTypes.Vote{
+ BlockHash: coreCommon.NewRandomHash(),
+ Position: pos0,
+ }
+ vote2 := &coreTypes.Vote{
+ BlockHash: coreCommon.NewRandomHash(),
+ Position: pos0,
+ }
+ vote3 := &coreTypes.Vote{
+ BlockHash: coreCommon.NewRandomHash(),
+ Position: pos1,
+ }
+ vote4 := &coreTypes.Vote{
+ BlockHash: coreCommon.NewRandomHash(),
+ Position: pos1,
+ }
+ cache.addVote(vote1)
+ cache.addVote(vote2)
+ cache.addVote(vote3)
+
+ votes := cache.votes(pos0)
+ if len(votes) != 2 {
+ t.Errorf("fail to get votes: have %d, want 2", len(votes))
+ }
+ if !votes[0].BlockHash.Equal(vote1.BlockHash) {
+ t.Errorf("get wrong vote: have %s, want %s", votes[0], vote1)
+ }
+ if !votes[1].BlockHash.Equal(vote2.BlockHash) {
+ t.Errorf("get wrong vote: have %s, want %s", votes[1], vote2)
+ }
+ votes = cache.votes(pos1)
+ if len(votes) != 1 {
+ t.Errorf("fail to get votes: have %d, want 1", len(votes))
+ }
+ if !votes[0].BlockHash.Equal(vote3.BlockHash) {
+ t.Errorf("get wrong vote: have %s, want %s", votes[0], vote3)
+ }
+
+ cache.addVote(vote4)
+
+ votes = cache.votes(pos0)
+ if len(votes) != 0 {
+ t.Errorf("fail to get votes: have %d, want 0", len(votes))
+ }
+ votes = cache.votes(pos1)
+ if len(votes) != 2 {
+ t.Errorf("fail to get votes: have %d, want 1", len(votes))
+ }
+ if !votes[0].BlockHash.Equal(vote3.BlockHash) {
+ t.Errorf("get wrong vote: have %s, want %s", votes[0], vote3)
+ }
+ if !votes[1].BlockHash.Equal(vote4.BlockHash) {
+ t.Errorf("get wrong vote: have %s, want %s", votes[1], vote4)
+ }
+}
+
+func TestCacheBlock(t *testing.T) {
+ cache := newCache(3)
+ block1 := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ }
+ block2 := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ }
+ block3 := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ }
+ block4 := &coreTypes.Block{
+ Hash: coreCommon.NewRandomHash(),
+ }
+ cache.addBlock(block1)
+ cache.addBlock(block2)
+ cache.addBlock(block3)
+
+ hashes := coreCommon.Hashes{block1.Hash, block2.Hash, block3.Hash, block4.Hash}
+ hashMap := map[coreCommon.Hash]struct{}{
+ block1.Hash: struct{}{},
+ block2.Hash: struct{}{},
+ block3.Hash: struct{}{},
+ }
+ blocks := cache.blocks(hashes)
+ if len(blocks) != 3 {
+ t.Errorf("fail to get blocks: have %d, want 3", len(blocks))
+ }
+ for _, block := range blocks {
+ if _, exist := hashMap[block.Hash]; !exist {
+ t.Errorf("get wrong block: have %s, want %v", block, hashMap)
+ }
+ }
+
+ cache.addBlock(block4)
+
+ blocks = cache.blocks(hashes)
+ hashMap[block4.Hash] = struct{}{}
+ if len(blocks) != 3 {
+ t.Errorf("fail to get blocks: have %d, want 3", len(blocks))
+ }
+ hasNewBlock := false
+ for _, block := range blocks {
+ if _, exist := hashMap[block.Hash]; !exist {
+ t.Errorf("get wrong block: have %s, want %v", block, hashMap)
+ }
+ if block.Hash.Equal(block4.Hash) {
+ hasNewBlock = true
+ }
+ }
+ if !hasNewBlock {
+ t.Errorf("expect block %s in cache, have %v", block4, blocks)
+ }
+}
diff --git a/dex/handler.go b/dex/handler.go
index be50cb43d..5662e2b84 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
@@ -26,6 +43,7 @@ import (
"sync/atomic"
"time"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
coreCrypto "github.com/dexon-foundation/dexon-consensus-core/core/crypto"
coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg"
@@ -55,6 +73,8 @@ const (
txChanSize = 4096
metaChanSize = 10240
+
+ maxPullPeers = 3
)
// errIncompatibleConfig is returned if the requested protocols and configs are
@@ -76,6 +96,7 @@ type ProtocolManager struct {
gov governance
blockchain *core.BlockChain
chainconfig *params.ChainConfig
+ cache *cache
maxPeers int
downloader *downloader.Downloader
@@ -128,6 +149,7 @@ func NewProtocolManager(
nodeTable: tab,
gov: gov,
blockchain: blockchain,
+ cache: newCache(128),
chainconfig: config,
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
@@ -714,12 +736,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&block); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ pm.cache.addBlock(&block)
pm.receiveCh <- &block
case msg.Code == VoteMsg:
var vote coreTypes.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
+ if vote.Type >= coreTypes.VotePreCom {
+ pm.cache.addVote(&vote)
+ }
pm.receiveCh <- &vote
case msg.Code == AgreementMsg:
// DKG set is receiver
@@ -749,6 +775,30 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
pm.receiveCh <- &psig
+ case msg.Code == PullBlocksMsg:
+ var hashes coreCommon.Hashes
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ blocks := pm.cache.blocks(hashes)
+ log.Debug("Push blocks", "blocks", blocks)
+ for _, block := range blocks {
+ if err := p.SendLatticeBlock(block); err != nil {
+ return err
+ }
+ }
+ case msg.Code == PullVotesMsg:
+ var pos coreTypes.Position
+ if err := msg.Decode(&pos); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ votes := pm.cache.votes(pos)
+ log.Debug("Push votes", "votes", votes)
+ for _, vote := range votes {
+ if err := p.SendVote(vote); err != nil {
+ return err
+ }
+ }
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
@@ -827,6 +877,7 @@ func (pm *ProtocolManager) BroadcastMetas(metas []*NodeMeta) {
// TODO(sonic): block size is big, try not to send to all peers
// to reduce traffic
func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
+ pm.cache.addBlock(block)
for _, peer := range pm.peers.PeersWithoutLatticeBlock(rlpHash(block)) {
peer.AsyncSendLatticeBlock(block)
}
@@ -834,6 +885,9 @@ func (pm *ProtocolManager) BroadcastLatticeBlock(block *coreTypes.Block) {
// BroadcastVote broadcasts the given vote to all peers in same notary set
func (pm *ProtocolManager) BroadcastVote(vote *coreTypes.Vote) {
+ if vote.Type >= coreTypes.VotePreCom {
+ pm.cache.addVote(vote)
+ }
label := peerLabel{
set: notaryset,
chainID: vote.Position.ChainID,
@@ -916,6 +970,32 @@ func (pm *ProtocolManager) BroadcastDKGPartialSignature(
}
}
+func (pm *ProtocolManager) BroadcastPullBlocks(
+ hashes coreCommon.Hashes) {
+ // TODO(jimmy-dexon): pull from notary set only.
+ for idx, peer := range pm.peers.Peers() {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullBlocks(hashes)
+ }
+}
+
+func (pm *ProtocolManager) BroadcastPullVotes(
+ pos coreTypes.Position) {
+ label := peerLabel{
+ set: notaryset,
+ chainID: pos.ChainID,
+ round: pos.Round,
+ }
+ for idx, peer := range pm.peers.PeersWithLabel(label) {
+ if idx >= maxPullPeers {
+ break
+ }
+ peer.AsyncSendPullVotes(pos)
+ }
+}
+
// Mined broadcast loop
func (pm *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
diff --git a/dex/network.go b/dex/network.go
index 58d6fd855..4f5c5ac5c 100644
--- a/dex/network.go
+++ b/dex/network.go
@@ -34,10 +34,12 @@ func NewDexconNetwork(pm *ProtocolManager) *DexconNetwork {
// PullBlocks tries to pull blocks from the DEXON network.
func (n *DexconNetwork) PullBlocks(hashes coreCommon.Hashes) {
+ n.pm.BroadcastPullBlocks(hashes)
}
// PullVotes tries to pull votes from the DEXON network.
func (n *DexconNetwork) PullVotes(pos types.Position) {
+ n.pm.BroadcastPullVotes(pos)
}
// BroadcastVote broadcasts vote to all nodes in DEXON network.
diff --git a/dex/peer.go b/dex/peer.go
index c005cec16..2fe8cac08 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -24,6 +24,7 @@ import (
"time"
mapset "github.com/deckarep/golang-set"
+ coreCommon "github.com/dexon-foundation/dexon-consensus-core/common"
coreTypes "github.com/dexon-foundation/dexon-consensus-core/core/types"
dkgTypes "github.com/dexon-foundation/dexon-consensus-core/core/types/dkg"
@@ -77,6 +78,8 @@ const (
maxQueuedRandomnesses = 16
maxQueuedDKGPrivateShare = 16
maxQueuedDKGParitialSignature = 16
+ maxQueuedPullBlocks = 128
+ maxQueuedPullVotes = 128
handshakeTimeout = 5 * time.Second
@@ -141,6 +144,8 @@ type peer struct {
queuedRandomnesses chan *coreTypes.BlockRandomnessResult
queuedDKGPrivateShares chan *dkgTypes.PrivateShare
queuedDKGPartialSignatures chan *dkgTypes.PartialSignature
+ queuedPullBlocks chan coreCommon.Hashes
+ queuedPullVotes chan coreTypes.Position
term chan struct{} // Termination channel to stop the broadcaster
}
@@ -169,7 +174,9 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
queuedRandomnesses: make(chan *coreTypes.BlockRandomnessResult, maxQueuedRandomnesses),
queuedDKGPrivateShares: make(chan *dkgTypes.PrivateShare, maxQueuedDKGPrivateShare),
queuedDKGPartialSignatures: make(chan *dkgTypes.PartialSignature, maxQueuedDKGParitialSignature),
- term: make(chan struct{}),
+ queuedPullBlocks: make(chan coreCommon.Hashes, maxQueuedPullBlocks),
+ queuedPullVotes: make(chan coreTypes.Position, maxQueuedPullVotes),
+ term: make(chan struct{}),
}
}
@@ -232,6 +239,16 @@ func (p *peer) broadcast() {
return
}
p.Log().Trace("Broadcast DKG partial signature")
+ case hashes := <-p.queuedPullBlocks:
+ if err := p.SendPullBlocks(hashes); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Blocks", "hashes", hashes)
+ case pos := <-p.queuedPullVotes:
+ if err := p.SendPullVotes(pos); err != nil {
+ return
+ }
+ p.Log().Trace("Pulling Votes", "position", pos)
case <-p.term:
return
}
@@ -472,6 +489,30 @@ func (p *peer) AsyncSendDKGPartialSignature(psig *dkgTypes.PartialSignature) {
}
}
+func (p *peer) SendPullBlocks(hashes coreCommon.Hashes) error {
+ return p2p.Send(p.rw, PullBlocksMsg, hashes)
+}
+
+func (p *peer) AsyncSendPullBlocks(hashes coreCommon.Hashes) {
+ select {
+ case p.queuedPullBlocks <- hashes:
+ default:
+ p.Log().Debug("Dropping Pull Blocks")
+ }
+}
+
+func (p *peer) SendPullVotes(pos coreTypes.Position) error {
+ return p2p.Send(p.rw, PullVotesMsg, pos)
+}
+
+func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) {
+ select {
+ case p.queuedPullVotes <- pos:
+ default:
+ p.Log().Debug("Dropping Pull Votes")
+ }
+}
+
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
@@ -693,6 +734,18 @@ func (ps *peerSet) Len() int {
return len(ps.peers)
}
+// Peers retrieves all of the peers.
+func (ps *peerSet) Peers() []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ list = append(list, p)
+ }
+ return list
+}
+
// PeersWithoutBlock retrieves a list of peers that do not have a given block in
// their set of known hashes.
func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
diff --git a/dex/protocol.go b/dex/protocol.go
index 6e531c7af..f33179b69 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -1,3 +1,20 @@
+// Copyright 2018 The dexon-consensus-core Authors
+// This file is part of the dexon-consensus-core library.
+//
+// The dexon-consensus-core library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus-core library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus-core library. If not, see
+// <http://www.gnu.org/licenses/>.
+
// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
@@ -43,7 +60,7 @@ var ProtocolName = "dex"
var ProtocolVersions = []uint{dex64}
// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
-var ProtocolLengths = []uint64{38}
+var ProtocolLengths = []uint64{40}
const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -74,6 +91,8 @@ const (
RandomnessMsg = 0x23
DKGPrivateShareMsg = 0x24
DKGPartialSignatureMsg = 0x25
+ PullBlocksMsg = 0x26
+ PullVotesMsg = 0x27
)
type errCode int