diff options
Diffstat (limited to 'vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go')
-rw-r--r-- | vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go | 676 |
1 files changed, 676 insertions, 0 deletions
diff --git a/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go new file mode 100644 index 000000000..cdbfadf13 --- /dev/null +++ b/vendor/github.com/byzantine-lab/dexon-consensus/core/agreement-mgr.go @@ -0,0 +1,676 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "context" + "errors" + "math" + "sync" + "time" + + lru "github.com/hashicorp/golang-lru" + + "github.com/byzantine-lab/dexon-consensus/common" + "github.com/byzantine-lab/dexon-consensus/core/types" + typesDKG "github.com/byzantine-lab/dexon-consensus/core/types/dkg" + "github.com/byzantine-lab/dexon-consensus/core/utils" +) + +// Errors returned from BA modules +var ( + ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") + ErrRoundOutOfRange = errors.New("round out of range") + ErrInvalidBlock = errors.New("invalid block") + ErrNoValidLeader = errors.New("no valid leader") + ErrIncorrectCRSSignature = errors.New("incorrect CRS signature") + ErrBlockTooOld = errors.New("block too old") +) + +const maxResultCache = 100 +const settingLimit = 3 + +// genValidLeader generate a validLeader function for agreement modules. +func genValidLeader( + mgr *agreementMgr) validLeaderFn { + return func(block *types.Block, crs common.Hash) (bool, error) { + if block.Timestamp.After(time.Now()) { + return false, nil + } + if block.Position.Round >= DKGDelayRound { + if mgr.recv.npks == nil { + return false, nil + } + if block.Position.Round > mgr.recv.npks.Round { + return false, nil + } + if block.Position.Round < mgr.recv.npks.Round { + return false, ErrBlockTooOld + } + } + if !utils.VerifyCRSSignature(block, crs, mgr.recv.npks) { + return false, ErrIncorrectCRSSignature + } + if err := mgr.bcModule.sanityCheck(block); err != nil { + if err == ErrRetrySanityCheckLater { + return false, nil + } + return false, err + } + mgr.logger.Debug("Calling Application.VerifyBlock", "block", block) + switch mgr.app.VerifyBlock(block) { + case types.VerifyInvalidBlock: + return false, ErrInvalidBlock + case types.VerifyRetryLater: + return false, nil + default: + } + return true, nil + } +} + +type agreementMgrConfig struct { + utils.RoundBasedConfig + + notarySetSize uint32 + lambdaBA time.Duration + crs common.Hash +} + +func (c *agreementMgrConfig) from( + round uint64, config *types.Config, crs common.Hash) { + c.notarySetSize = config.NotarySetSize + c.lambdaBA = config.LambdaBA + c.crs = crs + c.SetupRoundBasedFields(round, config) +} + +func newAgreementMgrConfig(prev agreementMgrConfig, config *types.Config, + crs common.Hash) (c agreementMgrConfig) { + c = agreementMgrConfig{} + c.from(prev.RoundID()+1, config, crs) + c.AppendTo(prev.RoundBasedConfig) + return +} + +type baRoundSetting struct { + round uint64 + dkgSet map[types.NodeID]struct{} + threshold int + ticker Ticker + crs common.Hash +} + +type agreementMgr struct { + // TODO(mission): unbound Consensus instance from this module. + con *Consensus + ID types.NodeID + app Application + gov Governance + network Network + logger common.Logger + cache *utils.NodeSetCache + signer *utils.Signer + bcModule *blockChain + ctx context.Context + configs []agreementMgrConfig + baModule *agreement + recv *consensusBAReceiver + processedBAResult map[types.Position]struct{} + voteFilter *utils.VoteFilter + settingCache *lru.Cache + curRoundSetting *baRoundSetting + waitGroup sync.WaitGroup + isRunning bool + lock sync.RWMutex +} + +func newAgreementMgr(con *Consensus) (mgr *agreementMgr, err error) { + settingCache, _ := lru.New(settingLimit) + mgr = &agreementMgr{ + con: con, + ID: con.ID, + app: con.app, + gov: con.gov, + network: con.network, + logger: con.logger, + cache: con.nodeSetCache, + signer: con.signer, + bcModule: con.bcModule, + ctx: con.ctx, + processedBAResult: make(map[types.Position]struct{}, maxResultCache), + voteFilter: utils.NewVoteFilter(), + settingCache: settingCache, + } + mgr.recv = &consensusBAReceiver{ + consensus: con, + restartNotary: make(chan types.Position, 1), + } + return mgr, nil +} + +func (mgr *agreementMgr) prepare() { + round := mgr.bcModule.tipRound() + agr := newAgreement( + mgr.ID, + mgr.recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.signer, + mgr.logger) + setting := mgr.generateSetting(round) + if setting == nil { + mgr.logger.Warn("Unable to prepare init setting", "round", round) + return + } + mgr.curRoundSetting = setting + agr.notarySet = mgr.curRoundSetting.dkgSet + // Hacky way to make agreement module self contained. + mgr.recv.agreementModule = agr + mgr.baModule = agr + if round >= DKGDelayRound { + if _, exist := setting.dkgSet[mgr.ID]; exist { + mgr.logger.Debug("Preparing signer and npks.", "round", round) + npk, signer, err := mgr.con.cfgModule.getDKGInfo(round, false) + if err != nil { + mgr.logger.Error("Failed to prepare signer and npks.", + "round", round, + "error", err) + } + mgr.logger.Debug("Prepared signer and npks.", + "round", round, "signer", signer != nil, "npks", npk != nil) + } + } + return +} + +func (mgr *agreementMgr) run() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + if mgr.isRunning { + return + } + mgr.isRunning = true + mgr.waitGroup.Add(1) + go func() { + defer mgr.waitGroup.Done() + mgr.runBA(mgr.bcModule.tipRound()) + }() +} + +func (mgr *agreementMgr) calcLeader( + dkgSet map[types.NodeID]struct{}, + crs common.Hash, pos types.Position) ( + types.NodeID, error) { + nodeSet := types.NewNodeSetFromMap(dkgSet) + leader := nodeSet.GetSubSet(1, types.NewNodeLeaderTarget( + crs, pos.Height)) + for nID := range leader { + return nID, nil + } + return types.NodeID{}, ErrNoValidLeader +} + +func (mgr *agreementMgr) config(round uint64) *agreementMgrConfig { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if round < mgr.configs[0].RoundID() { + panic(ErrRoundOutOfRange) + } + roundIndex := round - mgr.configs[0].RoundID() + if roundIndex >= uint64(len(mgr.configs)) { + return nil + } + return &mgr.configs[roundIndex] +} + +func (mgr *agreementMgr) notifyRoundEvents(evts []utils.RoundEventParam) error { + mgr.lock.Lock() + defer mgr.lock.Unlock() + apply := func(e utils.RoundEventParam) error { + if len(mgr.configs) > 0 { + lastCfg := mgr.configs[len(mgr.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + mgr.configs[len(mgr.configs)-1].ExtendLength() + } else if lastCfg.RoundID()+1 == e.Round { + mgr.configs = append(mgr.configs, newAgreementMgrConfig( + lastCfg, e.Config, e.CRS)) + } else { + return ErrInvalidRoundID + } + } else { + c := agreementMgrConfig{} + c.from(e.Round, e.Config, e.CRS) + c.SetRoundBeginHeight(e.BeginHeight) + mgr.configs = append(mgr.configs, c) + } + return nil + } + for _, e := range evts { + if err := apply(e); err != nil { + return err + } + } + return nil +} + +func (mgr *agreementMgr) checkProposer( + round uint64, proposerID types.NodeID) error { + if round == mgr.curRoundSetting.round { + if _, exist := mgr.curRoundSetting.dkgSet[proposerID]; !exist { + return ErrNotInNotarySet + } + } else if round == mgr.curRoundSetting.round+1 { + setting := mgr.generateSetting(round) + if setting == nil { + return ErrConfigurationNotReady + } + if _, exist := setting.dkgSet[proposerID]; !exist { + return ErrNotInNotarySet + } + } + return nil +} + +func (mgr *agreementMgr) processVote(v *types.Vote) (err error) { + if !mgr.recv.isNotary { + return nil + } + if mgr.voteFilter.Filter(v) { + return nil + } + if err := mgr.checkProposer(v.Position.Round, v.ProposerID); err != nil { + return err + } + if err = mgr.baModule.processVote(v); err == nil { + mgr.baModule.updateFilter(mgr.voteFilter) + mgr.voteFilter.AddVote(v) + } + if err == ErrSkipButNoError { + err = nil + } + return +} + +func (mgr *agreementMgr) processBlock(b *types.Block) error { + if err := mgr.checkProposer(b.Position.Round, b.ProposerID); err != nil { + return err + } + return mgr.baModule.processBlock(b) +} + +func (mgr *agreementMgr) touchAgreementResult( + result *types.AgreementResult) (first bool) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + if _, exist := mgr.processedBAResult[result.Position]; !exist { + first = true + if len(mgr.processedBAResult) > maxResultCache { + for k := range mgr.processedBAResult { + // Randomly drop one element. + delete(mgr.processedBAResult, k) + break + } + } + mgr.processedBAResult[result.Position] = struct{}{} + } + return +} + +func (mgr *agreementMgr) untouchAgreementResult( + result *types.AgreementResult) { + // DO NOT LOCK THIS FUNCTION!!!!!!!! YOU WILL REGRET IT!!!!! + delete(mgr.processedBAResult, result.Position) +} + +func (mgr *agreementMgr) processAgreementResult( + result *types.AgreementResult) error { + aID := mgr.baModule.agreementID() + if isStop(aID) { + return nil + } + if result.Position == aID && !mgr.baModule.confirmed() { + mgr.logger.Info("Syncing BA", "position", result.Position) + if result.Position.Round >= DKGDelayRound { + return mgr.baModule.processAgreementResult(result) + } + for key := range result.Votes { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { + return err + } + } + } else if result.Position.Newer(aID) { + mgr.logger.Info("Fast syncing BA", "position", result.Position) + if result.Position.Round < DKGDelayRound { + mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", + "hash", result.BlockHash) + mgr.network.PullBlocks(common.Hashes{result.BlockHash}) + for key := range result.Votes { + if err := mgr.baModule.processVote(&result.Votes[key]); err != nil { + return err + } + } + } + setting := mgr.generateSetting(result.Position.Round) + if setting == nil { + mgr.logger.Warn("unable to get setting", "round", + result.Position.Round) + return ErrConfigurationNotReady + } + mgr.curRoundSetting = setting + leader, err := mgr.calcLeader(setting.dkgSet, setting.crs, result.Position) + if err != nil { + return err + } + mgr.baModule.restart( + setting.dkgSet, setting.threshold, + result.Position, leader, setting.crs) + if result.Position.Round >= DKGDelayRound { + return mgr.baModule.processAgreementResult(result) + } + } + return nil +} + +func (mgr *agreementMgr) processFinalizedBlock(block *types.Block) error { + aID := mgr.baModule.agreementID() + if block.Position.Older(aID) { + return nil + } + mgr.baModule.processFinalizedBlock(block) + return nil +} + +func (mgr *agreementMgr) stop() { + // Stop all running agreement modules. + func() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + mgr.baModule.stop() + }() + // Block until all routines are done. + mgr.waitGroup.Wait() +} + +func (mgr *agreementMgr) generateSetting(round uint64) *baRoundSetting { + if setting, exist := mgr.settingCache.Get(round); exist { + return setting.(*baRoundSetting) + } + curConfig := mgr.config(round) + if curConfig == nil { + return nil + } + var dkgSet map[types.NodeID]struct{} + if round >= DKGDelayRound { + _, qualidifed, err := typesDKG.CalcQualifyNodes( + mgr.gov.DKGMasterPublicKeys(round), + mgr.gov.DKGComplaints(round), + utils.GetDKGThreshold(mgr.gov.Configuration(round)), + ) + if err != nil { + mgr.logger.Error("Failed to get gpk", "round", round, "error", err) + return nil + } + dkgSet = qualidifed + } + if len(dkgSet) == 0 { + var err error + dkgSet, err = mgr.cache.GetNotarySet(round) + if err != nil { + mgr.logger.Error("Failed to get notarySet", "round", round, "error", err) + return nil + } + } + setting := &baRoundSetting{ + crs: curConfig.crs, + dkgSet: dkgSet, + round: round, + threshold: utils.GetBAThreshold(&types.Config{ + NotarySetSize: curConfig.notarySetSize}), + } + mgr.settingCache.Add(round, setting) + return setting +} + +func (mgr *agreementMgr) runBA(initRound uint64) { + // These are round based variables. + var ( + currentRound uint64 + nextRound = initRound + curConfig = mgr.config(initRound) + setting = &baRoundSetting{} + tickDuration time.Duration + ticker Ticker + ) + + // Check if this routine needs to awake in this round and prepare essential + // variables when yes. + checkRound := func() (isDKG bool) { + defer func() { + currentRound = nextRound + nextRound++ + }() + // Wait until the configuartion for next round is ready. + for { + if setting = mgr.generateSetting(nextRound); setting != nil { + break + } else { + mgr.logger.Debug("Round is not ready", "round", nextRound) + time.Sleep(1 * time.Second) + } + } + _, isDKG = setting.dkgSet[mgr.ID] + if isDKG { + mgr.logger.Info("Selected as dkg set", + "ID", mgr.ID, + "round", nextRound) + } else { + mgr.logger.Info("Not selected as dkg set", + "ID", mgr.ID, + "round", nextRound) + } + // Setup ticker + if tickDuration != curConfig.lambdaBA { + if ticker != nil { + ticker.Stop() + } + ticker = newTicker(mgr.gov, nextRound, TickerBA) + tickDuration = curConfig.lambdaBA + } + setting.ticker = ticker + return + } +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + mgr.recv.isNotary = checkRound() + mgr.voteFilter = utils.NewVoteFilter() + mgr.voteFilter.Position.Round = currentRound + mgr.recv.emptyBlockHashMap = &sync.Map{} + if currentRound >= DKGDelayRound && mgr.recv.isNotary { + var err error + mgr.recv.npks, mgr.recv.psigSigner, err = + mgr.con.cfgModule.getDKGInfo(currentRound, false) + if err != nil { + mgr.logger.Warn("cannot get dkg info", + "round", currentRound, "error", err) + } + } else { + mgr.recv.npks = nil + mgr.recv.psigSigner = nil + } + // Run BA for this round. + mgr.recv.restartNotary <- types.Position{ + Round: currentRound, + Height: math.MaxUint64, + } + if err := mgr.baRoutineForOneRound(setting); err != nil { + mgr.logger.Error("BA routine failed", + "error", err, + "nodeID", mgr.ID) + break Loop + } + } +} + +func (mgr *agreementMgr) baRoutineForOneRound( + setting *baRoundSetting) (err error) { + agr := mgr.baModule + recv := mgr.recv + oldPos := agr.agreementID() + restart := func(restartPos types.Position) (breakLoop bool, err error) { + if !isStop(restartPos) { + if restartPos.Height+1 >= mgr.config(setting.round).RoundEndHeight() { + for { + select { + case <-mgr.ctx.Done(): + break + default: + } + tipRound := mgr.bcModule.tipRound() + if tipRound > setting.round { + break + } else { + mgr.logger.Debug("Waiting blockChain to change round...", + "curRound", setting.round, + "tipRound", tipRound) + } + time.Sleep(100 * time.Millisecond) + } + // This round is finished. + breakLoop = true + return + } + if restartPos.Older(oldPos) { + // The restartNotary event is triggered by 'BlockConfirmed' + // of some older block. + return + } + } + var nextHeight uint64 + var nextTime time.Time + for { + // Make sure we are stoppable. + select { + case <-mgr.ctx.Done(): + breakLoop = true + return + default: + } + nextHeight, nextTime = mgr.bcModule.nextBlock() + if nextHeight != notReadyHeight { + if isStop(restartPos) { + break + } + if nextHeight > restartPos.Height { + break + } + } + mgr.logger.Debug("BlockChain not ready!!!", + "old", oldPos, "restart", restartPos, "next", nextHeight) + time.Sleep(100 * time.Millisecond) + } + nextPos := types.Position{ + Round: setting.round, + Height: nextHeight, + } + oldPos = nextPos + var leader types.NodeID + leader, err = mgr.calcLeader(setting.dkgSet, setting.crs, nextPos) + if err != nil { + return + } + time.Sleep(nextTime.Sub(time.Now())) + setting.ticker.Restart() + agr.restart(setting.dkgSet, setting.threshold, nextPos, leader, setting.crs) + return + } +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + if agr.confirmed() { + // Block until receive restartPos + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) + if err != nil { + return err + } + if breakLoop { + break Loop + } + case <-mgr.ctx.Done(): + break Loop + } + } + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) + if err != nil { + return err + } + if breakLoop { + break Loop + } + default: + } + if !mgr.recv.isNotary { + select { + case <-setting.ticker.Tick(): + continue Loop + case <-mgr.ctx.Done(): + break Loop + } + } + if err = agr.nextState(); err != nil { + mgr.logger.Error("Failed to proceed to next state", + "nodeID", mgr.ID.String(), + "error", err) + break Loop + } + if agr.pullVotes() { + pos := agr.agreementID() + mgr.logger.Debug("Calling Network.PullVotes for syncing votes", + "position", pos) + mgr.network.PullVotes(pos) + } + for i := 0; i < agr.clocks(); i++ { + // Priority select for agreement.done(). + select { + case <-agr.done(): + continue Loop + default: + } + select { + case <-agr.done(): + continue Loop + case <-setting.ticker.Tick(): + } + } + } + return nil +} |