diff options
author | Mission Liao <mission.liao@dexon.org> | 2019-01-15 17:45:44 +0800 |
---|---|---|
committer | Mission Liao <mission.liao@dexon.org> | 2019-01-28 16:22:44 +0800 |
commit | c9033d37c43c0185b13954b1b952e4471eb349e1 (patch) | |
tree | 88c31bd631167e9b2f24cdd6ae81a85369f48da4 | |
parent | 4cf4acd6b4cb8b61b555d27fae24a5b3d468c09f (diff) | |
download | dexon-consensus-c9033d37c43c0185b13954b1b952e4471eb349e1.tar.gz dexon-consensus-c9033d37c43c0185b13954b1b952e4471eb349e1.tar.zst dexon-consensus-c9033d37c43c0185b13954b1b952e4471eb349e1.zip |
Integrate agreementVoteCache to agreement module
-rw-r--r-- | core/agreement-mgr.go | 108 | ||||
-rw-r--r-- | core/agreement-state.go | 2 | ||||
-rw-r--r-- | core/agreement.go | 335 | ||||
-rw-r--r-- | core/consensus.go | 2 |
4 files changed, 170 insertions, 277 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 6b86c73..896f233 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -26,6 +26,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" + agrPkg "github.com/dexon-foundation/dexon-consensus/core/agreement" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" ) @@ -72,12 +73,11 @@ type agreementMgrConfig struct { } type baRoundSetting struct { - chainID uint32 - notarySet map[types.NodeID]struct{} - agr *agreement - recv *consensusBAReceiver - ticker Ticker - crs common.Hash + chainID uint32 + agr *agreement + recv *consensusBAReceiver + ticker Ticker + crs common.Hash } type agreementMgr struct { @@ -102,6 +102,7 @@ type agreementMgr struct { pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block isRunning bool + vCache *agrPkg.VoteCache // This lock should be used when attempting to: // - add a new baModule. @@ -130,6 +131,7 @@ func newAgreementMgr(con *Consensus, initRound uint64, initRound: initRound, lastEndTime: initRoundBeginTime, processedBAResult: make(map[types.Position]struct{}, maxResultCache), + vCache: agrPkg.NewVoteCache(initRound), } } @@ -162,6 +164,20 @@ func (mgr *agreementMgr) run() { } } +func (mgr *agreementMgr) getAllNotarySets(round uint64, config *types.Config, + crs common.Hash) []map[types.NodeID]struct{} { + notarySets := []map[types.NodeID]struct{}{} + nodes, err := mgr.cache.GetNodeSet(round) + if err != nil { + panic(err) + } + for i := uint32(0); i < config.NumChains; i++ { + notarySets = append(notarySets, nodes.GetSubSet( + int(config.NotarySetSize), types.NewNotarySetTarget(crs, i))) + } + return notarySets +} + func (mgr *agreementMgr) appendConfig( round uint64, config *types.Config, crs common.Hash) (err error) { mgr.chainLock.Lock() @@ -195,14 +211,6 @@ func (mgr *agreementMgr) appendConfig( newLeaderSelector(genValidLeader(mgr), mgr.logger), mgr.signer, mgr.logger) - // Hacky way to initialize first notarySet. - nodes, err := mgr.cache.GetNodeSet(round) - if err != nil { - return err - } - agrModule.notarySet = nodes.GetSubSet( - int(config.NotarySetSize), - types.NewNotarySetTarget(crs, i)) // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) @@ -215,30 +223,33 @@ func (mgr *agreementMgr) appendConfig( }(i) } } - return nil + return mgr.vCache.AppendNotarySets( + round, mgr.getAllNotarySets(round, config, crs)) } func (mgr *agreementMgr) processVote(v *types.Vote) error { mgr.chainLock.RLock() defer mgr.chainLock.RUnlock() + signals, err := mgr.vCache.ProcessVote(v) + if err != nil || len(signals) == 0 { + return err + } if v.Position.ChainID >= uint32(len(mgr.baModules)) { - mgr.logger.Error("Process vote for unknown chain to BA", + mgr.logger.Error("Process signal for unknown chain to BA", "position", &v.Position, "baChain", len(mgr.baModules), "baRound", len(mgr.configs), "initRound", mgr.initRound) return utils.ErrInvalidChainID } - filter := mgr.voteFilters[v.Position.ChainID] - if filter.Filter(v) { - return nil - } - v = v.Clone() - err := mgr.baModules[v.Position.ChainID].processVote(v) - if err == nil { - mgr.baModules[v.Position.ChainID].updateFilter(filter) + agreement := mgr.baModules[v.Position.ChainID] + for _, s := range signals { + if err := agreement.processSignal(s); err != nil { + // All agreement signals should be ok for BA modules to process. + panic(err) + } } - return err + return nil } func (mgr *agreementMgr) processBlock(b *types.Block) error { @@ -282,6 +293,10 @@ func (mgr *agreementMgr) processAgreementResult( result *types.AgreementResult) error { mgr.chainLock.RLock() defer mgr.chainLock.RUnlock() + signals, err := mgr.vCache.ProcessResult(result) + if err != nil || len(signals) == 0 { + return err + } if result.Position.ChainID >= uint32(len(mgr.baModules)) { mgr.logger.Error("Process unknown result for unknown chain to BA", "position", &result.Position, @@ -292,41 +307,24 @@ func (mgr *agreementMgr) processAgreementResult( } agreement := mgr.baModules[result.Position.ChainID] aID := agreement.agreementID() - if isStop(aID) { - return nil - } - if result.Position == aID && !agreement.confirmed() { + if !isStop(aID) && result.Position.Newer(&aID) { mgr.logger.Info("Syncing BA", "position", &result.Position) - for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { - return err - } - } - } else if result.Position.Newer(&aID) { - mgr.logger.Info("Fast syncing BA", "position", &result.Position) - nodes, err := mgr.cache.GetNodeSet(result.Position.Round) - if err != nil { - return err - } - mgr.logger.Debug("Calling Network.PullBlocks for fast syncing BA", + mgr.logger.Debug("Calling Network.PullBlocks for syncing BA", "hash", result.BlockHash) mgr.network.PullBlocks(common.Hashes{result.BlockHash}) mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) crs := utils.GetCRSWithPanic(mgr.gov, result.Position.Round, mgr.logger) - nIDs := nodes.GetSubSet( - int(utils.GetConfigWithPanic( - mgr.gov, result.Position.Round, mgr.logger).NotarySetSize), - types.NewNotarySetTarget(crs, result.Position.ChainID)) - for key := range result.Votes { - if err := agreement.processVote(&result.Votes[key]); err != nil { - return err - } - } leader, err := mgr.cache.GetLeaderNode(result.Position) if err != nil { return err } - agreement.restart(nIDs, result.Position, leader, crs) + agreement.restart(result.Position, leader, crs) + } + for _, s := range signals { + if err := agreement.processSignal(s); err != nil { + // All agreement signals should be ok for BA modules to process. + panic(err) + } } return nil } @@ -396,10 +394,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { if err != nil { panic(err) } - setting.crs = config.crs - setting.notarySet = notarySet - _, isNotary = setting.notarySet[mgr.ID] - if isNotary { + if _, isNotary = notarySet[mgr.ID]; isNotary { mgr.logger.Info("selected as notary set", "ID", mgr.ID, "round", nextRound, @@ -410,6 +405,7 @@ func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { "round", nextRound, "chainID", chainID) } + setting.crs = config.crs // Setup ticker if tickDuration != config.lambdaBA { if setting.ticker != nil { @@ -546,7 +542,7 @@ func (mgr *agreementMgr) baRoutineForOneRound( } time.Sleep(nextTime.Sub(time.Now())) setting.ticker.Restart() - agr.restart(setting.notarySet, nextPos, leader, setting.crs) + agr.restart(nextPos, leader, setting.crs) return } Loop: diff --git a/core/agreement-state.go b/core/agreement-state.go index 3579fbf..febed01 100644 --- a/core/agreement-state.go +++ b/core/agreement-state.go @@ -157,7 +157,7 @@ func (s *commitState) nextState() (agreementState, error) { hash := types.SkipBlockHash // Once we received 2t+1 PreCom votes at current period, we should be // able to receive that signal and update locked value/period. - if s.a.lockRound == s.a.period { + if s.a.lockIter == s.a.period { hash = s.a.lockValue } s.a.recv.ProposeVote(types.NewVote(types.VoteCom, hash, s.a.period)) diff --git a/core/agreement.go b/core/agreement.go index 9e5d8ab..300979e 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -25,6 +25,7 @@ import ( "time" "github.com/dexon-foundation/dexon-consensus/common" + agrPkg "github.com/dexon-foundation/dexon-consensus/core/agreement" "github.com/dexon-foundation/dexon-consensus/core/types" "github.com/dexon-foundation/dexon-consensus/core/utils" ) @@ -99,17 +100,15 @@ type pendingVote struct { type agreementData struct { recv agreementReceiver - ID types.NodeID - isLeader bool - leader *leaderSelector - lockValue common.Hash - lockIter uint64 - period uint64 - requiredVote int - votes map[uint64][]map[types.NodeID]*types.Vote - lock sync.RWMutex - blocks map[types.NodeID]*types.Block - blocksLock sync.Mutex + ID types.NodeID + isLeader bool + leader *leaderSelector + lockValue common.Hash + lockIter uint64 + period uint64 + lock sync.RWMutex + blocks map[types.NodeID]*types.Block + blocksLock sync.Mutex } // agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. @@ -118,12 +117,11 @@ type agreement struct { data *agreementData aID *atomic.Value doneChan chan struct{} - notarySet map[types.NodeID]struct{} hasVoteFast bool hasOutput bool lock sync.RWMutex pendingBlock []pendingBlock - pendingVote []pendingVote + pendingSignal []*agrPkg.Signal candidateBlock map[common.Hash]*types.Block fastForward chan uint64 signer *utils.Signer @@ -155,8 +153,7 @@ func newAgreement( // restart the agreement func (a *agreement) restart( - notarySet map[types.NodeID]struct{}, aID types.Position, leader types.NodeID, - crs common.Hash) { + aID types.Position, leader types.NodeID, crs common.Hash) { if !func() bool { a.lock.Lock() defer a.lock.Unlock() @@ -170,11 +167,8 @@ func (a *agreement) restart( defer a.data.lock.Unlock() a.data.blocksLock.Lock() defer a.data.blocksLock.Unlock() - a.data.votes = make(map[uint64][]map[types.NodeID]*types.Vote) - a.data.votes[1] = newVoteListMap() a.data.period = 2 a.data.blocks = make(map[types.NodeID]*types.Block) - a.data.requiredVote = len(notarySet)/3*2 + 1 a.data.leader.restart(crs) a.data.lockValue = types.NullBlockHash a.data.lockIter = 0 @@ -187,7 +181,6 @@ func (a *agreement) restart( a.hasVoteFast = false a.hasOutput = false a.state = newFastState(a.data) - a.notarySet = notarySet a.candidateBlock = make(map[common.Hash]*types.Block) a.aID.Store(struct { pos types.Position @@ -219,43 +212,39 @@ func (a *agreement) restart( } a.pendingBlock = newPendingBlock }() - - replayVote := make([]*types.Vote, 0) + replaySignal := make([]*agrPkg.Signal, 0) func() { a.lock.Lock() defer a.lock.Unlock() - newPendingVote := make([]pendingVote, 0) - for _, pending := range a.pendingVote { - if aID.Newer(&pending.vote.Position) { + newPendingSignal := make([]*agrPkg.Signal, 0) + for _, pending := range a.pendingSignal { + if aID.Newer(&pending.Position) { continue - } else if pending.vote.Position == aID { - replayVote = append(replayVote, pending.vote) - } else if pending.receivedTime.After(expireTime) { - newPendingVote = append(newPendingVote, pending) + } else if pending.Position == aID { + replaySignal = append(replaySignal, pending) + } else { + newPendingSignal = append(newPendingSignal, pending) } } - a.pendingVote = newPendingVote + a.pendingSignal = newPendingSignal }() - for _, block := range replayBlock { if err := a.processBlock(block); err != nil { a.logger.Error("failed to process block when restarting agreement", "block", block) } } - - for _, vote := range replayVote { - if err := a.processVote(vote); err != nil { - a.logger.Error("failed to process vote when restarting agreement", - "vote", vote) + for _, signal := range replaySignal { + if err := a.processSignal(signal); err != nil { + a.logger.Error("failed to process signal when restarting agreement", + "signal", signal) } } } func (a *agreement) stop() { - a.restart(make(map[types.NodeID]struct{}), types.Position{ - ChainID: math.MaxUint32, - }, types.NodeID{}, common.Hash{}) + a.restart(types.Position{ChainID: math.MaxUint32}, types.NodeID{}, + common.Hash{}) } func isStop(aID types.Position) bool { @@ -315,40 +304,6 @@ func (a *agreement) nextState() (err error) { return } -func (a *agreement) sanityCheck(vote *types.Vote) error { - if vote.Type >= types.MaxVoteType { - return ErrInvalidVote - } - if _, exist := a.notarySet[vote.ProposerID]; !exist { - return ErrNotInNotarySet - } - ok, err := utils.VerifyVoteSignature(vote) - if err != nil { - return err - } - if !ok { - return ErrIncorrectVoteSignature - } - return nil -} - -func (a *agreement) checkForkVote(vote *types.Vote) ( - alreadyExist bool, err error) { - a.data.lock.RLock() - defer a.data.lock.RUnlock() - if votes, exist := a.data.votes[vote.Period]; exist { - if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { - alreadyExist = true - if vote.BlockHash != oldVote.BlockHash { - a.data.recv.ReportForkVote(oldVote, vote) - err = &ErrForkVote{vote.ProposerID, oldVote, vote} - return - } - } - } - return -} - // prepareVote prepares a vote. func (a *agreement) prepareVote(vote *types.Vote) (err error) { vote.Position = a.agreementID() @@ -367,137 +322,118 @@ func (a *agreement) updateFilter(filter *utils.VoteFilter) { filter.Height = a.agreementID().Height } -// processVote is the entry point for processing Vote. -func (a *agreement) processVote(vote *types.Vote) error { +// processSignal is the entry point for processing agreement.Signal. +func (a *agreement) processSignal(signal *agrPkg.Signal) error { + addPullBlocks := func(votes []types.Vote) map[common.Hash]struct{} { + set := make(map[common.Hash]struct{}) + for _, vote := range votes { + if vote.BlockHash == types.NullBlockHash || + vote.BlockHash == types.SkipBlockHash { + continue + } + if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found { + set[vote.BlockHash] = struct{}{} + } + } + return set + } a.lock.Lock() defer a.lock.Unlock() - if err := a.sanityCheck(vote); err != nil { - return err - } aID := a.agreementID() - // Agreement module has stopped. if isStop(aID) { - // Hacky way to not drop first votes for height 0. - if vote.Position.Height == uint64(0) { - a.pendingVote = append(a.pendingVote, pendingVote{ - vote: vote, - receivedTime: time.Now().UTC(), - }) + // Hacky way to not drop the first signal for height 0. + if signal.Position.Height == 0 { + a.pendingSignal = append(a.pendingSignal, signal) } + a.logger.Trace("Dropping signal when stopped", "signal", signal) return nil } - if vote.Position != aID { - if aID.Newer(&vote.Position) { + if signal.Position != aID { + if aID.Newer(&signal.Position) { + a.logger.Trace("Dropping older stopped", "signal", signal) return nil } - a.pendingVote = append(a.pendingVote, pendingVote{ - vote: vote, - receivedTime: time.Now().UTC(), - }) - return nil - } - exist, err := a.checkForkVote(vote) - if err != nil { - return err - } - if exist { - return nil - } - - a.data.lock.Lock() - defer a.data.lock.Unlock() - if _, exist := a.data.votes[vote.Period]; !exist { - a.data.votes[vote.Period] = newVoteListMap() - } - if _, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { + a.pendingSignal = append(a.pendingSignal, signal) return nil } - a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote - if !a.hasOutput && - (vote.Type == types.VoteCom || - vote.Type == types.VoteFast || - vote.Type == types.VoteFastCom) { - if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && - hash != types.SkipBlockHash { - if vote.Type == types.VoteFast { - if !a.hasVoteFast { - a.data.recv.ProposeVote( - types.NewVote(types.VoteFastCom, hash, vote.Period)) - a.data.lockValue = hash - a.data.lockIter = 1 - a.hasVoteFast = true - } - } else { - a.hasOutput = true - votes := a.data.votes[vote.Period][vote.Type] - votesList := make([]types.Vote, 0, len(votes)) - for _, v := range votes { - if v.BlockHash != hash { - continue - } - votesList = append(votesList, *v) - } - a.data.recv.ConfirmBlock(hash, votesList) - close(a.doneChan) - a.doneChan = nil - } - return nil - } - } else if a.hasOutput { - return nil - } - - // Check if the agreement requires fast-forwarding. - if len(a.fastForward) > 0 { + a.logger.Trace("ProcessSignal", "signal", signal) + if a.hasOutput { return nil } - if vote.Type == types.VotePreCom { - if vote.Period < a.data.lockIter { - // This PreCom is useless for us. - return nil + refVote := &signal.Votes[0] + switch signal.Type { + case agrPkg.SignalFork: + a.data.recv.ReportForkVote(refVote, &signal.Votes[1]) + case agrPkg.SignalDecide: + if a.hasOutput { + break } - if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && - hash != types.SkipBlockHash { + a.hasOutput = true + a.data.recv.ConfirmBlock(refVote.BlockHash, signal.Votes) + close(a.doneChan) + a.doneChan = nil + case agrPkg.SignalLock: + switch signal.VType { + case types.VotePreCom: + if len(a.fastForward) > 0 { + break + } // Condition 1. - if a.data.period >= vote.Period && vote.Period > a.data.lockIter && - vote.BlockHash != a.data.lockValue { - a.data.lockValue = hash - a.data.lockIter = vote.Period - return nil + if a.data.period >= signal.Period && + signal.Period > a.data.lockIter && + refVote.BlockHash != a.data.lockValue { + a.data.lockValue = refVote.BlockHash + a.data.lockIter = signal.Period + break } // Condition 2. - if vote.Period > a.data.period { - if vote.Period > a.data.lockIter { - a.data.lockValue = hash - a.data.lockIter = vote.Period + if signal.Period > a.data.period { + if signal.Period > a.data.lockIter { + a.data.lockValue = refVote.BlockHash + a.data.lockIter = signal.Period } - a.fastForward <- vote.Period - return nil + a.fastForward <- signal.Period + break + } + case types.VoteFast: + if a.hasOutput { + break + } + if a.hasVoteFast { + break } + a.data.recv.ProposeVote(types.NewVote( + types.VoteFastCom, refVote.BlockHash, signal.Period)) + a.data.lockValue = refVote.BlockHash + a.data.lockIter = 1 + a.hasVoteFast = true + default: + panic(fmt.Errorf("unknwon vote type for signal: %s, %s", refVote, + signal)) } - } - // Condition 3. - if vote.Type == types.VoteCom && vote.Period >= a.data.period && - len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote { - hashes := common.Hashes{} - addPullBlocks := func(voteType types.VoteType) { - for _, vote := range a.data.votes[vote.Period][voteType] { - if vote.BlockHash == types.NullBlockHash || - vote.BlockHash == types.SkipBlockHash { - continue + case agrPkg.SignalForward: + switch signal.VType { + case types.VoteCom: + // Condition 3. + if len(a.fastForward) > 0 { + break + } + if signal.Period >= a.data.period { + hashes := common.Hashes{} + for h := range addPullBlocks(signal.Votes) { + hashes = append(hashes, h) } - if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found { - hashes = append(hashes, vote.BlockHash) + if len(hashes) > 0 { + a.data.recv.PullBlocks(hashes) } + a.fastForward <- signal.Period + 1 } + default: + panic(fmt.Errorf("unknwon vote type for signal: %s, %s", refVote, + signal)) } - addPullBlocks(types.VotePreCom) - addPullBlocks(types.VoteCom) - if len(hashes) > 0 { - a.data.recv.PullBlocks(hashes) - } - a.fastForward <- vote.Period + 1 - return nil + default: + panic(fmt.Errorf("unknown signal type: %v", signal.Type)) } return nil } @@ -515,7 +451,7 @@ func (a *agreement) done() <-chan struct{} { if period <= a.data.period { break } - a.data.setPeriod(period) + a.data.period = period a.state = newPreCommitState(a.data) close(a.doneChan) a.doneChan = make(chan struct{}) @@ -626,42 +562,3 @@ func (a *agreement) findBlockNoLock(hash common.Hash) (*types.Block, bool) { } return b, e } - -func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( - blockHash common.Hash, ok bool) { - a.lock.RLock() - defer a.lock.RUnlock() - return a.countVoteNoLock(period, voteType) -} - -func (a *agreementData) countVoteNoLock( - period uint64, voteType types.VoteType) (blockHash common.Hash, ok bool) { - votes, exist := a.votes[period] - if !exist { - return - } - candidate := make(map[common.Hash]int) - for _, vote := range votes[voteType] { - if _, exist := candidate[vote.BlockHash]; !exist { - candidate[vote.BlockHash] = 0 - } - candidate[vote.BlockHash]++ - } - for candidateHash, votes := range candidate { - if votes >= a.requiredVote { - blockHash = candidateHash - ok = true - return - } - } - return -} - -func (a *agreementData) setPeriod(period uint64) { - for i := a.period + 1; i <= period; i++ { - if _, exist := a.votes[i]; !exist { - a.votes[i] = newVoteListMap() - } - } - a.period = period -} diff --git a/core/consensus.go b/core/consensus.go index f37f887..f973808 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -80,7 +80,7 @@ func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { return } go func() { - if err := recv.agreementModule.processVote(vote); err != nil { + if err := recv.consensus.baMgr.processVote(vote); err != nil { recv.consensus.logger.Error("Failed to process self vote", "error", err, "vote", vote) |