aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-10-16 17:01:19 +0800
committerWei-Ning Huang <w@dexon.org>2018-12-19 20:54:27 +0800
commit4f2617ae78da30a29deaddeb9cd6a2cf6d00d88e (patch)
tree997b6e9d0ad73b9ee88028eeebe85678ef9ae677
parenta9f70b56420502d7af5f38d318456d6de20c05eb (diff)
downloaddexon-4f2617ae78da30a29deaddeb9cd6a2cf6d00d88e.tar.gz
dexon-4f2617ae78da30a29deaddeb9cd6a2cf6d00d88e.tar.zst
dexon-4f2617ae78da30a29deaddeb9cd6a2cf6d00d88e.zip
dex: implement peerSetLoop
-rw-r--r--dex/governance.go5
-rw-r--r--dex/handler.go54
-rw-r--r--dex/helper_test.go5
-rw-r--r--dex/nodetable.go3
-rw-r--r--dex/peer.go16
-rw-r--r--dex/protocol.go2
6 files changed, 74 insertions, 11 deletions
diff --git a/dex/governance.go b/dex/governance.go
index 22452dea3..a78cf2d4f 100644
--- a/dex/governance.go
+++ b/dex/governance.go
@@ -132,6 +132,11 @@ func (d *DexconGovernance) CRS(round uint64) coreCommon.Hash {
return coreCommon.Hash(s.CRS(big.NewInt(int64(round))))
}
+func (d *DexconGovernance) LenCRS() uint64 {
+ s := d.getGovState()
+ return s.LenCRS().Uint64()
+}
+
// ProposeCRS send proposals of a new CRS
func (d *DexconGovernance) ProposeCRS(signedCRS []byte) {
method := vm.GovernanceContractName2Method["proposeCRS"]
diff --git a/dex/handler.go b/dex/handler.go
index 276dd433c..b605c907a 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -98,8 +98,8 @@ type ProtocolManager struct {
noMorePeers chan struct{}
// channels for peerSetLoop
- crsCh chan core.NewCRSEvent
- crsSub event.Subscription
+ chainHeadCh chan core.ChainHeadEvent
+ chainHeadSub event.Subscription
// channels for dexon consensus core
receiveCh chan interface{}
@@ -239,8 +239,8 @@ func (pm *ProtocolManager) Start(srvr p2pServer, maxPeers int) {
go pm.minedBroadcastLoop()
// run the peer set loop
- pm.crsCh = make(chan core.NewCRSEvent)
- // pm.crsSub = pm.gov.SubscribeNewCRSEvent(pm.crsCh)
+ pm.chainHeadCh = make(chan core.ChainHeadEvent)
+ pm.chainHeadSub = pm.blockchain.SubscribeChainHeadEvent(pm.chainHeadCh)
go pm.peerSetLoop()
// start sync handlers
@@ -284,6 +284,7 @@ func (pm *ProtocolManager) Stop() {
pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+ pm.chainHeadSub.Unsubscribe()
// Quit the sync loop.
// After this send has completed, no new peers will be accepted.
@@ -918,16 +919,47 @@ func (pm *ProtocolManager) metaBroadcastLoop() {
// a loop keep building and maintaining peers in notary set.
// TODO: finish this
func (pm *ProtocolManager) peerSetLoop() {
-
log.Debug("start peer set loop")
+ round := pm.gov.LenCRS() - 1
+ log.Trace("first len crs", "len", round+1, "round", round)
+ if round >= 1 {
+ pm.peers.BuildNotaryConn(round - 1)
+ pm.peers.BuildDKGConn(round - 1)
+ }
+ pm.peers.BuildNotaryConn(round)
+ pm.peers.BuildDKGConn(round)
+
for {
select {
- case event := <-pm.crsCh:
- pm.peers.BuildNotaryConn(event.Round)
- pm.peers.BuildDKGConn(event.Round)
- pm.peers.ForgetNotaryConn(event.Round - 1)
- pm.peers.ForgetDKGConn(event.Round - 1)
- case <-pm.quitSync:
+ case <-pm.chainHeadCh:
+ newRound := pm.gov.LenCRS() - 1
+ log.Trace("new round", "round", newRound)
+ if newRound == round {
+ break
+ }
+ if newRound == round+1 {
+ pm.peers.BuildNotaryConn(newRound)
+ pm.peers.BuildDKGConn(newRound)
+ pm.peers.ForgetNotaryConn(round - 1)
+ pm.peers.ForgetDKGConn(round - 1)
+ } else {
+ // just forget all network connection and rebuild.
+ pm.peers.ForgetNotaryConn(round)
+ pm.peers.ForgetDKGConn(round)
+
+ if newRound >= 1 {
+ pm.peers.BuildNotaryConn(newRound - 1)
+ pm.peers.BuildDKGConn(newRound - 1)
+ }
+ pm.peers.BuildNotaryConn(newRound)
+ pm.peers.BuildDKGConn(newRound)
+ }
+ round = newRound
+ case <-time.After(5 * time.Second):
+ pm.peers.lock.Lock()
+ pm.peers.dumpPeerLabel("ticker")
+ pm.peers.lock.Unlock()
+ case <-pm.chainHeadSub.Err():
return
}
}
diff --git a/dex/helper_test.go b/dex/helper_test.go
index 09d15d42f..5b73c4afa 100644
--- a/dex/helper_test.go
+++ b/dex/helper_test.go
@@ -183,6 +183,7 @@ func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *typ
// testGovernance is a fake, helper governance for testing purposes
type testGovernance struct {
numChainsFunc func(uint64) uint32
+ lenCRSFunc func() uint64
notarySetFunc func(uint64, uint32) (map[string]struct{}, error)
dkgSetFunc func(uint64) (map[string]struct{}, error)
}
@@ -191,6 +192,10 @@ func (g *testGovernance) GetNumChains(round uint64) uint32 {
return g.numChainsFunc(round)
}
+func (g *testGovernance) LenCRS() uint64 {
+ return g.lenCRSFunc()
+}
+
func (g *testGovernance) NotarySet(
round uint64, chainID uint32) (map[string]struct{}, error) {
return g.notarySetFunc(round, chainID)
diff --git a/dex/nodetable.go b/dex/nodetable.go
index 929b168a8..f1291b4fd 100644
--- a/dex/nodetable.go
+++ b/dex/nodetable.go
@@ -7,6 +7,7 @@ import (
"github.com/dexon-foundation/dexon/common"
"github.com/dexon-foundation/dexon/crypto/sha3"
"github.com/dexon-foundation/dexon/event"
+ "github.com/dexon-foundation/dexon/log"
"github.com/dexon-foundation/dexon/p2p/enode"
"github.com/dexon-foundation/dexon/rlp"
)
@@ -59,6 +60,8 @@ func (t *nodeTable) Add(metas []*NodeMeta) {
}
t.entry[meta.ID] = meta
newMetas = append(newMetas, meta)
+ log.Trace("add new node meta", "id", meta.ID[:8],
+ "ip", meta.IP, "udp", meta.UDP, "tcp", meta.TCP)
}
t.feed.Send(newMetasEvent{newMetas})
}
diff --git a/dex/peer.go b/dex/peer.go
index 342d0f033..1f2b9518d 100644
--- a/dex/peer.go
+++ b/dex/peer.go
@@ -778,6 +778,7 @@ func (ps *peerSet) Close() {
func (ps *peerSet) BuildNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
+ defer ps.dumpPeerLabel(fmt.Sprintf("BuildNotaryConn: %d", round))
if _, ok := ps.notaryHistory[round]; ok {
return
@@ -816,9 +817,21 @@ func (ps *peerSet) BuildNotaryConn(round uint64) {
}
}
+func (ps *peerSet) dumpPeerLabel(s string) {
+ log.Trace(s, "peer num", len(ps.peers))
+ for id, labels := range ps.peer2Labels {
+ _, ok := ps.peers[id]
+ for label := range labels {
+ log.Trace(s, "connected", ok, "id", id[:16],
+ "round", label.round, "cid", label.chainID, "set", label.set)
+ }
+ }
+}
+
func (ps *peerSet) ForgetNotaryConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
+ defer ps.dumpPeerLabel(fmt.Sprintf("ForgetNotaryConn: %d", round))
// forget all the rounds before the given round
for r := range ps.notaryHistory {
@@ -862,6 +875,7 @@ func notarySetName(chainID uint32, round uint64) string {
func (ps *peerSet) BuildDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
+ defer ps.dumpPeerLabel(fmt.Sprintf("BuildDKGConn: %d", round))
selfID := ps.srvr.Self().ID.String()
s, err := ps.gov.DKGSet(round)
if err != nil {
@@ -886,6 +900,7 @@ func (ps *peerSet) BuildDKGConn(round uint64) {
func (ps *peerSet) ForgetDKGConn(round uint64) {
ps.lock.Lock()
defer ps.lock.Unlock()
+ defer ps.dumpPeerLabel(fmt.Sprintf("ForgetDKGConn: %d", round))
// forget all the rounds before the given round
for r := range ps.dkgHistory {
@@ -919,6 +934,7 @@ func (ps *peerSet) forgetDKGConn(round uint64) {
// make sure the ps.lock is hold
func (ps *peerSet) addDirectPeer(id string, label peerLabel) {
+ log.Trace("peerSet addDirectPeer", "id", id[:8], "round", label.round, "cid", label.chainID)
// if the peer exists add the label
if p, ok := ps.peers[id]; ok {
p.addLabel(label)
diff --git a/dex/protocol.go b/dex/protocol.go
index a82d23479..d531103fa 100644
--- a/dex/protocol.go
+++ b/dex/protocol.go
@@ -127,6 +127,8 @@ type txPool interface {
type governance interface {
GetNumChains(uint64) uint32
+ LenCRS() uint64
+
NotarySet(uint64, uint32) (map[string]struct{}, error)
DKGSet(uint64) (map[string]struct{}, error)