diff options
authorJimmy Hu <jimmy.hu@dexon.org>2019-03-15 18:17:53 +0800
committerWei-Ning Huang <w@dexon.org>2019-03-15 18:17:53 +0800
commitb02fa5ee430cff9dafc9d9c399099a88d554a083 (patch)
parent6a127c42323b9b5cdde1cdb17e385d22ef9dfd10 (diff)
core/syncer: add force sync (#468)
* core: Add Recovery Interface * core/syncer: modify recovery interface * core: fix Recovery interface * core/syncer: rename terminator to watchcat (#491) * core/syncer: rename terminator to watchcat * Add error log * Rename Pat to Feed * core/syncer: add force sync * run prepareRandomness if round >= DKGDelayRound * Add test for Forcsync
5 files changed, 260 insertions, 63 deletions
diff --git a/core/blockchain.go b/core/blockchain.go
index 19a580b..9263f67 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -558,8 +558,11 @@ func (bc *blockChain) prepareBlock(position types.Position,
if tipConfig.IsLastBlock(tip) {
if tip.Position.Round+1 != position.Round {
- b, err = nil, ErrRoundNotSwitch
- return
+ if !empty {
+ b, err = nil, ErrRoundNotSwitch
+ return
+ }
+ b.Position.Round = tip.Position.Round + 1
} else {
if tip.Position.Round != position.Round {
diff --git a/core/consensus.go b/core/consensus.go
index 4201cbc..5ee64c2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -453,6 +453,7 @@ func NewConsensusForSimulation(
func NewConsensusFromSyncer(
initBlock *types.Block,
initRoundBeginHeight uint64,
+ startWithEmpty bool,
dMoment time.Time,
app Application,
gov Governance,
@@ -495,6 +496,23 @@ func NewConsensusFromSyncer(
+ if startWithEmpty {
+ pos := initBlock.Position
+ pos.Height++
+ block, err := con.bcModule.addEmptyBlock(pos)
+ if err != nil {
+ panic(err)
+ }
+ con.processBlockChan <- block
+ if pos.Round >= DKGDelayRound {
+ rand := &types.AgreementResult{
+ BlockHash: block.Hash,
+ Position: block.Position,
+ IsEmptyBlock: true,
+ }
+ go con.prepareRandomnessResult(rand)
+ }
+ }
return con, nil
@@ -863,6 +881,10 @@ func (con *Consensus) Stop() {
+ if nbApp, ok := con.app.(*nonBlocking); ok {
+ fmt.Println("Stopping nonBlocking App")
+ nbApp.wait()
+ }
func (con *Consensus) deliverNetworkMsg() {
@@ -1014,62 +1036,64 @@ func (con *Consensus) ProcessAgreementResult(
con.logger.Debug("Rebroadcast AgreementResult",
"result", rand)
+ go con.prepareRandomnessResult(rand)
+ return nil
- go func() {
- dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
- if err != nil {
- con.logger.Error("Failed to get dkg set",
- "round", rand.Position.Round, "error", err)
- return
- }
- if _, exist := dkgSet[con.ID]; !exist {
- return
- }
- psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
- if err != nil {
- con.logger.Error("Failed to prepare psig",
- "round", rand.Position.Round,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.signer.SignDKGPartialSignature(psig); err != nil {
- con.logger.Error("Failed to sign psig",
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- return
- }
- if err = con.cfgModule.processPartialSignature(psig); err != nil {
- con.logger.Error("Failed process psig",
+func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
+ dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
+ if err != nil {
+ con.logger.Error("Failed to get dkg set",
+ "round", rand.Position.Round, "error", err)
+ return
+ }
+ if _, exist := dkgSet[con.ID]; !exist {
+ return
+ }
+ con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
+ psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ con.logger.Error("Failed to prepare psig",
+ "round", rand.Position.Round,
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+ con.logger.Error("Failed to sign psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ if err = con.cfgModule.processPartialSignature(psig); err != nil {
+ con.logger.Error("Failed process psig",
+ "hash", rand.BlockHash.String()[:6],
+ "error", err)
+ return
+ }
+ con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+ "proposer", psig.ProposerID,
+ "round", psig.Round,
+ "hash", psig.Hash.String()[:6])
+ con.network.BroadcastDKGPartialSignature(psig)
+ tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
+ if err != nil {
+ if err != ErrTSigAlreadyRunning {
+ con.logger.Error("Failed to run TSIG",
+ "position", rand.Position,
"hash", rand.BlockHash.String()[:6],
"error", err)
- return
- }
- con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
- "proposer", psig.ProposerID,
- "round", psig.Round,
- "hash", psig.Hash.String()[:6])
- con.network.BroadcastDKGPartialSignature(psig)
- tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
- if err != nil {
- if err != ErrTSigAlreadyRunning {
- con.logger.Error("Failed to run TSIG",
- "position", rand.Position,
- "hash", rand.BlockHash.String()[:6],
- "error", err)
- }
- return
- result := &types.BlockRandomnessResult{
- BlockHash: rand.BlockHash,
- Position: rand.Position,
- Randomness: tsig.Signature,
- }
- // ProcessBlockRandomnessResult is not thread-safe so we put the result in
- // the message channnel to be processed in the main thread.
- con.msgChan <- result
- }()
- return nil
+ return
+ }
+ result := &types.BlockRandomnessResult{
+ BlockHash: rand.BlockHash,
+ Position: rand.Position,
+ Randomness: tsig.Signature,
+ }
+ // ProcessBlockRandomnessResult is not thread-safe so we put the result in
+ // the message channnel to be processed in the main thread.
+ con.msgChan <- result
// ProcessBlockRandomnessResult processes the randomness result.
@@ -1094,14 +1118,6 @@ func (con *Consensus) ProcessBlockRandomnessResult(
// preProcessBlock performs Byzantine Agreement on the block.
func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
- var exist bool
- exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID)
- if err != nil {
- return
- }
- if !exist {
- return ErrProposerNotInNodeSet
- }
err = con.baMgr.processBlock(b)
if err == nil && con.debugApp != nil {
@@ -1137,7 +1153,10 @@ func (con *Consensus) deliveryGuard() {
defer con.waitGroup.Done()
// Node takes time to start.
- time.Sleep(60 * time.Second)
+ select {
+ case <-con.ctx.Done():
+ case <-time.After(60 * time.Second):
+ }
for {
select {
case <-con.ctx.Done():
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 25911ce..fd48793 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -82,6 +82,7 @@ type Consensus struct {
ctxCancel context.CancelFunc
syncedLastBlock *types.Block
syncedConsensus *core.Consensus
+ syncedSkipNext bool
dummyCancel context.CancelFunc
dummyFinished <-chan struct{}
dummyMsgBuffer []interface{}
@@ -180,6 +181,29 @@ func (con *Consensus) buildAllEmptyBlocks() {
+// ForceSync forces syncer to become synced.
+func (con *Consensus) ForceSync(skip bool) {
+ if con.syncedLastBlock != nil {
+ return
+ }
+ hash, _ := con.db.GetCompactionChainTipInfo()
+ var block types.Block
+ block, err := con.db.GetBlock(hash)
+ if err != nil {
+ panic(err)
+ }
+ con.logger.Info("Force Sync", "block", &block)
+ con.setupConfigsUntilRound(block.Position.Round + core.ConfigRoundShift - 1)
+ con.syncedLastBlock = &block
+ con.stopBuffering()
+ con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+ context.Background(), con.network.ReceiveChan(),
+ func(msg interface{}) {
+ con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+ })
+ con.syncedSkipNext = skip
// SyncBlocks syncs blocks from compaction chain, latest is true if the caller
// regards the blocks are the latest ones. Notice that latest can be true for
// many times.
@@ -279,6 +303,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
con.syncedConsensus, err = core.NewConsensusFromSyncer(
+ con.syncedSkipNext,
diff --git a/core/test/app.go b/core/test/app.go
index 0a17f13..4cfd580 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -208,6 +208,15 @@ func (app *App) BlockConfirmed(b types.Block) {
app.LastConfirmedHeight = b.Position.Height
+// ClearUndeliveredBlocks --
+func (app *App) ClearUndeliveredBlocks() {
+ app.deliveredLock.RLock()
+ defer app.deliveredLock.RUnlock()
+ app.confirmedLock.Lock()
+ defer app.confirmedLock.Unlock()
+ app.LastConfirmedHeight = uint64(len(app.DeliverSequence) - 1)
// BlockDelivered implements Application interface.
func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
result types.FinalizationResult) {
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 78d0de5..5131465 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -487,6 +487,147 @@ ReachAlive:
+func (s *ConsensusTestSuite) TestForceSync() {
+ // The sync test case:
+ // - No configuration change.
+ // - One node does not run when all others starts until aliveRound exceeded.
+ var (
+ req = s.Require()
+ peerCount = 4
+ dMoment = time.Now().UTC()
+ untilRound = uint64(3)
+ stopRound = uint64(1)
+ errChan = make(chan error, 100)
+ )
+ prvKeys, pubKeys, err := test.NewKeys(peerCount)
+ req.NoError(err)
+ // Setup seed governance instance. Give a short latency to make this test
+ // run faster.
+ seedGov, err := test.NewGovernance(
+ test.NewState(core.DKGDelayRound,
+ pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
+ core.ConfigRoundShift)
+ req.NoError(err)
+ req.NoError(seedGov.State().RequestChange(
+ test.StateChangeRoundLength, uint64(60)))
+ seedGov.CatchUpWithRound(0)
+ seedGov.CatchUpWithRound(1)
+ // A short round interval.
+ nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+ for _, n := range nodes {
+ go n.con.Run()
+ }
+ for {
+ // Check if any error happened or sleep for a period of time.
+ select {
+ case err := <-errChan:
+ req.NoError(err)
+ case <-time.After(5 * time.Second):
+ }
+ // If one of the nodes have reached stopRound, stop all nodes to simulate
+ // crash.
+ for _, n := range nodes {
+ pos := n.app.GetLatestDeliveredPosition()
+ if pos.Round >= stopRound {
+ break ReachStop
+ } else {
+ fmt.Println("latestPos", n.ID, &pos)
+ }
+ }
+ }
+ var latestPos types.Position
+ var latestNodeID types.NodeID
+ for _, n := range nodes {
+ n.con.Stop()
+ time.Sleep(1 * time.Second)
+ }
+ for nID, n := range nodes {
+ pos := n.app.GetLatestDeliveredPosition()
+ if pos.Newer(latestPos) {
+ fmt.Println("Newe position", nID, pos)
+ latestNodeID = nID
+ latestPos = pos
+ }
+ }
+ fmt.Println("Latest node", latestNodeID, &latestPos)
+ for nID, node := range nodes {
+ if nID == latestNodeID {
+ continue
+ }
+ fmt.Printf("[%p] Clearing %s %s\n", node.app, nID, node.app.GetLatestDeliveredPosition())
+ node.app.ClearUndeliveredBlocks()
+ }
+ syncerCon := make(map[types.NodeID]*syncer.Consensus, len(nodes))
+ for _, prvKey := range prvKeys {
+ nID := types.NewNodeID(prvKey.PublicKey())
+ node := nodes[nID]
+ syncerCon[nID] = syncer.NewConsensus(
+ dMoment,
+ node.app,
+ node.gov,
+ node.db,
+ node.network,
+ prvKey,
+ &common.NullLogger{},
+ )
+ }
+ targetNode := nodes[latestNodeID]
+ for nID, node := range nodes {
+ if nID == latestNodeID {
+ continue
+ }
+ syncedHeight := node.app.GetLatestDeliveredPosition().Height + 1
+ // FinalizationHeight = Height + 1
+ syncedHeight++
+ var err error
+ for {
+ fmt.Println("Syncing", nID, syncedHeight)
+ if syncedHeight >= latestPos.Height {
+ break
+ }
+ _, syncedHeight, err = s.syncBlocksWithSomeNode(
+ targetNode, node, syncerCon[nID], syncedHeight)
+ if err != nil {
+ panic(err)
+ }
+ fmt.Println("Syncing", nID, syncedHeight)
+ }
+ fmt.Println("Synced", nID, syncedHeight)
+ }
+ for _, con := range syncerCon {
+ con.ForceSync(true)
+ }
+ for nID := range nodes {
+ con, err := syncerCon[nID].GetSyncedConsensus()
+ s.Require().NoError(err)
+ nodes[nID].con = con
+ }
+ for _, node := range nodes {
+ go node.con.Run()
+ defer node.con.Stop()
+ }
+ for {
+ <-time.After(5 * time.Second)
+ fmt.Println("check latest position delivered by each node")
+ for _, n := range nodes {
+ latestPos := n.app.GetLatestDeliveredPosition()
+ fmt.Println("latestPos", n.ID, &latestPos)
+ if latestPos.Round < untilRound {
+ continue Loop
+ }
+ }
+ // Oh ya.
+ break
+ }
+ s.verifyNodes(nodes)
func TestConsensus(t *testing.T) {
suite.Run(t, new(ConsensusTestSuite))