diff options
author | Mission Liao <mission.liao@dexon.org> | 2018-11-20 13:57:24 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-11-20 13:57:24 +0800 |
commit | 6d95559bf7eb62e6c114ca4d4040c44ffd553629 (patch) | |
tree | 5c62252e4cfe59c318ef3bb67b6c55891a58a4d3 | |
parent | e5891f7ca08737c3f3bc37fd523537cb243f8b0d (diff) | |
download | tangerine-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.gz tangerine-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.tar.zst tangerine-consensus-6d95559bf7eb62e6c114ca4d4040c44ffd553629.zip |
core: support NumChains change for BA modules (#339)
-rw-r--r-- | core/agreement-mgr.go | 423 | ||||
-rw-r--r-- | core/agreement.go | 11 | ||||
-rw-r--r-- | core/consensus.go | 286 | ||||
-rw-r--r-- | core/consensus_test.go | 37 | ||||
-rw-r--r-- | core/lattice-data.go | 113 | ||||
-rw-r--r-- | core/lattice-data_test.go | 14 | ||||
-rw-r--r-- | core/lattice.go | 58 | ||||
-rw-r--r-- | core/test/app.go | 3 | ||||
-rw-r--r-- | core/test/governance.go | 12 | ||||
-rw-r--r-- | core/test/governance_test.go | 6 | ||||
-rw-r--r-- | core/types/block-randomness.go | 7 | ||||
-rw-r--r-- | core/utils.go | 6 | ||||
-rw-r--r-- | integration_test/consensus_test.go | 92 | ||||
-rw-r--r-- | integration_test/with_scheduler_test.go | 26 | ||||
-rw-r--r-- | simulation/simulation.go | 2 |
15 files changed, 763 insertions, 333 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go new file mode 100644 index 0000000..10469de --- /dev/null +++ b/core/agreement-mgr.go @@ -0,0 +1,423 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Errors returned from BA modules +var ( + ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") +) + +// genValidLeader generate a validLeader function for agreement modules. +func genValidLeader( + mgr *agreementMgr) func(*types.Block) (bool, error) { + return func(block *types.Block) (bool, error) { + if block.Timestamp.After(time.Now()) { + return false, nil + } + if err := mgr.lattice.SanityCheck(block); err != nil { + if err == ErrRetrySanityCheckLater { + return false, nil + } + return false, err + } + mgr.logger.Debug("Calling Application.VerifyBlock", "block", block) + switch mgr.app.VerifyBlock(block) { + case types.VerifyInvalidBlock: + return false, ErrInvalidBlock + case types.VerifyRetryLater: + return false, nil + default: + } + return true, nil + } +} + +type agreementMgrConfig struct { + beginTime time.Time + numChains uint32 + roundInterval time.Duration + notarySetSize uint32 + lambdaBA time.Duration + crs common.Hash +} + +type baRoundSetting struct { + chainID uint32 + notarySet map[types.NodeID]struct{} + agr *agreement + recv *consensusBAReceiver + ticker Ticker + crs common.Hash +} + +type agreementMgr struct { + // TODO(mission): unbound Consensus instance from this module. + con *Consensus + ID types.NodeID + app Application + gov Governance + network Network + logger common.Logger + cache *utils.NodeSetCache + auth *Authenticator + lattice *Lattice + ctx context.Context + lastEndTime time.Time + configs []*agreementMgrConfig + baModules []*agreement + waitGroup sync.WaitGroup + pendingVotes map[uint64][]*types.Vote + pendingBlocks map[uint64][]*types.Block + + // This lock should be used when attempting to: + // - add a new baModule. + // - remove all baModules when stopping. In this case, the cleaner need + // to wait for all routines runnning baModules finished. + // - access a method of baModule. + // - append a config from new round. + // The routine running corresponding baModule, however, doesn't have to + // acquire this lock. + lock sync.RWMutex +} + +func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { + return &agreementMgr{ + con: con, + ID: con.ID, + app: con.app, + gov: con.gov, + network: con.network, + logger: con.logger, + cache: con.nodeSetCache, + auth: con.authModule, + lattice: con.lattice, + ctx: con.ctx, + lastEndTime: dMoment, + } +} + +func (mgr *agreementMgr) appendConfig( + round uint64, config *types.Config, crs common.Hash) (err error) { + mgr.lock.Lock() + defer mgr.lock.Unlock() + // TODO(mission): initiate this module from some round > 0. + if round != uint64(len(mgr.configs)) { + return ErrRoundNotIncreasing + } + newConfig := &agreementMgrConfig{ + beginTime: mgr.lastEndTime, + numChains: config.NumChains, + roundInterval: config.RoundInterval, + notarySetSize: config.NotarySetSize, + lambdaBA: config.LambdaBA, + crs: crs, + } + mgr.configs = append(mgr.configs, newConfig) + mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) + // Create baModule for newly added chain. + for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ { + // Prepare modules. + recv := &consensusBAReceiver{ + consensus: mgr.con, + chainID: i, + restartNotary: make(chan bool, 1), + } + agrModule := newAgreement( + mgr.con.ID, + recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.auth) + // Hacky way to make agreement module self contained. + recv.agreementModule = agrModule + mgr.baModules = append(mgr.baModules, agrModule) + go mgr.runBA(round, i) + } + return nil +} + +func (mgr *agreementMgr) processVote(v *types.Vote) error { + v = v.Clone() + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if v.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process vote for unknown chain to BA", + "position", &v.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + return mgr.baModules[v.Position.ChainID].processVote(v) +} + +func (mgr *agreementMgr) processBlock(b *types.Block) error { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if b.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process block for unknown chain to BA", + "position", &b.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + return mgr.baModules[b.Position.ChainID].processBlock(b) +} + +func (mgr *agreementMgr) processAgreementResult( + result *types.AgreementResult) error { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if result.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process unknown result for unknown chain to BA", + "position", &result.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + agreement := mgr.baModules[result.Position.ChainID] + aID := agreement.agreementID() + if isStop(aID) { + return nil + } + if result.Position.Newer(&aID) { + mgr.logger.Info("Syncing BA", "position", &result.Position) + nodes, err := mgr.cache.GetNodeSet(result.Position.Round) + if err != nil { + return err + } + mgr.logger.Debug("Calling Network.PullBlocks for syncing BA", + "hash", result.BlockHash) + mgr.network.PullBlocks(common.Hashes{result.BlockHash}) + mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) + crs := mgr.gov.CRS(result.Position.Round) + nIDs := nodes.GetSubSet( + int(mgr.gov.Configuration(result.Position.Round).NotarySetSize), + types.NewNotarySetTarget(crs, result.Position.ChainID)) + for _, vote := range result.Votes { + agreement.processVote(&vote) + } + agreement.restart(nIDs, result.Position, crs) + } + return nil +} + +func (mgr *agreementMgr) stop() { + // Stop all running agreement modules. + func() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + for _, agr := range mgr.baModules { + agr.stop() + } + }() + // Block until all routines are done. + mgr.waitGroup.Wait() +} + +func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { + mgr.waitGroup.Add(1) + defer mgr.waitGroup.Done() + // Acquire agreement module. + agr, recv := func() (*agreement, *consensusBAReceiver) { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + agr := mgr.baModules[chainID] + return agr, agr.data.recv.(*consensusBAReceiver) + }() + // These are round based variables. + var ( + currentRound uint64 + nextRound = initRound + setting = baRoundSetting{ + chainID: chainID, + agr: agr, + recv: recv, + } + roundBeginTime time.Time + roundEndTime time.Time + tickDuration time.Duration + ) + + // Check if this routine needs to awake in this round and prepare essential + // variables when yes. + checkRound := func() (awake bool) { + defer func() { + currentRound = nextRound + nextRound++ + }() + // Wait until the configuartion for next round is ready. + var config *agreementMgrConfig + for { + config = func() *agreementMgrConfig { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if nextRound < uint64(len(mgr.configs)) { + return mgr.configs[nextRound] + } + return nil + }() + if config != nil { + break + } else { + mgr.logger.Info("round is not ready", "round", nextRound) + time.Sleep(1 * time.Second) + } + } + // Set next checkpoint. + roundBeginTime = config.beginTime + roundEndTime = config.beginTime.Add(config.roundInterval) + // Check if this chain handled by this routine included in this round. + if chainID >= config.numChains { + return false + } + // Check if this node in notary set of this chain in this round. + nodeSet, err := mgr.cache.GetNodeSet(nextRound) + if err != nil { + panic(err) + } + setting.crs = config.crs + setting.notarySet = nodeSet.GetSubSet( + int(config.notarySetSize), + types.NewNotarySetTarget(config.crs, chainID)) + _, awake = setting.notarySet[mgr.ID] + // Setup ticker + if tickDuration != config.lambdaBA { + if setting.ticker != nil { + setting.ticker.Stop() + } + setting.ticker = newTicker(mgr.gov, nextRound, TickerBA) + tickDuration = config.lambdaBA + } + return + } +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + now := time.Now().UTC() + if !checkRound() { + if now.After(roundEndTime) { + // That round is passed. + continue Loop + } + // Sleep until next checkpoint. + select { + case <-mgr.ctx.Done(): + break Loop + case <-time.After(roundEndTime.Sub(now)): + continue Loop + } + } + // Sleep until round begin. Here a biased round begin time would be + // used instead of the one in config. The reason it to disperse the load + // of fullnodes to verify confirmed blocks from each chain. + if now.Before(pickBiasedTime(roundBeginTime, 4*tickDuration)) { + select { + case <-mgr.ctx.Done(): + break Loop + case <-time.After(roundBeginTime.Sub(now)): + } + // Clean the tick channel after awake: the tick would be queued in + // channel, thus the first few ticks would not tick on expected + // interval. + <-setting.ticker.Tick() + <-setting.ticker.Tick() + } + // Run BA for this round. + recv.round = currentRound + recv.changeNotaryTime = roundEndTime + recv.restartNotary <- false + if err := mgr.baRoutineForOneRound(&setting); err != nil { + mgr.logger.Error("BA routine failed", + "error", err, + "nodeID", mgr.ID, + "chain", chainID) + break Loop + } + } +} + +func (mgr *agreementMgr) baRoutineForOneRound( + setting *baRoundSetting) (err error) { + agr := setting.agr + recv := setting.recv +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + select { + case newNotary := <-recv.restartNotary: + if newNotary { + // This round is finished. + break Loop + } + nextHeight, err := mgr.lattice.NextHeight(recv.round, setting.chainID) + if err != nil { + panic(err) + } + agr.restart(setting.notarySet, types.Position{ + Round: recv.round, + ChainID: setting.chainID, + Height: nextHeight, + }, setting.crs) + default: + } + if agr.pullVotes() { + pos := agr.agreementID() + mgr.logger.Debug("Calling Network.PullVotes for syncing votes", + "position", &pos) + mgr.network.PullVotes(pos) + } + if err = agr.nextState(); err != nil { + mgr.logger.Error("Failed to proceed to next state", + "nodeID", mgr.ID.String(), + "error", err) + break Loop + } + for i := 0; i < agr.clocks(); i++ { + // Priority select for agreement.done(). + select { + case <-agr.done(): + continue Loop + default: + } + select { + case <-agr.done(): + continue Loop + case <-setting.ticker.Tick(): + } + } + } + return nil +} diff --git a/core/agreement.go b/core/agreement.go index ff1c71a..4fb0dea 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -311,12 +311,13 @@ func (a *agreement) processVote(vote *types.Vote) error { return err } aID := a.agreementID() + // Agreement module has stopped. + if isStop(aID) { + return nil + } if vote.Position != aID { - // Agreement module has stopped. - if !isStop(aID) { - if aID.Newer(&vote.Position) { - return nil - } + if aID.Newer(&vote.Position) { + return nil } a.pendingVote = append(a.pendingVote, pendingVote{ vote: vote, diff --git a/core/consensus.go b/core/consensus.go index e09ee25..49874d3 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -56,6 +56,8 @@ var ( "incorrect vote position") ErrIncorrectVoteProposer = fmt.Errorf( "incorrect vote proposer") + ErrCRSNotReady = fmt.Errorf( + "CRS not ready") ) // consensusBAReceiver implements agreementReceiver. @@ -103,20 +105,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { func (recv *consensusBAReceiver) ConfirmBlock( hash common.Hash, votes map[types.NodeID]*types.Vote) { var block *types.Block - if (hash == common.Hash{}) { + isEmptyBlockConfirmed := hash == common.Hash{} + if isEmptyBlockConfirmed { aID := recv.agreementModule.agreementID() recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return } } else { var exist bool - block, exist = recv.consensus.baModules[recv.chainID]. - findCandidateBlockNoLock(hash) + block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", "hash", hash, @@ -150,9 +152,10 @@ func (recv *consensusBAReceiver) ConfirmBlock( voteList = append(voteList, *vote) } result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, } recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult", "result", result) @@ -273,8 +276,7 @@ type Consensus struct { authModule *Authenticator // BA. - baModules []*agreement - receivers []*consensusBAReceiver + baMgr *agreementMgr baConfirmedBlock map[common.Hash]chan<- *types.Block // DKG. @@ -365,49 +367,8 @@ func NewConsensus( event: common.NewEvent(), logger: logger, } - - validLeader := func(block *types.Block) (bool, error) { - if block.Timestamp.After(time.Now()) { - return false, nil - } - if err := lattice.SanityCheck(block); err != nil { - if err == ErrRetrySanityCheckLater { - return false, nil - } - return false, err - } - logger.Debug("Calling Application.VerifyBlock", "block", block) - switch app.VerifyBlock(block) { - case types.VerifyInvalidBlock: - return false, ErrInvalidBlock - case types.VerifyRetryLater: - return false, nil - default: - } - return true, nil - } - - con.baModules = make([]*agreement, config.NumChains) - con.receivers = make([]*consensusBAReceiver, config.NumChains) - for i := uint32(0); i < config.NumChains; i++ { - chainID := i - recv := &consensusBAReceiver{ - consensus: con, - chainID: chainID, - restartNotary: make(chan bool, 1), - } - agreementModule := newAgreement( - con.ID, - recv, - newLeaderSelector(validLeader, logger), - con.authModule, - ) - // Hacky way to make agreement module self contained. - recv.agreementModule = agreementModule - recv.changeNotaryTime = dMoment - con.baModules[chainID] = agreementModule - con.receivers[chainID] = recv - } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.baMgr = newAgreementMgr(con, dMoment) return con } @@ -420,14 +381,27 @@ func (con *Consensus) Run(initBlock *types.Block) { con.logger.Debug("Calling Governance.Configuration", "round", initRound) initConfig := con.gov.Configuration(initRound) // Setup context. - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.ccModule.init(initBlock) // TODO(jimmy-dexon): change AppendConfig to add config for specific round. - for i := uint64(0); i <= initRound; i++ { - con.logger.Debug("Calling Governance.Configuration", "round", i+1) - cfg := con.gov.Configuration(i + 1) - if err := con.lattice.AppendConfig(i+1, cfg); err != nil { - panic(err) + for i := uint64(0); i <= initRound+1; i++ { + con.logger.Debug("Calling Governance.Configuration", "round", i) + cfg := con.gov.Configuration(i) + // 0 round is already given to core.Lattice module when constructing. + if i > 0 { + if err := con.lattice.AppendConfig(i, cfg); err != nil { + panic(err) + } + } + // Corresponding CRS might not be ready for next round to initRound. + if i < initRound+1 { + con.logger.Debug("Calling Governance.CRS", "round", i) + crs := con.gov.CRS(i) + if (crs == common.Hash{}) { + panic(ErrCRSNotReady) + } + if err := con.baMgr.appendConfig(i, cfg, crs); err != nil { + panic(err) + } } } dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) @@ -447,103 +421,9 @@ func (con *Consensus) Run(initBlock *types.Block) { }) } con.initialRound(con.dMoment, initRound, initConfig) - ticks := make([]chan struct{}, 0, initConfig.NumChains) - for i := uint32(0); i < initConfig.NumChains; i++ { - tick := make(chan struct{}) - ticks = append(ticks, tick) - // TODO(jimmy-dexon): this is a temporary solution to offset BA time. - // The complelete solution should be delivered along with config change. - offset := time.Duration(i*uint32(4)/initConfig.NumChains) * - initConfig.LambdaBA - go func(chainID uint32, offset time.Duration) { - time.Sleep(offset) - con.runBA(chainID, tick) - }(i, offset) - } - - // Reset ticker. - <-con.tickerObj.Tick() - <-con.tickerObj.Tick() - for { - <-con.tickerObj.Tick() - for _, tick := range ticks { - select { - case tick <- struct{}{}: - default: - } - } - } -} - -func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) { - // TODO(jimmy-dexon): move this function inside agreement. - agreement := con.baModules[chainID] - recv := con.receivers[chainID] - recv.restartNotary <- true - nIDs := make(map[types.NodeID]struct{}) - crs := common.Hash{} - // Reset ticker - <-tick -BALoop: - for { - select { - case <-con.ctx.Done(): - break BALoop - default: - } - select { - case newNotary := <-recv.restartNotary: - if newNotary { - con.logger.Debug("Calling Governance.CRS", "round", recv.round) - crs = con.gov.CRS(recv.round) - if (crs == common.Hash{}) { - // Governance is out-of-sync. - continue BALoop - } - configForNewRound := con.gov.Configuration(recv.round) - recv.changeNotaryTime = - recv.changeNotaryTime.Add(configForNewRound.RoundInterval) - nodes, err := con.nodeSetCache.GetNodeSet(recv.round) - if err != nil { - panic(err) - } - con.logger.Debug("Calling Governance.Configuration", - "round", recv.round) - nIDs = nodes.GetSubSet( - int(configForNewRound.NotarySetSize), - types.NewNotarySetTarget(crs, chainID)) - } - nextPos := con.lattice.NextPosition(chainID) - nextPos.Round = recv.round - agreement.restart(nIDs, nextPos, crs) - default: - } - if agreement.pullVotes() { - pos := agreement.agreementID() - con.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", &pos) - con.network.PullVotes(pos) - } - err := agreement.nextState() - if err != nil { - con.logger.Error("Failed to proceed to next state", - "nodeID", con.ID.String(), - "error", err) - break BALoop - } - for i := 0; i < agreement.clocks(); i++ { - // Priority select for agreement.done(). - select { - case <-agreement.done(): - continue BALoop - default: - } - select { - case <-agreement.done(): - continue BALoop - case <-tick: - } - } + // Block until done. + select { + case <-con.ctx.Done(): } } @@ -622,6 +502,7 @@ func (con *Consensus) initialRound( con.logger.Error("Error getting DKG set", "round", round, "error", err) curDkgSet = make(map[types.NodeID]struct{}) } + // Initiate CRS routine. if _, exist := curDkgSet[con.ID]; exist { con.event.RegisterTime(startTime.Add(config.RoundInterval/2), func(time.Time) { @@ -630,7 +511,31 @@ func (con *Consensus) initialRound( }() }) } - + // Initiate BA modules. + con.event.RegisterTime( + startTime.Add(config.RoundInterval/2+config.LambdaDKG), + func(time.Time) { + go func(nextRound uint64) { + for (con.gov.CRS(nextRound) == common.Hash{}) { + con.logger.Info("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", nextRound) + time.Sleep(500 * time.Millisecond) + } + // Notify BA for new round. + con.logger.Debug("Calling Governance.Configuration", + "round", nextRound) + nextConfig := con.gov.Configuration(nextRound) + con.logger.Debug("Calling Governance.CRS", + "round", nextRound) + nextCRS := con.gov.CRS(nextRound) + if err := con.baMgr.appendConfig( + nextRound, nextConfig, nextCRS); err != nil { + panic(err) + } + }(round + 1) + }) + // Initiate DKG for this round. con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG), func(time.Time) { go func(nextRound uint64) { @@ -670,6 +575,7 @@ func (con *Consensus) initialRound( }) }(round + 1) }) + // Prepare lattice module for next round and next "initialRound" routine. con.event.RegisterTime(startTime.Add(config.RoundInterval), func(time.Time) { // Change round. @@ -685,9 +591,7 @@ func (con *Consensus) initialRound( // Stop the Consensus core. func (con *Consensus) Stop() { - for _, a := range con.baModules { - a.stop() - } + con.baMgr.stop() con.event.Reset() con.ctxCancel() } @@ -785,9 +689,10 @@ func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block { } func (con *Consensus) proposeEmptyBlock( - chainID uint32) (*types.Block, error) { + round uint64, chainID uint32) (*types.Block, error) { block := &types.Block{ Position: types.Position{ + Round: round, ChainID: chainID, }, } @@ -799,15 +704,9 @@ func (con *Consensus) proposeEmptyBlock( // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - if vote.Position.ChainID >= uint32(len(con.baModules)) { - return nil - } - if isStop(con.baModules[vote.Position.ChainID].agreementID()) { - return nil - } v := vote.Clone() - err = con.baModules[v.Position.ChainID].processVote(v) - return err + err = con.baMgr.processVote(v) + return } // ProcessAgreementResult processes the randomness request. @@ -826,8 +725,14 @@ func (con *Consensus) ProcessAgreementResult( return ErrIncorrectVoteProposer } for _, vote := range rand.Votes { - if vote.BlockHash != rand.BlockHash { - return ErrIncorrectVoteBlockHash + if rand.IsEmptyBlock { + if (vote.BlockHash != common.Hash{}) { + return ErrIncorrectVoteBlockHash + } + } else { + if vote.BlockHash != rand.BlockHash { + return ErrIncorrectVoteBlockHash + } } if vote.Type != types.VoteCom { return ErrIncorrectVoteType @@ -847,29 +752,8 @@ func (con *Consensus) ProcessAgreementResult( } } // Syncing BA Module. - agreement := con.baModules[rand.Position.ChainID] - aID := agreement.agreementID() - if isStop(aID) { - return nil - } - if rand.Position.Newer(&aID) { - con.logger.Info("Syncing BA", "position", &rand.Position) - nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round) - if err != nil { - return err - } - con.logger.Debug("Calling Network.PullBlocks for syncing BA", - "hash", rand.BlockHash) - con.network.PullBlocks(common.Hashes{rand.BlockHash}) - con.logger.Debug("Calling Governance.CRS", "round", rand.Position.Round) - crs := con.gov.CRS(rand.Position.Round) - nIDs := nodes.GetSubSet( - int(con.gov.Configuration(rand.Position.Round).NotarySetSize), - types.NewNotarySetTarget(crs, rand.Position.ChainID)) - for _, vote := range rand.Votes { - agreement.processVote(&vote) - } - agreement.restart(nIDs, rand.Position, crs) + if err := con.baMgr.processAgreementResult(rand); err != nil { + return err } // Calculating randomness. if rand.Position.Round == 0 { @@ -950,9 +834,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } + err = con.baMgr.processBlock(b) return } @@ -1064,9 +946,15 @@ func (con *Consensus) prepareBlock(b *types.Block, if err = con.lattice.PrepareBlock(b, proposeTime); err != nil { return } - con.logger.Debug("Calling Governance.CRS", "round", 0) - if err = - con.authModule.SignCRS(b, con.gov.CRS(b.Position.Round)); err != nil { + con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round) + crs := con.gov.CRS(b.Position.Round) + if crs.Equal(common.Hash{}) { + con.logger.Error("CRS for round is not ready, unable to prepare block", + "position", &b.Position) + err = ErrCRSNotReady + return + } + if err = con.authModule.SignCRS(b, crs); err != nil { return } return diff --git a/core/consensus_test.go b/core/consensus_test.go index 802182a..f5f1182 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -187,24 +187,18 @@ func (s *ConsensusTestSuite) prepareConsensus( dMoment, app, gov, db, network, prvKey, &common.NullLogger{}) con.ccModule.init(&types.Block{}) conn.setCon(nID, con) - round := uint64(0) - nodes, err := con.nodeSetCache.GetNodeSet(round) - s.Require().NoError(err) - for i, agreement := range con.baModules { - chainID := uint32(i) - nIDs := nodes.GetSubSet( - int(gov.Configuration(round).NotarySetSize), - types.NewNotarySetTarget( - gov.CRS(round), chainID)) - agreement.restart(nIDs, types.Position{ - Round: round, - ChainID: chainID, - Height: uint64(0), - }, gov.CRS(round)) - } return app, con } +func (s *ConsensusTestSuite) prepareAgreementMgrWithoutRunning( + con *Consensus, numChains uint32) { + // This is a workaround to setup agreementMgr. + con.baMgr.appendConfig(0, &types.Config{ + NumChains: numChains, + RoundInterval: time.Hour, + }, common.NewRandomHash()) +} + func (s *ConsensusTestSuite) TestSimpleDeliverBlock() { // This test scenario: // o o o o <- this layer makes older blocks strongly acked. @@ -453,6 +447,7 @@ func (s *ConsensusTestSuite) TestPrepareBlock() { cons := map[types.NodeID]*Consensus{} for _, key := range prvKeys { _, con := s.prepareConsensus(dMoment, gov, key, conn) + s.prepareAgreementMgrWithoutRunning(con, 4) nID := types.NewNodeID(key.PublicKey()) cons[nID] = con nodes = append(nodes, nID) @@ -492,6 +487,7 @@ func (s *ConsensusTestSuite) TestPrepareGenesisBlock() { s.Require().NoError(err) prvKey := prvKeys[0] _, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn) + s.prepareAgreementMgrWithoutRunning(con, 4) block := &types.Block{ Position: types.Position{ChainID: 0}, } @@ -547,13 +543,15 @@ func (s *ConsensusTestSuite) TestDKGCRS() { } func (s *ConsensusTestSuite) TestSyncBA() { + lambdaBA := time.Second conn := s.newNetworkConnection() prvKeys, pubKeys, err := test.NewKeys(4) s.Require().NoError(err) - gov, err := test.NewGovernance(pubKeys, time.Second, ConfigRoundShift) + gov, err := test.NewGovernance(pubKeys, lambdaBA, ConfigRoundShift) s.Require().NoError(err) prvKey := prvKeys[0] _, con := s.prepareConsensus(time.Now().UTC(), gov, prvKey, conn) + go con.Run(&types.Block{}) hash := common.NewRandomHash() auths := make([]*Authenticator, 0, len(prvKeys)) for _, prvKey := range prvKeys { @@ -574,8 +572,13 @@ func (s *ConsensusTestSuite) TestSyncBA() { s.Require().NoError(auth.SignVote(vote)) baResult.Votes = append(baResult.Votes, *vote) } + // Make sure each agreement module is running. ProcessAgreementResult only + // works properly when agreement module is running: + // - the bias for round begin time would be 4 * lambda. + // - the ticker is 1 lambdaa. + time.Sleep(5 * lambdaBA) s.Require().NoError(con.ProcessAgreementResult(baResult)) - aID := con.baModules[0].agreementID() + aID := con.baMgr.baModules[0].agreementID() s.Equal(pos, aID) // Test negative case. diff --git a/core/lattice-data.go b/core/lattice-data.go index 6fe810a..f1ab2de 100644 --- a/core/lattice-data.go +++ b/core/lattice-data.go @@ -362,6 +362,35 @@ func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) { return } +// isBindTip checks if a block's fields should follow up its parent block. +func (data *latticeData) isBindTip( + pos types.Position, tip *types.Block) (bindTip bool, err error) { + if tip == nil { + return + } + if pos.Round < tip.Position.Round { + err = ErrInvalidRoundID + return + } + tipConfig := data.getConfig(tip.Position.Round) + if tip.Timestamp.After(tipConfig.roundEndTime) { + if pos.Round == tip.Position.Round { + err = ErrRoundNotSwitch + return + } + if pos.Round == tip.Position.Round+1 { + bindTip = true + } + } else { + if pos.Round != tip.Position.Round { + err = ErrInvalidRoundID + return + } + bindTip = true + } + return +} + // prepareBlock setups fields of a block based on its ChainID and Round, // including: // - Acks @@ -375,7 +404,6 @@ func (data *latticeData) prepareBlock(b *types.Block) error { config *latticeDataConfig acks common.Hashes bindTip bool - chainTip *types.Block ) if config = data.getConfig(b.Position.Round); config == nil { return ErrUnknownRoundID @@ -388,30 +416,16 @@ func (data *latticeData) prepareBlock(b *types.Block) error { b.Position.Height = 0 b.ParentHash = common.Hash{} // Decide valid timestamp range. - homeChain := data.chains[b.Position.ChainID] - if homeChain.tip != nil { - chainTip = homeChain.tip - if b.Position.Round < chainTip.Position.Round { - return ErrInvalidRoundID - } - chainTipConfig := data.getConfig(chainTip.Position.Round) - if chainTip.Timestamp.After(chainTipConfig.roundEndTime) { - if b.Position.Round == chainTip.Position.Round { - return ErrRoundNotSwitch - } - if b.Position.Round == chainTip.Position.Round+1 { - bindTip = true - } - } else { - if b.Position.Round != chainTip.Position.Round { - return ErrInvalidRoundID - } - bindTip = true - } + chainTip := data.chains[b.Position.ChainID].tip + if chainTip != nil { // TODO(mission): find a way to prevent us to assign a witness height // from Jurassic period. b.Witness.Height = chainTip.Witness.Height } + bindTip, err := data.isBindTip(b.Position, chainTip) + if err != nil { + return err + } // For blocks with continuous round ID, assign timestamp range based on // parent block and bound config. if bindTip { @@ -461,40 +475,48 @@ func (data *latticeData) prepareBlock(b *types.Block) error { // - ParentHash and Height from parent block. If there is no valid parent block // (ex. Newly added chain or bootstrap), these fields would be setup as // genesis block. -func (data *latticeData) prepareEmptyBlock(b *types.Block) { +func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) { // emptyBlock has no proposer. b.ProposerID = types.NodeID{} - var acks common.Hashes // Reset fields to make sure we got these information from parent block. b.Position.Height = 0 - b.Position.Round = 0 b.ParentHash = common.Hash{} b.Timestamp = time.Time{} // Decide valid timestamp range. - homeChain := data.chains[b.Position.ChainID] - if homeChain.tip != nil { - chainTip := homeChain.tip + config := data.getConfig(b.Position.Round) + chainTip := data.chains[b.Position.ChainID].tip + bindTip, err := data.isBindTip(b.Position, chainTip) + if err != nil { + return + } + if bindTip { b.ParentHash = chainTip.Hash - chainTipConfig := data.getConfig(chainTip.Position.Round) - if chainTip.Timestamp.After(chainTipConfig.roundEndTime) { - b.Position.Round = chainTip.Position.Round + 1 - } else { - b.Position.Round = chainTip.Position.Round - } b.Position.Height = chainTip.Position.Height + 1 - b.Timestamp = chainTip.Timestamp.Add(chainTipConfig.minBlockTimeInterval) + b.Timestamp = chainTip.Timestamp.Add(config.minBlockTimeInterval) b.Witness.Height = chainTip.Witness.Height b.Witness.Data = make([]byte, len(chainTip.Witness.Data)) copy(b.Witness.Data, chainTip.Witness.Data) - acks = append(acks, chainTip.Hash) + b.Acks = common.NewSortedHashes(common.Hashes{chainTip.Hash}) + } else { + b.Timestamp = config.roundBeginTime } - b.Acks = common.NewSortedHashes(acks) + return } // TODO(mission): make more abstraction for this method. // nextHeight returns the next height of a chain. -func (data *latticeData) nextPosition(chainID uint32) types.Position { - return data.chains[chainID].nextPosition() +func (data *latticeData) nextHeight( + round uint64, chainID uint32) (uint64, error) { + chainTip := data.chains[chainID].tip + bindTip, err := data.isBindTip( + types.Position{Round: round, ChainID: chainID}, chainTip) + if err != nil { + return 0, err + } + if bindTip { + return chainTip.Position.Height + 1, nil + } + return 0, nil } // findBlock seeks blocks in memory or db. @@ -609,21 +631,6 @@ func (s *chainStatus) addBlock(b *types.Block) { s.tip = b } -// TODO(mission): change back to nextHeight. -// nextPosition returns a valid position for new block in this chain. -func (s *chainStatus) nextPosition() types.Position { - if s.tip == nil { - return types.Position{ - ChainID: s.ID, - Height: 0, - } - } - return types.Position{ - ChainID: s.ID, - Height: s.tip.Position.Height + 1, - } -} - // purgeBlock purges a block from cache, make sure this block is already saved // in blockdb. func (s *chainStatus) purgeBlock(b *types.Block) error { diff --git a/core/lattice-data_test.go b/core/lattice-data_test.go index 24f45e6..e939c81 100644 --- a/core/lattice-data_test.go +++ b/core/lattice-data_test.go @@ -550,11 +550,13 @@ func (s *LatticeDataTestSuite) TestPrepareBlock() { req.Equal(b01.Position.Height, uint64(1)) } -func (s *LatticeDataTestSuite) TestNextPosition() { - // Test 'NextPosition' method when lattice is ready. +func (s *LatticeDataTestSuite) TestNextHeight() { + // Test 'NextHeight' method when lattice is ready. data, _ := s.genTestCase1() - s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 4}) - // Test 'NextPosition' method when lattice is empty. + h, err := data.nextHeight(0, 0) + s.Require().NoError(err) + s.Require().Equal(h, uint64(4)) + // Test 'NextHeight' method when lattice is empty. // Setup a configuration that no restriction on block interval and // round cutting. genesisConfig := &types.Config{ @@ -563,7 +565,9 @@ func (s *LatticeDataTestSuite) TestNextPosition() { MinBlockInterval: 1 * time.Second, } data = newLatticeData(nil, time.Now().UTC(), 0, genesisConfig) - s.Equal(data.nextPosition(0), types.Position{ChainID: 0, Height: 0}) + h, err = data.nextHeight(0, 0) + s.Require().NoError(err) + s.Require().Equal(h, uint64(0)) } func (s *LatticeDataTestSuite) TestPrepareEmptyBlock() { diff --git a/core/lattice.go b/core/lattice.go index 7b66bd5..402a468 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -41,7 +41,6 @@ type Lattice struct { pool blockPool retryAdd bool data *latticeData - toSyncer *totalOrderingSyncer toModule *totalOrdering ctModule *consensusTimestamp logger common.Logger @@ -65,7 +64,6 @@ func NewLattice( debug: debug, pool: newBlockPool(cfg.NumChains), data: newLatticeData(db, dMoment, round, cfg), - toSyncer: newTotalOrderingSyncer(cfg.NumChains), toModule: newTotalOrdering(dMoment, cfg), ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains), logger: logger, @@ -102,7 +100,9 @@ func (l *Lattice) PrepareBlock( func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) { l.lock.RLock() defer l.lock.RUnlock() - l.data.prepareEmptyBlock(b) + if err = l.data.prepareEmptyBlock(b); err != nil { + return + } if b.Hash, err = hashBlock(b); err != nil { return } @@ -237,40 +237,37 @@ func (l *Lattice) ProcessBlock( return } - for _, blockToSyncer := range inLattice { - toTotalOrdering := l.toSyncer.processBlock(blockToSyncer) - // Perform total ordering for each block added to lattice. - for _, b = range toTotalOrdering { - toDelivered, deliveredMode, err = l.toModule.processBlock(b) - if err != nil { - // All errors from total ordering is serious, should panic. - panic(err) - } - if len(toDelivered) == 0 { - continue - } - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } - if l.debug != nil { - l.debug.TotalOrderingDelivered(hashes, deliveredMode) - } - // Perform consensus timestamp module. - if err = l.ctModule.processBlocks(toDelivered); err != nil { - return - } - delivered = append(delivered, toDelivered...) + for _, b = range inLattice { + toDelivered, deliveredMode, err = l.toModule.processBlock(b) + if err != nil { + // All errors from total ordering is serious, should panic. + panic(err) + } + if len(toDelivered) == 0 { + continue + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + if l.debug != nil { + l.debug.TotalOrderingDelivered(hashes, deliveredMode) + } + // Perform consensus timestamp module. + if err = l.ctModule.processBlocks(toDelivered); err != nil { + return } + delivered = append(delivered, toDelivered...) } return } -// NextPosition returns expected position of incoming block for specified chain. -func (l *Lattice) NextPosition(chainID uint32) types.Position { +// NextHeight returns expected height of incoming block for specified chain and +// given round. +func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) { l.lock.RLock() defer l.lock.RUnlock() - return l.data.nextPosition(chainID) + return l.data.nextHeight(round, chainID) } // PurgeBlocks purges blocks' cache in memory, this is called when the caller @@ -308,5 +305,4 @@ func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { panic(err) } l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) - l.toSyncer.processFinalizedBlock(b) } diff --git a/core/test/app.go b/core/test/app.go index a5d0270..e67d5c9 100644 --- a/core/test/app.go +++ b/core/test/app.go @@ -198,6 +198,9 @@ func (app *App) GetLatestDeliveredPosition() types.Position { defer app.deliveredLock.RUnlock() app.blocksLock.RLock() defer app.blocksLock.RUnlock() + if len(app.DeliverSequence) == 0 { + return types.Position{} + } return app.blocks[app.DeliverSequence[len(app.DeliverSequence)-1]].Position } diff --git a/core/test/governance.go b/core/test/governance.go index f96e9e7..14c8177 100644 --- a/core/test/governance.go +++ b/core/test/governance.go @@ -33,6 +33,9 @@ import ( typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" ) +// TODO(mission): add a method to compare config/crs between governance +// instances. + // Governance is an implementation of Goverance for testing purpose. type Governance struct { roundShift uint64 @@ -105,6 +108,13 @@ func (g *Governance) NotifyRoundHeight(round, height uint64) { func() { g.lock.Lock() defer g.lock.Unlock() + // Check if there is any pending changes for previous rounds. + for r := range g.pendingConfigChanges { + if r < shiftedRound+1 { + panic(fmt.Errorf("pending change no longer applied: %v, now: %v", + r, shiftedRound+1)) + } + } for t, v := range g.pendingConfigChanges[shiftedRound+1] { if err := g.stateModule.RequestChange(t, v); err != nil { panic(err) @@ -346,7 +356,7 @@ func (g *Governance) RegisterConfigChange( } g.lock.Lock() defer g.lock.Unlock() - if round <= uint64(len(g.configs)) { + if round < uint64(len(g.configs)) { return errors.New( "attempt to register state change for prepared rounds") } diff --git a/core/test/governance_test.go b/core/test/governance_test.go index 07b0d46..01993f9 100644 --- a/core/test/governance_test.go +++ b/core/test/governance_test.go @@ -77,13 +77,13 @@ func (s *GovernanceTestSuite) TestRegisterChange() { req.Equal(g.Configuration(4).NumChains, uint32(20)) // Unable to register change for prepared round. req.Error(g.RegisterConfigChange(4, StateChangeNumChains, uint32(32))) - // Unable to register change for next notified round. - req.Error(g.RegisterConfigChange(5, StateChangeNumChains, uint32(32))) // It's ok to make some change when condition is met. + req.NoError(g.RegisterConfigChange(5, StateChangeNumChains, uint32(32))) req.NoError(g.RegisterConfigChange(6, StateChangeNumChains, uint32(32))) req.NoError(g.RegisterConfigChange(7, StateChangeNumChains, uint32(40))) // In local mode, state for round 6 would be ready after notified with - // round 5. + // round 2. + g.NotifyRoundHeight(2, 0) g.NotifyRoundHeight(3, 0) // In local mode, state for round 7 would be ready after notified with // round 6. diff --git a/core/types/block-randomness.go b/core/types/block-randomness.go index 6df245b..1c64d4a 100644 --- a/core/types/block-randomness.go +++ b/core/types/block-randomness.go @@ -25,9 +25,10 @@ import ( // AgreementResult describes an agremeent result. type AgreementResult struct { - BlockHash common.Hash `json:"block_hash"` - Position Position `json:"position"` - Votes []Vote `json:"votes"` + BlockHash common.Hash `json:"block_hash"` + Position Position `json:"position"` + Votes []Vote `json:"votes"` + IsEmptyBlock bool `json:"is_empty_block"` } func (r *AgreementResult) String() string { diff --git a/core/utils.go b/core/utils.go index f6be461..4e9cfdc 100644 --- a/core/utils.go +++ b/core/utils.go @@ -20,6 +20,7 @@ package core import ( "errors" "fmt" + "math/rand" "os" "sort" "time" @@ -114,6 +115,11 @@ func removeFromSortedUint32Slice(xs []uint32, x uint32) []uint32 { return append(xs[:indexToRemove], xs[indexToRemove+1:]...) } +// pickBiasedTime returns a biased time based on a given range. +func pickBiasedTime(base time.Time, biasedRange time.Duration) time.Time { + return base.Add(time.Duration(rand.Intn(int(biasedRange)))) +} + // HashConfigurationBlock returns the hash value of configuration block. func HashConfigurationBlock( notarySet map[types.NodeID]struct{}, diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go index 6bc6c4b..8fd3fa4 100644 --- a/integration_test/consensus_test.go +++ b/integration_test/consensus_test.go @@ -95,6 +95,18 @@ func (s *ConsensusTestSuite) setupNodes( return nodes } +func (s *ConsensusTestSuite) verifyNodes(nodes map[types.NodeID]*node) { + for ID, node := range nodes { + s.Require().NoError(node.app.Verify()) + for otherID, otherNode := range nodes { + if ID == otherID { + continue + } + s.Require().NoError(node.app.Compare(otherNode.app)) + } + } +} + func (s *ConsensusTestSuite) TestSimple() { // The simplest test case: // - Node set is equals to DKG set and notary set for each chain in each @@ -108,15 +120,18 @@ func (s *ConsensusTestSuite) TestSimple() { dMoment = time.Now().UTC() untilRound = uint64(5) ) + if testing.Short() { + untilRound = 2 + } prvKeys, pubKeys, err := test.NewKeys(peerCount) req.NoError(err) // Setup seed governance instance. Give a short latency to make this test // run faster. seedGov, err := test.NewGovernance( - pubKeys, 30*time.Millisecond, core.ConfigRoundShift) + pubKeys, 100*time.Millisecond, core.ConfigRoundShift) req.NoError(err) req.NoError(seedGov.State().RequestChange( - test.StateChangeRoundInterval, 25*time.Second)) + test.StateChangeRoundInterval, 50*time.Second)) // A short round interval. nodes := s.setupNodes(dMoment, prvKeys, seedGov) for _, n := range nodes { @@ -136,6 +151,79 @@ Loop: // Oh ya. break } + s.verifyNodes(nodes) +} + +func (s *ConsensusTestSuite) TestNumChainsChange() { + var ( + req = s.Require() + peerCount = 4 + dMoment = time.Now().UTC() + untilRound = uint64(6) + ) + if testing.Short() { + // Short test won't test configuration change packed as payload of + // blocks and applied when delivered. + untilRound = 5 + } + prvKeys, pubKeys, err := test.NewKeys(peerCount) + req.NoError(err) + // Setup seed governance instance. + seedGov, err := test.NewGovernance( + pubKeys, 100*time.Millisecond, core.ConfigRoundShift) + req.NoError(err) + // Setup configuration for round 0 and round 1. + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundInterval, 45*time.Second)) + seedGov.CatchUpWithRound(1) + // Setup configuration for round 2. + req.NoError(seedGov.State().RequestChange( + test.StateChangeNumChains, uint32(5))) + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundInterval, 55*time.Second)) + seedGov.CatchUpWithRound(2) + // Setup configuration for round 3. + req.NoError(seedGov.State().RequestChange( + test.StateChangeNumChains, uint32(6))) + req.NoError(seedGov.State().RequestChange( + test.StateChangeRoundInterval, 75*time.Second)) + seedGov.CatchUpWithRound(3) + // Setup nodes. + nodes := s.setupNodes(dMoment, prvKeys, seedGov) + // Pick master node, and register changes on it. + var pickedNode *node + for _, pickedNode = range nodes { + break + } + // Register configuration changes for round 4. + req.NoError(pickedNode.gov.RegisterConfigChange( + 4, test.StateChangeNumChains, uint32(4))) + req.NoError(pickedNode.gov.RegisterConfigChange( + 4, test.StateChangeRoundInterval, 45*time.Second)) + // Register configuration changes for round 5. + req.NoError(pickedNode.gov.RegisterConfigChange( + 5, test.StateChangeNumChains, uint32(5))) + req.NoError(pickedNode.gov.RegisterConfigChange( + 5, test.StateChangeRoundInterval, 55*time.Second)) + // Run test. + for _, n := range nodes { + go n.con.Run(&types.Block{}) + } +Loop: + for { + <-time.After(5 * time.Second) + s.T().Log("check latest position delivered by each node") + for _, n := range nodes { + latestPos := n.app.GetLatestDeliveredPosition() + s.T().Log("latestPos", n.con.ID, &latestPos) + if latestPos.Round < untilRound { + continue Loop + } + } + // Oh ya. + break + } + s.verifyNodes(nodes) } func TestConsensus(t *testing.T) { diff --git a/integration_test/with_scheduler_test.go b/integration_test/with_scheduler_test.go index a9f229a..6a94f01 100644 --- a/integration_test/with_scheduler_test.go +++ b/integration_test/with_scheduler_test.go @@ -112,28 +112,28 @@ func (s *WithSchedulerTestSuite) TestConfigurationChange() { for _, pickedNode = range nodes { break } - // Config changes for round 4, numChains from 4 to 7. + // Config changes for round 5, numChains from 4 to 7. req.NoError(pickedNode.gov().RegisterConfigChange( - 4, test.StateChangeNumChains, uint32(7))) + 5, test.StateChangeNumChains, uint32(7))) req.NoError(pickedNode.gov().RegisterConfigChange( - 4, test.StateChangeK, 3)) + 5, test.StateChangeK, 3)) req.NoError(pickedNode.gov().RegisterConfigChange( - 4, test.StateChangePhiRatio, float32(0.5))) - // Config changes for round 5, numChains from 7 to 9. + 5, test.StateChangePhiRatio, float32(0.5))) + // Config changes for round 6, numChains from 7 to 9. req.NoError(pickedNode.gov().RegisterConfigChange( - 5, test.StateChangeNumChains, maxNumChains)) + 6, test.StateChangeNumChains, maxNumChains)) req.NoError(pickedNode.gov().RegisterConfigChange( - 5, test.StateChangeK, 0)) - // Config changes for round 6, numChains from 9 to 7. + 6, test.StateChangeK, 0)) + // Config changes for round 7, numChains from 9 to 7. req.NoError(pickedNode.gov().RegisterConfigChange( - 6, test.StateChangeNumChains, uint32(7))) + 7, test.StateChangeNumChains, uint32(7))) req.NoError(pickedNode.gov().RegisterConfigChange( - 6, test.StateChangeK, 1)) - // Config changes for round 6, numChains from 7 to 5. + 7, test.StateChangeK, 1)) + // Config changes for round 8, numChains from 7 to 5. req.NoError(pickedNode.gov().RegisterConfigChange( - 7, test.StateChangeNumChains, uint32(5))) + 8, test.StateChangeNumChains, uint32(5))) req.NoError(pickedNode.gov().RegisterConfigChange( - 7, test.StateChangeK, 1)) + 8, test.StateChangeK, 1)) // Perform test. sch.Run(4) // Check results by comparing test.App instances. diff --git a/simulation/simulation.go b/simulation/simulation.go index 4e97900..dcb7225 100644 --- a/simulation/simulation.go +++ b/simulation/simulation.go @@ -44,7 +44,7 @@ func Run(cfg *config.Config) { panic(fmt.Errorf("DKGSetSze should not be larger the node num")) } - dMoment := time.Now().UTC().Add(1 * time.Second) + dMoment := time.Now().UTC() // init is a function to init a node. init := func(serverEndpoint interface{}) { |