diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go | 152 |
1 files changed, 107 insertions, 45 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go index 9e863696a..a8fab7c69 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -94,6 +94,7 @@ type agreementMgr struct { initRound uint64 configs []*agreementMgrConfig baModules []*agreement + voteFilters []*utils.VoteFilter waitGroup sync.WaitGroup pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block @@ -201,6 +202,7 @@ func (mgr *agreementMgr) appendConfig( // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) + mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) if mgr.isRunning { mgr.waitGroup.Add(1) go func(idx uint32) { @@ -213,7 +215,6 @@ func (mgr *agreementMgr) appendConfig( } func (mgr *agreementMgr) processVote(v *types.Vote) error { - v = v.Clone() mgr.lock.RLock() defer mgr.lock.RUnlock() if v.Position.ChainID >= uint32(len(mgr.baModules)) { @@ -224,7 +225,16 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error { "initRound", mgr.initRound) return utils.ErrInvalidChainID } - return mgr.baModules[v.Position.ChainID].processVote(v) + filter := mgr.voteFilters[v.Position.ChainID] + if filter.Filter(v) { + return nil + } + v = v.Clone() + err := mgr.baModules[v.Position.ChainID].processVote(v) + if err == nil { + mgr.baModules[v.Position.ChainID].updateFilter(filter) + } + return err } func (mgr *agreementMgr) processBlock(b *types.Block) error { @@ -419,7 +429,11 @@ Loop: // Run BA for this round. recv.roundValue.Store(currentRound) recv.changeNotaryTime = roundEndTime - recv.restartNotary <- types.Position{ChainID: math.MaxUint32} + recv.restartNotary <- types.Position{ + Round: setting.recv.round(), + ChainID: math.MaxUint32, + } + mgr.voteFilters[chainID] = utils.NewVoteFilter() if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, @@ -435,6 +449,79 @@ func (mgr *agreementMgr) baRoutineForOneRound( agr := setting.agr recv := setting.recv oldPos := agr.agreementID() + restart := func(restartPos types.Position) (breakLoop bool, err error) { + if !isStop(restartPos) { + if restartPos.Round > oldPos.Round { + for { + select { + case <-mgr.ctx.Done(): + break + default: + } + tipRound := mgr.lattice.TipRound(setting.chainID) + if tipRound > restartPos.Round { + // It's a vary rare that this go routine sleeps for entire round. + break + } else if tipRound != restartPos.Round { + mgr.logger.Debug("Waiting lattice to change round...", + "pos", &restartPos) + } else { + break + } + time.Sleep(100 * time.Millisecond) + } + // This round is finished. + breakLoop = true + return + } + if restartPos.Older(&oldPos) { + // The restartNotary event is triggered by 'BlockConfirmed' + // of some older block. + return + } + } + var nextHeight uint64 + var nextTime time.Time + for { + nextHeight, nextTime, err = + mgr.lattice.NextBlock(recv.round(), setting.chainID) + if err != nil { + mgr.logger.Debug("Error getting next height", + "error", err, + "round", recv.round(), + "chainID", setting.chainID) + err = nil + nextHeight = restartPos.Height + } + if isStop(oldPos) && nextHeight == 0 { + break + } + if isStop(restartPos) && nextHeight == 0 { + break + } + if nextHeight > restartPos.Height { + break + } + mgr.logger.Debug("Lattice not ready!!!", + "old", &oldPos, "restart", &restartPos, "next", nextHeight) + time.Sleep(100 * time.Millisecond) + } + nextPos := types.Position{ + Round: recv.round(), + ChainID: setting.chainID, + Height: nextHeight, + } + oldPos = nextPos + var leader types.NodeID + leader, err = mgr.cache.GetLeaderNode(nextPos) + if err != nil { + return + } + time.Sleep(nextTime.Sub(time.Now())) + setting.ticker.Restart() + agr.restart(setting.notarySet, nextPos, leader, setting.crs) + return + } Loop: for { select { @@ -442,55 +529,30 @@ Loop: break Loop default: } - select { - case restartPos := <-recv.restartNotary: - if !isStop(restartPos) { - if restartPos.Round > oldPos.Round { - // This round is finished. - break Loop - } - if restartPos.Older(&oldPos) { - // The restartNotary event is triggered by 'BlockConfirmed' - // of some older block. - break - } - } - var nextHeight uint64 - var nextTime time.Time - for { - nextHeight, nextTime, err = - mgr.lattice.NextBlock(recv.round(), setting.chainID) + if agr.confirmed() { + // Block until receive restartPos + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) if err != nil { - mgr.logger.Debug("Error getting next height", - "error", err, - "round", recv.round(), - "chainID", setting.chainID) - err = nil - nextHeight = restartPos.Height + return err } - if isStop(restartPos) || nextHeight == 0 { - break - } - if nextHeight > restartPos.Height { - break + if breakLoop { + break Loop } - mgr.logger.Debug("Lattice not ready!!!", - "old", &restartPos, "next", nextHeight) - time.Sleep(100 * time.Millisecond) - } - nextPos := types.Position{ - Round: recv.round(), - ChainID: setting.chainID, - Height: nextHeight, + case <-mgr.ctx.Done(): + break Loop } - oldPos = nextPos - leader, err := mgr.cache.GetLeaderNode(nextPos) + } + select { + case restartPos := <-recv.restartNotary: + breakLoop, err := restart(restartPos) if err != nil { return err } - time.Sleep(nextTime.Sub(time.Now())) - setting.ticker.Restart() - agr.restart(setting.notarySet, nextPos, leader, setting.crs) + if breakLoop { + break Loop + } default: } if agr.pullVotes() { |