diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go | 156 |
1 files changed, 99 insertions, 57 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index 423174c48..8cb4c2e37 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -22,11 +22,11 @@ import ( "errors" "math" "sync" - "sync/atomic" "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" + typesDKG "github.com/dexon-foundation/dexon-consensus/core/types/dkg" "github.com/dexon-foundation/dexon-consensus/core/utils" ) @@ -35,6 +35,7 @@ var ( ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") ErrRoundOutOfRange = errors.New("round out of range") ErrInvalidBlock = errors.New("invalid block") + ErrNoValidLeader = errors.New("no valid leader") ) const maxResultCache = 100 @@ -89,7 +90,9 @@ func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, } type baRoundSetting struct { - notarySet map[types.NodeID]struct{} + round uint64 + dkgSet map[types.NodeID]struct{} + threshold int ticker Ticker crs common.Hash } @@ -132,13 +135,9 @@ func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { voteFilter: utils.NewVoteFilter(), } mgr.recv = &consensusBAReceiver{ - consensus: con, - restartNotary: make(chan types.Position, 1), - roundValue: &atomic.Value{}, - changeNotaryHeightValue: &atomic.Value{}, + consensus: con, + restartNotary: make(chan types.Position, 1), } - mgr.recv.updateRound(uint64(0)) - mgr.recv.changeNotaryHeightValue.Store(uint64(0)) return mgr, nil } @@ -177,6 +176,19 @@ func (mgr *agreementMgr) run() { }() } +func (mgr *agreementMgr) calcLeader( + dkgSet map[types.NodeID]struct{}, + crs common.Hash, pos types.Position) ( + types.NodeID, error) { + nodeSet := types.NewNodeSetFromMap(dkgSet) + leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget( + crs, pos.Height)) + for nID := range leader { + return nID, nil + } + return types.NodeID{}, ErrNoValidLeader +} + func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { mgr.lock.RLock() defer mgr.lock.RUnlock() @@ -201,13 +213,6 @@ func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { } if lastCfg.RoundID() == e.Round { mgr.configs[len(mgr.configs)-1].ExtendLength() - // It's not an atomic operation to update an atomic value based - // on another. However, it's the best way so far to extend - // length of round without refactoring. - if mgr.recv.round() == e.Round { - mgr.recv.changeNotaryHeightValue.Store( - mgr.configs[len(mgr.configs)-1].RoundEndHeight()) - } } else if lastCfg.RoundID()+1 == e.Round { mgr.configs = append(mgr.configs, newAgreementMgrConfig( lastCfg, e.Config, e.CRS)) @@ -285,10 +290,6 @@ func (mgr *agreementMgr) processAgreementResult( } } else if result.Position.Newer(aID) { mgr.logger.Info("Fast syncing BA", "position", result.Position) - nIDs, err := mgr.cache.GetNotarySet(result.Position.Round) - if err != nil { - return err - } if result.Position.Round < DKGDelayRound { mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", "hash", result.BlockHash) @@ -299,13 +300,19 @@ func (mgr *agreementMgr) processAgreementResult( } } } - mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) - crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) - leader, err := mgr.cache.GetLeaderNode(result.Position) + setting := mgr.generateSetting(result.Position.Round) + if setting == nil { + mgr.logger.Warn("unable to get setting", "round", + result.Position.Round) + return ErrConfigurationNotReady + } + leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position) if err != nil { return err } - mgr.baModule.restart(nIDs, result.Position, leader, crs) + mgr.baModule.restart( + setting.dkgSet, setting.threshold, + result.Position, leader, setting.crs) if result.Position.Round >= DKGDelayRound { return mgr.baModule.processAgreementResult(result) } @@ -333,57 +340,87 @@ func (mgr *agreementMgr) stop() { mgr.waitGroup.Wait() } +func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting { + curConfig := mgr.config(round) + if curConfig == nil { + return nil + } + var dkgSet map[types.NodeID]struct{} + if round >= DKGDelayRound { + _, qualidifed, err := typesDKG.CalcQualifyNodes( + mgr.gov.DKGMasterPublicKeys(round), + mgr.gov.DKGComplaints(round), + utils.GetDKGThreshold(mgr.gov.Configuration(round)), + ) + if err != nil { + mgr.logger.Error("Failed to get gpk", "round", round, "error", err) + return nil + } + dkgSet = qualidifed + } + if len(dkgSet) == 0 { + var err error + dkgSet, err = mgr.cache.GetNotarySet(round) + if err != nil { + mgr.logger.Error("Failed to get notarySet", "round", round) + return nil + } + } + return &baRoundSetting{ + crs: curConfig.crs, + dkgSet: dkgSet, + round: round, + threshold: utils.GetBAThreshold(&types.Config{ + NotarySetSize: curConfig.notarySetSize}), + } +} + func (mgr *agreementMgr) runBA(initRound uint64) { // These are round based variables. var ( currentRound uint64 nextRound = initRound curConfig = mgr.config(initRound) - setting = baRoundSetting{} + setting = &baRoundSetting{} tickDuration time.Duration + ticker Ticker ) // Check if this routine needs to awake in this round and prepare essential // variables when yes. - checkRound := func() (isNotary bool) { + checkRound := func() (isDKG bool) { defer func() { currentRound = nextRound nextRound++ }() // Wait until the configuartion for next round is ready. for { - if curConfig = mgr.config(nextRound); curConfig != nil { + if setting = mgr.generateSetting(nextRound); setting != nil { break } else { mgr.logger.Debug("Round is not ready", "round", nextRound) time.Sleep(1 * time.Second) } } - // Check if this node in notary set of this chain in this round. - notarySet, err := mgr.cache.GetNotarySet(nextRound) - if err != nil { - panic(err) - } - setting.crs = curConfig.crs - setting.notarySet = notarySet - _, isNotary = setting.notarySet[mgr.ID] - if isNotary { - mgr.logger.Info("Selected as notary set", + _, isDKG = setting.dkgSet[mgr.ID] + if isDKG { + mgr.logger.Info("Selected as dkg set", "ID", mgr.ID, "round", nextRound) } else { - mgr.logger.Info("Not selected as notary set", + mgr.logger.Info("Not selected as dkg set", "ID", mgr.ID, "round", nextRound) } // Setup ticker if tickDuration != curConfig.lambdaBA { - if setting.ticker != nil { - setting.ticker.Stop() + if ticker != nil { + ticker.Stop() } - setting.ticker = newTicker(mgr.gov, nextRound, TickerBA) + ticker = newTicker(mgr.gov, nextRound, TickerBA) tickDuration = curConfig.lambdaBA } + setting.ticker = ticker return } Loop: @@ -395,15 +432,25 @@ Loop: } mgr.recv.isNotary = checkRound() // Run BA for this round. - mgr.recv.updateRound(currentRound) - mgr.recv.changeNotaryHeightValue.Store(curConfig.RoundEndHeight()) mgr.recv.restartNotary <- types.Position{ - Round: mgr.recv.round(), + Round: currentRound, Height: math.MaxUint64, } mgr.voteFilter = utils.NewVoteFilter() mgr.recv.emptyBlockHashMap = &sync.Map{} - if err := mgr.baRoutineForOneRound(&setting); err != nil { + if currentRound >= DKGDelayRound && mgr.recv.isNotary { + var err error + mgr.recv.npks, mgr.recv.psigSigner, err = + mgr.con.cfgModule.getDKGInfo(currentRound, false) + if err != nil { + mgr.logger.Warn("cannot get dkg info", + "round", currentRound, "error", err) + } + } else { + mgr.recv.npks = nil + mgr.recv.psigSigner = nil + } + if err := mgr.baRoutineForOneRound(setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, "nodeID", mgr.ID) @@ -419,7 +466,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( oldPos := agr.agreementID() restart := func(restartPos types.Position) (breakLoop bool, err error) { if !isStop(restartPos) { - if restartPos.Round > oldPos.Round { + if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() { for { select { case <-mgr.ctx.Done(): @@ -427,14 +474,12 @@ func (mgr *agreementMgr) baRoutineForOneRound( default: } tipRound := mgr.bcModule.tipRound() - if tipRound > restartPos.Round { - // It's a vary rare that this go routine sleeps for entire round. + if tipRound > setting.round { break - } else if tipRound != restartPos.Round { - mgr.logger.Debug("Waiting blockChain to change round...", - "pos", restartPos) } else { - break + mgr.logger.Debug("Waiting blockChain to change round...", + "curRound", setting.round, + "tipRound", tipRound) } time.Sleep(100 * time.Millisecond) } @@ -459,9 +504,6 @@ func (mgr *agreementMgr) baRoutineForOneRound( default: } nextHeight, nextTime = mgr.bcModule.nextBlock() - if isStop(oldPos) && nextHeight == 0 { - break - } if isStop(restartPos) { break } @@ -473,18 +515,18 @@ func (mgr *agreementMgr) baRoutineForOneRound( time.Sleep(100 * time.Millisecond) } nextPos := types.Position{ - Round: recv.round(), + Round: setting.round, Height: nextHeight, } oldPos = nextPos var leader types.NodeID - leader, err = mgr.cache.GetLeaderNode(nextPos) + leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos) if err != nil { return } time.Sleep(nextTime.Sub(time.Now())) setting.ticker.Restart() - agr.restart(setting.notarySet, nextPos, leader, setting.crs) + agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs) return } Loop: |