aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSonic <sonic@dexon.org>2019-03-20 16:34:56 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:58 +0800
commit2a6418678807137dfe6e5fba2969863bd468b840 (patch)
treebea1a37f28080c1fcdf58a6f9eb9e1dbd01df16e
parent4364f00e2204debb48edf2940e1f89f46d7a9a18 (diff)
downloaddexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.gz
dexon-2a6418678807137dfe6e5fba2969863bd468b840.tar.zst
dexon-2a6418678807137dfe6e5fba2969863bd468b840.zip
dex: recieve bp msg when recovery, use atomic to protect the flag (#286)
-rw-r--r--dex/blockproposer.go10
-rw-r--r--dex/handler.go26
2 files changed, 19 insertions, 17 deletions
diff --git a/dex/blockproposer.go b/dex/blockproposer.go
index 63aaa5b51..24a277335 100644
--- a/dex/blockproposer.go
+++ b/dex/blockproposer.go
@@ -91,7 +91,7 @@ func (b *blockProposer) Stop() {
defer b.mu.Unlock()
if atomic.LoadInt32(&b.running) == 1 {
- b.dex.protocolManager.receiveEnabled = false
+ atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0)
close(b.stopCh)
b.wg.Wait()
atomic.StoreInt32(&b.proposing, 0)
@@ -157,7 +157,7 @@ func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) {
_, coreHeight := db.GetCompactionChainTipInfo()
// Stop receiving block proposer message when syncing.
- b.dex.protocolManager.receiveEnabled = false
+ atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 0)
Loop:
for {
@@ -211,10 +211,7 @@ ListenLoop:
log.Error("SyncBlocks fail", "err", err)
return nil, err
}
- if !b.dex.protocolManager.receiveEnabled {
- // Start receiving block proposer message.
- b.dex.protocolManager.receiveEnabled = true
- }
+ atomic.CompareAndSwapInt32(&b.dex.protocolManager.receiveEnabled, 0, 1)
if synced {
log.Debug("Consensus core synced")
break ListenLoop
@@ -252,6 +249,7 @@ ListenLoop:
log.Info("Sleeping until next starting time", "time", nextDMoment)
time.Sleep(time.Duration(nextDMoment-time.Now().Unix()) * time.Second)
+ atomic.StoreInt32(&b.dex.protocolManager.receiveEnabled, 1)
consensusSync.ForceSync(true)
break ListenLoop
}
diff --git a/dex/handler.go b/dex/handler.go
index ea5c31b36..3a5a81f50 100644
--- a/dex/handler.go
+++ b/dex/handler.go
@@ -135,7 +135,7 @@ type ProtocolManager struct {
// channels for dexon consensus core
receiveCh chan interface{}
- receiveEnabled bool
+ receiveEnabled int32
srvr p2pServer
@@ -179,12 +179,16 @@ func NewProtocolManager(
recordsyncCh: make(chan *recordsync),
quitSync: make(chan struct{}),
receiveCh: make(chan interface{}, 1024),
- receiveEnabled: isBlockProposer,
+ receiveEnabled: 0,
isBlockProposer: isBlockProposer,
app: app,
blockNumberGauge: metrics.GetOrRegisterGauge("dex/blocknumber", nil),
}
+ if isBlockProposer {
+ manager.receiveEnabled = 1
+ }
+
// Figure out whether to allow fast sync or not
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
@@ -833,7 +837,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Block proposer-only messages.
case msg.Code == CoreBlockMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
var blocks []*coreTypes.Block
@@ -845,7 +849,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- block
}
case msg.Code == VoteMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
var votes []*coreTypes.Vote
@@ -859,7 +863,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- vote
}
case msg.Code == AgreementMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
// DKG set is receiver
@@ -870,7 +874,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkAgreement(rlpHash(agreement))
pm.receiveCh <- &agreement
case msg.Code == RandomnessMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
// Broadcast this to all peer
@@ -883,7 +887,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
pm.receiveCh <- randomness
}
case msg.Code == DKGPrivateShareMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
// Do not relay this msg
@@ -894,7 +898,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.MarkDKGPrivateShares(rlpHash(ps))
pm.receiveCh <- &ps
case msg.Code == DKGPartialSignatureMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
// broadcast in DKG set
@@ -904,7 +908,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
pm.receiveCh <- &psig
case msg.Code == PullBlocksMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
var hashes coreCommon.Hashes
@@ -915,7 +919,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push blocks", "blocks", blocks)
return p.SendCoreBlocks(blocks)
case msg.Code == PullVotesMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
next, ok := pm.nextPullVote.Load(p.ID())
@@ -934,7 +938,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
log.Debug("Push votes", "votes", votes)
return p.SendVotes(votes)
case msg.Code == PullRandomnessMsg:
- if !pm.receiveEnabled {
+ if atomic.LoadInt32(&pm.receiveEnabled) == 0 {
break
}
var hashes coreCommon.Hashes