author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-15 18:17:53 +0800
---|---|---
committer | Wei-Ning Huang <w@dexon.org> | 2019-03-15 18:17:53 +0800
commit | b02fa5ee430cff9dafc9d9c399099a88d554a083 (patch) |
tree | 31f962f1e40e18a656693ebf38e8176b6e09c9f6 |
parent | 6a127c42323b9b5cdde1cdb17e385d22ef9dfd10 (diff) |
download | dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.gz dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.tar.zst dexon-consensus-b02fa5ee430cff9dafc9d9c399099a88d554a083.zip |
core/syncer: add force sync (#468)
* core: Add Recovery Interface
* core/syncer: modify recovery interface
* core: fix Recovery interface
* core/syncer: rename terminator to watchcat (#491)
* core/syncer: rename terminator to watchcat
* Add error log
* Rename Pat to Feed
* core/syncer: add force sync
* run prepareRandomness if round >= DKGDelayRound
* Add test for ForceSync
-rw-r--r-- | core/blockchain.go | 7
-rw-r--r-- | core/consensus.go | 141
-rw-r--r-- | core/syncer/consensus.go | 25
-rw-r--r-- | core/test/app.go | 9
-rw-r--r-- | integration_test/consensus_test.go | 141
5 files changed, 260 insertions, 63 deletions
```diff
diff --git a/core/blockchain.go b/core/blockchain.go
index 19a580b..9263f67 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -558,8 +558,11 @@ func (bc *blockChain) prepareBlock(position types.Position,
 	}
 	if tipConfig.IsLastBlock(tip) {
 		if tip.Position.Round+1 != position.Round {
-			b, err = nil, ErrRoundNotSwitch
-			return
+			if !empty {
+				b, err = nil, ErrRoundNotSwitch
+				return
+			}
+			b.Position.Round = tip.Position.Round + 1
 		}
 	} else {
 		if tip.Position.Round != position.Round {
diff --git a/core/consensus.go b/core/consensus.go
index 4201cbc..5ee64c2 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -453,6 +453,7 @@ func NewConsensusForSimulation(
 func NewConsensusFromSyncer(
 	initBlock *types.Block,
 	initRoundBeginHeight uint64,
+	startWithEmpty bool,
 	dMoment time.Time,
 	app Application,
 	gov Governance,
@@ -495,6 +496,23 @@ func NewConsensusFromSyncer(
 			continue
 		}
 	}
+	if startWithEmpty {
+		pos := initBlock.Position
+		pos.Height++
+		block, err := con.bcModule.addEmptyBlock(pos)
+		if err != nil {
+			panic(err)
+		}
+		con.processBlockChan <- block
+		if pos.Round >= DKGDelayRound {
+			rand := &types.AgreementResult{
+				BlockHash:    block.Hash,
+				Position:     block.Position,
+				IsEmptyBlock: true,
+			}
+			go con.prepareRandomnessResult(rand)
+		}
+	}
 	return con, nil
 }
 
@@ -863,6 +881,10 @@ func (con *Consensus) Stop() {
 	con.baMgr.stop()
 	con.event.Reset()
 	con.waitGroup.Wait()
+	if nbApp, ok := con.app.(*nonBlocking); ok {
+		fmt.Println("Stopping nonBlocking App")
+		nbApp.wait()
+	}
 }
 
 func (con *Consensus) deliverNetworkMsg() {
@@ -1014,62 +1036,64 @@ func (con *Consensus) ProcessAgreementResult(
 	con.logger.Debug("Rebroadcast AgreementResult",
 		"result", rand)
 	con.network.BroadcastAgreementResult(rand)
+	go con.prepareRandomnessResult(rand)
+	return nil
+}
 
-	go func() {
-		dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
-		if err != nil {
-			con.logger.Error("Failed to get dkg set",
-				"round", rand.Position.Round, "error", err)
-			return
-		}
-		if _, exist := dkgSet[con.ID]; !exist {
-			return
-		}
-		psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
-		if err != nil {
-			con.logger.Error("Failed to prepare psig",
-				"round", rand.Position.Round,
-				"hash", rand.BlockHash.String()[:6],
-				"error", err)
-			return
-		}
-		if err = con.signer.SignDKGPartialSignature(psig); err != nil {
-			con.logger.Error("Failed to sign psig",
-				"hash", rand.BlockHash.String()[:6],
-				"error", err)
-			return
-		}
-		if err = con.cfgModule.processPartialSignature(psig); err != nil {
-			con.logger.Error("Failed process psig",
+func (con *Consensus) prepareRandomnessResult(rand *types.AgreementResult) {
+	dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round)
+	if err != nil {
+		con.logger.Error("Failed to get dkg set",
+			"round", rand.Position.Round, "error", err)
+		return
+	}
+	if _, exist := dkgSet[con.ID]; !exist {
+		return
+	}
+	con.logger.Debug("PrepareRandomness", "round", rand.Position.Round, "hash", rand.BlockHash)
+	psig, err := con.cfgModule.preparePartialSignature(rand.Position.Round, rand.BlockHash)
+	if err != nil {
+		con.logger.Error("Failed to prepare psig",
+			"round", rand.Position.Round,
+			"hash", rand.BlockHash.String()[:6],
+			"error", err)
+		return
+	}
+	if err = con.signer.SignDKGPartialSignature(psig); err != nil {
+		con.logger.Error("Failed to sign psig",
+			"hash", rand.BlockHash.String()[:6],
+			"error", err)
+		return
+	}
+	if err = con.cfgModule.processPartialSignature(psig); err != nil {
+		con.logger.Error("Failed process psig",
+			"hash", rand.BlockHash.String()[:6],
+			"error", err)
+		return
+	}
+	con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
+		"proposer", psig.ProposerID,
+		"round", psig.Round,
+		"hash", psig.Hash.String()[:6])
+	con.network.BroadcastDKGPartialSignature(psig)
+	tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
+	if err != nil {
+		if err != ErrTSigAlreadyRunning {
+			con.logger.Error("Failed to run TSIG",
+				"position", rand.Position,
 				"hash", rand.BlockHash.String()[:6],
 				"error", err)
-			return
-		}
-		con.logger.Debug("Calling Network.BroadcastDKGPartialSignature",
-			"proposer", psig.ProposerID,
-			"round", psig.Round,
-			"hash", psig.Hash.String()[:6])
-		con.network.BroadcastDKGPartialSignature(psig)
-		tsig, err := con.cfgModule.runTSig(rand.Position.Round, rand.BlockHash)
-		if err != nil {
-			if err != ErrTSigAlreadyRunning {
-				con.logger.Error("Failed to run TSIG",
-					"position", rand.Position,
-					"hash", rand.BlockHash.String()[:6],
-					"error", err)
-			}
-			return
 		}
-		result := &types.BlockRandomnessResult{
-			BlockHash:  rand.BlockHash,
-			Position:   rand.Position,
-			Randomness: tsig.Signature,
-		}
-		// ProcessBlockRandomnessResult is not thread-safe so we put the result in
-		// the message channnel to be processed in the main thread.
-		con.msgChan <- result
-	}()
-	return nil
+		return
+	}
+	result := &types.BlockRandomnessResult{
+		BlockHash:  rand.BlockHash,
+		Position:   rand.Position,
+		Randomness: tsig.Signature,
+	}
+	// ProcessBlockRandomnessResult is not thread-safe so we put the result in
+	// the message channnel to be processed in the main thread.
+	con.msgChan <- result
 }
 
 // ProcessBlockRandomnessResult processes the randomness result.
@@ -1094,14 +1118,6 @@ func (con *Consensus) ProcessBlockRandomnessResult(
 
 // preProcessBlock performs Byzantine Agreement on the block.
 func (con *Consensus) preProcessBlock(b *types.Block) (err error) {
-	var exist bool
-	exist, err = con.nodeSetCache.Exists(b.Position.Round, b.ProposerID)
-	if err != nil {
-		return
-	}
-	if !exist {
-		return ErrProposerNotInNodeSet
-	}
 	err = con.baMgr.processBlock(b)
 	if err == nil && con.debugApp != nil {
 		con.debugApp.BlockReceived(b.Hash)
@@ -1137,7 +1153,10 @@ func (con *Consensus) deliveryGuard() {
 	defer con.waitGroup.Done()
 	time.Sleep(con.dMoment.Sub(time.Now()))
 	// Node takes time to start.
-	time.Sleep(60 * time.Second)
+	select {
+	case <-con.ctx.Done():
+	case <-time.After(60 * time.Second):
+	}
 	for {
 		select {
 		case <-con.ctx.Done():
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 25911ce..fd48793 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -82,6 +82,7 @@ type Consensus struct {
 	ctxCancel       context.CancelFunc
 	syncedLastBlock *types.Block
 	syncedConsensus *core.Consensus
+	syncedSkipNext  bool
 	dummyCancel     context.CancelFunc
 	dummyFinished   <-chan struct{}
 	dummyMsgBuffer  []interface{}
@@ -180,6 +181,29 @@ func (con *Consensus) buildAllEmptyBlocks() {
 	}
 }
 
+// ForceSync forces syncer to become synced.
+func (con *Consensus) ForceSync(skip bool) {
+	if con.syncedLastBlock != nil {
+		return
+	}
+	hash, _ := con.db.GetCompactionChainTipInfo()
+	var block types.Block
+	block, err := con.db.GetBlock(hash)
+	if err != nil {
+		panic(err)
+	}
+	con.logger.Info("Force Sync", "block", &block)
+	con.setupConfigsUntilRound(block.Position.Round + core.ConfigRoundShift - 1)
+	con.syncedLastBlock = &block
+	con.stopBuffering()
+	con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
+		context.Background(), con.network.ReceiveChan(),
+		func(msg interface{}) {
+			con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg)
+		})
+	con.syncedSkipNext = skip
+}
+
 // SyncBlocks syncs blocks from compaction chain, latest is true if the caller
 // regards the blocks are the latest ones. Notice that latest can be true for
 // many times.
@@ -279,6 +303,7 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) {
 	con.syncedConsensus, err = core.NewConsensusFromSyncer(
 		con.syncedLastBlock,
 		con.roundBeginHeights[con.syncedLastBlock.Position.Round],
+		con.syncedSkipNext,
 		con.dMoment,
 		con.app,
 		con.gov,
diff --git a/core/test/app.go b/core/test/app.go
index 0a17f13..4cfd580 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -208,6 +208,15 @@ func (app *App) BlockConfirmed(b types.Block) {
 	app.LastConfirmedHeight = b.Position.Height
 }
 
+// ClearUndeliveredBlocks --
+func (app *App) ClearUndeliveredBlocks() {
+	app.deliveredLock.RLock()
+	defer app.deliveredLock.RUnlock()
+	app.confirmedLock.Lock()
+	defer app.confirmedLock.Unlock()
+	app.LastConfirmedHeight = uint64(len(app.DeliverSequence) - 1)
+}
+
 // BlockDelivered implements Application interface.
 func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
 	result types.FinalizationResult) {
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index 78d0de5..5131465 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -487,6 +487,147 @@ ReachAlive:
 	}
 }
 
+func (s *ConsensusTestSuite) TestForceSync() {
+	// The sync test case:
+	// - No configuration change.
+	// - One node does not run when all others starts until aliveRound exceeded.
+	var (
+		req        = s.Require()
+		peerCount  = 4
+		dMoment    = time.Now().UTC()
+		untilRound = uint64(3)
+		stopRound  = uint64(1)
+		errChan    = make(chan error, 100)
+	)
+	prvKeys, pubKeys, err := test.NewKeys(peerCount)
+	req.NoError(err)
+	// Setup seed governance instance. Give a short latency to make this test
+	// run faster.
+	seedGov, err := test.NewGovernance(
+		test.NewState(core.DKGDelayRound,
+			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
+		core.ConfigRoundShift)
+	req.NoError(err)
+	req.NoError(seedGov.State().RequestChange(
+		test.StateChangeRoundLength, uint64(60)))
+	seedGov.CatchUpWithRound(0)
+	seedGov.CatchUpWithRound(1)
+	// A short round interval.
+	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+	for _, n := range nodes {
+		go n.con.Run()
+	}
+ReachStop:
+	for {
+		// Check if any error happened or sleep for a period of time.
+		select {
+		case err := <-errChan:
+			req.NoError(err)
+		case <-time.After(5 * time.Second):
+		}
+		// If one of the nodes have reached stopRound, stop all nodes to simulate
+		// crash.
+		for _, n := range nodes {
+			pos := n.app.GetLatestDeliveredPosition()
+			if pos.Round >= stopRound {
+				break ReachStop
+			} else {
+				fmt.Println("latestPos", n.ID, &pos)
+			}
+		}
+	}
+
+	var latestPos types.Position
+	var latestNodeID types.NodeID
+	for _, n := range nodes {
+		n.con.Stop()
+		time.Sleep(1 * time.Second)
+	}
+	for nID, n := range nodes {
+		pos := n.app.GetLatestDeliveredPosition()
+		if pos.Newer(latestPos) {
+			fmt.Println("Newe position", nID, pos)
+			latestNodeID = nID
+			latestPos = pos
+		}
+	}
+	fmt.Println("Latest node", latestNodeID, &latestPos)
+	for nID, node := range nodes {
+		if nID == latestNodeID {
+			continue
+		}
+		fmt.Printf("[%p] Clearing %s %s\n", node.app, nID, node.app.GetLatestDeliveredPosition())
+		node.app.ClearUndeliveredBlocks()
+	}
+	syncerCon := make(map[types.NodeID]*syncer.Consensus, len(nodes))
+	for _, prvKey := range prvKeys {
+		nID := types.NewNodeID(prvKey.PublicKey())
+		node := nodes[nID]
+		syncerCon[nID] = syncer.NewConsensus(
+			dMoment,
+			node.app,
+			node.gov,
+			node.db,
+			node.network,
+			prvKey,
+			&common.NullLogger{},
+		)
+	}
+
+	targetNode := nodes[latestNodeID]
+	for nID, node := range nodes {
+		if nID == latestNodeID {
+			continue
+		}
+		syncedHeight := node.app.GetLatestDeliveredPosition().Height + 1
+		// FinalizationHeight = Height + 1
+		syncedHeight++
+		var err error
+		for {
+			fmt.Println("Syncing", nID, syncedHeight)
+			if syncedHeight >= latestPos.Height {
+				break
+			}
+			_, syncedHeight, err = s.syncBlocksWithSomeNode(
+				targetNode, node, syncerCon[nID], syncedHeight)
+			if err != nil {
+				panic(err)
+			}
+			fmt.Println("Syncing", nID, syncedHeight)
+		}
+		fmt.Println("Synced", nID, syncedHeight)
+	}
+
+	for _, con := range syncerCon {
+		con.ForceSync(true)
+	}
+	for nID := range nodes {
+		con, err := syncerCon[nID].GetSyncedConsensus()
+		s.Require().NoError(err)
+		nodes[nID].con = con
+	}
+	for _, node := range nodes {
+		go node.con.Run()
+		defer node.con.Stop()
+	}
+
+Loop:
+	for {
+		<-time.After(5 * time.Second)
+		fmt.Println("check latest position delivered by each node")
+		for _, n := range nodes {
+			latestPos := n.app.GetLatestDeliveredPosition()
+			fmt.Println("latestPos", n.ID, &latestPos)
+			if latestPos.Round < untilRound {
+				continue Loop
+			}
+		}
+		// Oh ya.
+		break
+	}
+	s.verifyNodes(nodes)
+}
+
 func TestConsensus(t *testing.T) {
 	suite.Run(t, new(ConsensusTestSuite))
 }
```
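For reference, below is a minimal sketch of the recovery flow this commit enables, mirroring what TestForceSync does: build a syncer.Consensus, force it into the synced state from the locally stored compaction-chain tip, convert it into a full core.Consensus, and start it. This is illustrative only; the helper name forceSyncAndRun is hypothetical, and the import paths are assumed to follow the github.com/dexon-foundation/dexon-consensus layout. The app, gov, database, network, and key arguments are whatever implementations the caller already wires up.

```go
package recovery

import (
	"time"

	"github.com/dexon-foundation/dexon-consensus/common"
	"github.com/dexon-foundation/dexon-consensus/core"
	"github.com/dexon-foundation/dexon-consensus/core/crypto"
	"github.com/dexon-foundation/dexon-consensus/core/db"
	"github.com/dexon-foundation/dexon-consensus/core/syncer"
)

// forceSyncAndRun (hypothetical helper) builds a syncer.Consensus, forces it
// into the synced state, converts it into a full core.Consensus, and runs it.
func forceSyncAndRun(
	dMoment time.Time,
	app core.Application,
	gov core.Governance,
	database db.Database,
	network core.Network,
	prvKey crypto.PrivateKey,
	logger common.Logger,
) (*core.Consensus, error) {
	syn := syncer.NewConsensus(dMoment, app, gov, database, network, prvKey, logger)
	// In the test, blocks from a healthy peer are fed through SyncBlocks first;
	// ForceSync then marks the syncer as synced at the stored compaction-chain
	// tip. Passing skip == true sets syncedSkipNext, so the resulting consensus
	// starts with an empty block right after the synced tip (startWithEmpty).
	syn.ForceSync(true)
	con, err := syn.GetSyncedConsensus()
	if err != nil {
		return nil, err
	}
	// Run the recovered consensus; the caller is responsible for calling Stop.
	go con.Run()
	return con, nil
}
```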