diff options
author | Mission Liao <mission.liao@dexon.org> | 2019-04-03 16:14:26 +0800 |
---|---|---|
committer | Mission Liao <mission.liao@dexon.org> | 2019-04-15 14:41:19 +0800 |
commit | 603410f66d9c0bdc23c8d647f5e732e88e32265e (patch) | |
tree | cc3a366fc559118db21115a1d5435b5c88ec1799 | |
parent | 46dab5936d7a9e52f42edaa926a1adb6dcafd81b (diff) | |
download | dexon-consensus-603410f66d9c0bdc23c8d647f5e732e88e32265e.tar.gz dexon-consensus-603410f66d9c0bdc23c8d647f5e732e88e32265e.tar.zst dexon-consensus-603410f66d9c0bdc23c8d647f5e732e88e32265e.zip |
Add agreementCache
-rw-r--r-- | core/agreement-cache.go | 943 | ||||
-rw-r--r-- | core/agreement-cache_test.go | 907 |
2 files changed, 1850 insertions, 0 deletions
diff --git a/core/agreement-cache.go b/core/agreement-cache.go new file mode 100644 index 0000000..1fd33f6 --- /dev/null +++ b/core/agreement-cache.go @@ -0,0 +1,943 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "bytes" + "errors" + "fmt" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Errors raised from agreementCache. +var ( + ErrUnableToAgree = errors.New("unable to agree") + ErrUnknownHeight = errors.New("unknown height") +) + +// voteSet is a place to keep one type of votes for one period in one position. +type voteSet struct { + votes map[types.NodeID]types.Vote + counts map[common.Hash]int + best common.Hash + votesAsList []types.Vote + triggeredEvent *agreementEvent + purged bool +} + +func newVoteSet(size int) *voteSet { + return &voteSet{ + votes: make(map[types.NodeID]types.Vote, size), + votesAsList: make([]types.Vote, 0, size), + counts: make(map[common.Hash]int), + } +} + +func (s *voteSet) add(v types.Vote) (oldV types.Vote, forked bool) { + if s.purged { + return + } + oldV, exist := s.votes[v.ProposerID] + if exist { + forked = oldV.BlockHash != v.BlockHash + return + } + s.votes[v.ProposerID] = v + s.votesAsList = append(s.votesAsList, v) + s.counts[v.BlockHash]++ + if s.counts[v.BlockHash] > s.counts[s.best] { + s.best = v.BlockHash + } + return +} + +func (s *voteSet) votesByHash(h common.Hash, estimatedLength int) []types.Vote { + votes := make([]types.Vote, 0, estimatedLength) + for _, v := range s.votesAsList { + if v.BlockHash == h { + votes = append(votes, v) + } + } + return votes +} + +func (s *voteSet) isAgreedOnOneHashPossible(maxCount, threshold int) bool { + if len(s.votes) == maxCount { + return s.counts[s.best] >= threshold + } else if len(s.votes) > maxCount { + panic(fmt.Errorf("collecting votes exceeding max count: %v %v %v", + maxCount, threshold, len(s.votes))) + } + return maxCount-len(s.votes)+s.counts[s.best] >= threshold +} + +func (s *voteSet) appendTo(src []types.Vote) []types.Vote { + if len(s.votesAsList) == 0 { + if s.triggeredEvent == nil { + return src + } + return append(src, s.triggeredEvent.Votes...) + } + return append(src, s.votesAsList...) +} + +func (s *voteSet) setEvent(e *agreementEvent, purge bool) { + if purge { + s.votes = nil + s.counts = nil + s.votesAsList = nil + s.purged = true + } + s.triggeredEvent = e +} + +func (s *voteSet) isPurged() bool { + return s.purged +} + +type agreementCacheConfig struct { + utils.RoundBasedConfig + + notarySet map[types.NodeID]struct{} + lambdaBA time.Duration +} + +func (c *agreementCacheConfig) from( + round uint64, + config *types.Config, + notarySet map[types.NodeID]struct{}) { + c.SetupRoundBasedFields(round, config) + c.lambdaBA = config.LambdaBA + c.notarySet = notarySet +} + +func newAgreementCacheConfig( + prev agreementCacheConfig, + config *types.Config, + notarySet map[types.NodeID]struct{}) (c agreementCacheConfig) { + c = agreementCacheConfig{} + c.from(prev.RoundID()+1, config, notarySet) + c.AppendTo(prev.RoundBasedConfig) + return +} + +type agreementSnapshotVotesIndex struct { + position types.Position + period uint64 + idx int +} + +func (x agreementSnapshotVotesIndex) Older(o agreementSnapshotVotesIndex) bool { + return x.position.Older(o.position) || + (x.position.Equal(o.position) && x.period < o.period) +} + +func (x agreementSnapshotVotesIndex) Newer(o agreementSnapshotVotesIndex) bool { + return x.position.Newer(o.position) || + (x.position.Equal(o.position) && x.period > o.period) +} + +type agreementSnapshotVotesIndexes []agreementSnapshotVotesIndex + +func (x agreementSnapshotVotesIndexes) nearestNewerIdx( + position types.Position, + period uint64) (agreementSnapshotVotesIndex, bool) { + fakeIdx := agreementSnapshotVotesIndex{position: position, period: period} + i := sort.Search(len(x), func(i int) bool { + return x[i].Newer(fakeIdx) + }) + if i < len(x) { + return x[i], true + } + return agreementSnapshotVotesIndex{}, false +} + +func (x agreementSnapshotVotesIndexes) nearestNewerOrEqualIdx( + position types.Position, + period uint64) (agreementSnapshotVotesIndex, bool) { + fakeIdx := agreementSnapshotVotesIndex{position: position, period: period} + i := sort.Search(len(x), func(i int) bool { + return !x[i].Older(fakeIdx) + }) + if i < len(x) { + return x[i], true + } + return agreementSnapshotVotesIndex{}, false +} + +// agreementSnapshot represents a group of ongoing votes that could be pulled +// to help others nodes to reach consensus. It supports to take a subset of +// snapshoted votes by providing the latest locked position P and period r. +// (NOTE: assume P-1 is already decided). +// +// When a subset request (P, r) is made, all pre-commit/fast votes later than +// that position would be returned. And all fast-commit/commit votes later than +// (P, 0) would be returned, too. To support this kind of access, the "votes" +// slice in snapshot is splited into two parts and can be viewed as this: +// +// |<- pre-commit/fast ->|<- commit/fast-commit ->| +// |P0,r0|P1,r1|P1,r2|P2,r1|P3,r1|P2,r2|P2,r1|P1,r2|P1,r0|P0,r0| +// +// When a pull request (P1,r2) is made, the slots between (P2,r1)->(P3,r1) in +// pre-commit/fast part and slots between (P1,r0)->(P2,r2) in +// commit/fast-commit part would be taken, and they are continuous in the slice, +// thus we don't need to recreate a new slice for pulling. +type agreementSnapshot struct { + expired time.Time + votes []types.Vote + evtDecide *agreementEvent + preComIdxes agreementSnapshotVotesIndexes // pre-commit/fast parts. + comIdxes agreementSnapshotVotesIndexes // commit/fast-commit parts. + boundary int // middle line between parts. +} + +func (s agreementSnapshot) get(position types.Position, lockPeriod uint64) ( + r *types.AgreementResult, votes []types.Vote) { + if s.evtDecide != nil && s.evtDecide.Position.Newer(position) { + result := s.evtDecide.toAgreementResult() + r = &result + } + begin, end := s.boundary, s.boundary + if i, found := s.preComIdxes.nearestNewerIdx(position, lockPeriod); found { + begin = i.idx + } + if i, found := s.comIdxes.nearestNewerOrEqualIdx(position, 0); found { + end = i.idx + } + votes = s.votes[begin:end] + return +} + +func (s *agreementSnapshot) addPreCommitVotes( + position types.Position, period uint64, votes []types.Vote) { + if len(votes) == 0 { + return + } + i := agreementSnapshotVotesIndex{ + position: position, + period: period, + } + if len(s.preComIdxes) > 0 && !s.preComIdxes[len(s.preComIdxes)-1].Older(i) { + panic(fmt.Errorf( + "invalid snapshot index additional in pre-commit case: %+v,%+v", + i, s.preComIdxes[len(s.preComIdxes)-1])) + } + // In pre-commit case, we would record the index of the first vote. + i.idx = len(s.votes) + s.votes = append(s.votes, votes...) + if len(s.votes) == i.idx { + return + } + s.preComIdxes = append(s.preComIdxes, i) +} + +func (s *agreementSnapshot) addCommitVotes( + position types.Position, period uint64, votes []types.Vote) { + if len(votes) == 0 { + return + } + i := agreementSnapshotVotesIndex{ + position: position, + period: period, + } + if len(s.comIdxes) > 0 && !s.comIdxes[len(s.comIdxes)-1].Newer(i) { + panic(fmt.Errorf( + "invalid snapshot index additional in commit case: %+v,%+v", + i, s.comIdxes[len(s.comIdxes)-1])) + } + // In commit case, we would record the index of the last vote. + beforeAppend := len(s.votes) + s.votes = append(s.votes, votes...) + if len(s.votes) == beforeAppend { + return + } + i.idx = len(s.votes) + s.comIdxes = append(agreementSnapshotVotesIndexes{i}, s.comIdxes...) +} + +func (s *agreementSnapshot) markBoundary() { + if len(s.comIdxes) > 0 { + panic(fmt.Errorf("attempt to mark boundary after commit votes added")) + } + s.boundary = len(s.votes) +} + +type agreementCacheReceiver interface { + GetNotarySet(round uint64) (map[types.NodeID]struct{}, error) + RecoverTSIG(blockHash common.Hash, votes []types.Vote) ([]byte, error) + VerifyTSIG(round uint64, hash common.Hash, tSig []byte) (bool, error) +} + +type agreementCache struct { + recv agreementCacheReceiver + configs []agreementCacheConfig + configsLock sync.RWMutex + lock sync.RWMutex + vSets map[types.Position]map[uint64][]*voteSet // Position > Period > Type + refEvts []*agreementEvent + refPosition atomic.Value + lastSnapshot atomic.Value + lastSnapshotCh chan time.Time +} + +func (c *agreementCache) votes(v types.Vote, config agreementCacheConfig, + createIfNotExist bool) *voteSet { + vForPosition, exist := c.vSets[v.Position] + if !exist { + if !createIfNotExist { + return nil + } + vForPosition = make(map[uint64][]*voteSet) + c.vSets[v.Position] = vForPosition + } + vForPeriod, exist := vForPosition[v.Period] + if !exist { + if !createIfNotExist { + return nil + } + reserved := len(config.notarySet) + vForPeriod = make([]*voteSet, types.MaxVoteType) + for idx := range vForPeriod { + if types.VoteType(idx) == types.VoteInit { + continue + } + vForPeriod[idx] = &voteSet{ + votes: make(map[types.NodeID]types.Vote, reserved), + votesAsList: make([]types.Vote, 0, reserved), + counts: make(map[common.Hash]int), + } + } + vForPosition[v.Period] = vForPeriod + } + return vForPeriod[v.Type] +} + +func (c *agreementCache) config( + round uint64) (cfg agreementCacheConfig, found bool) { + c.configsLock.RLock() + defer c.configsLock.RUnlock() + if len(c.configs) == 0 { + return + } + firstRound := c.configs[0].RoundID() + if round < firstRound || round >= firstRound+uint64(len(c.configs)) { + return + } + cfg, found = c.configs[round-firstRound], true + return +} + +func (c *agreementCache) firstConfig() (cfg agreementCacheConfig, found bool) { + c.configsLock.RLock() + defer c.configsLock.RUnlock() + if len(c.configs) == 0 { + return + } + return c.configs[0], true +} + +// isIgnorable is the most trivial way to filter outdated messages. +func (c *agreementCache) isIgnorable(p types.Position) bool { + return !p.Newer(c.refPosition.Load().(types.Position)) +} + +func (c *agreementCache) isIgnorableNoLock(p types.Position) bool { + evtDecide := c.refEvts[agreementEventDecide] + if evtDecide == nil { + return false + } + return !p.Newer(evtDecide.Position) +} + +func (c *agreementCache) isVoteIgnorable(v types.Vote) bool { + if v.Type == types.VoteInit { + return true + } + if c.isIgnorableNoLock(v.Position) { + return true + } + switch v.Type { + case types.VotePreCom, types.VoteFast: + evtLock := c.refEvts[agreementEventLock] + if evtLock == nil || evtLock.Position.Older(v.Position) { + return false + } + if evtLock.Position.Newer(v.Position) { + return true + } + return evtLock.period >= v.Period + } + return false +} + +func (c *agreementCache) checkVote( + v types.Vote, config agreementCacheConfig) (bool, *agreementEvent, error) { + if v.Type >= types.MaxVoteType { + return false, nil, ErrInvalidVote + } + if !config.Contains(v.Position.Height) { + return false, nil, ErrUnknownHeight + } + ok, err := utils.VerifyVoteSignature(&v) + if err != nil { + return false, nil, err + } + if !ok { + return false, nil, ErrIncorrectVoteSignature + } + c.lock.RLock() + defer c.lock.RUnlock() + if c.isVoteIgnorable(v) { + return false, nil, nil + } + if _, exist := config.notarySet[v.ProposerID]; !exist { + return false, nil, ErrNotInNotarySet + } + // Check for forked votes. + // + // NOTE: we won't be able to detect forked votes if they are purged. + if vSet := c.votes(v, config, false); vSet != nil { + if oldV, exist := vSet.votes[v.ProposerID]; exist { + if v.BlockHash != oldV.BlockHash { + return false, newAgreementEvent( + agreementEventFork, []types.Vote{oldV, v}), nil + } + } + } + return true, nil, nil +} + +func (c *agreementCache) checkResult(r *types.AgreementResult, + config agreementCacheConfig) (bool, error) { + if r.Position.Round < DKGDelayRound { + if err := VerifyAgreementResult(r, config.notarySet); err != nil { + return false, err + } + if bytes.Compare(r.Randomness, NoRand) != 0 { + return false, ErrIncorrectAgreementResult + } + } else { + ok, err := c.recv.VerifyTSIG(r.Position.Round, r.BlockHash, r.Randomness) + if err != nil { + return false, err + } + if !ok { + return false, ErrIncorrectBlockRandomness + } + } + c.lock.RLock() + defer c.lock.RUnlock() + if c.isIgnorableNoLock(r.Position) { + return false, nil + } + return true, nil +} + +func (c *agreementCache) checkFinalizedBlock(b *types.Block) (bool, error) { + if b.Position.Round < DKGDelayRound { + // Finalized blocks from rounds before DKGDelayRound can't be the proof + // of any agreement. + return false, nil + } + ok, err := c.recv.VerifyTSIG(b.Position.Round, b.Hash, b.Randomness) + if err != nil { + return false, err + } + if !ok { + return false, ErrIncorrectBlockRandomness + } + c.lock.RLock() + defer c.lock.RUnlock() + return !c.isIgnorableNoLock(b.Position), nil +} + +func (c *agreementCache) trigger(v types.Vote, vSet *voteSet, + config agreementCacheConfig) (e *agreementEvent, err error) { + var ( + maxVotes = len(config.notarySet) + requiredVotes = maxVotes*2/3 + 1 + ) + newDecideEvt := func(vSet *voteSet) (*agreementEvent, error) { + votes := vSet.votesByHash(vSet.best, requiredVotes) + if v.Position.Round < DKGDelayRound { + return newAgreementEvent(agreementEventDecide, votes), nil + } + tsig, err := c.recv.RecoverTSIG(vSet.best, votes) + if err != nil { + return nil, err + } + e := newAgreementEventFromTSIG(v.Position, vSet.best, tsig, + vSet.best == types.NullBlockHash) + return e, nil + } + switch v.Type { + case types.VoteCom: + // 2t+1 commit votes on SKIP should be handled by condition#3, other + // cases should trigger a "decide" event. + if vSet.best != types.SkipBlockHash && + vSet.counts[vSet.best] >= requiredVotes { + e, err = newDecideEvt(vSet) + break + } + // It's the condition#3, there are more than 2t+1 commit votes for + // different values or skip. + if vSet.triggeredEvent == nil && len(vSet.votes) >= requiredVotes { + copiedVotes := make([]types.Vote, requiredVotes) + copy(copiedVotes, vSet.votesAsList) + e = newAgreementEvent(agreementEventForward, copiedVotes) + break + } + case types.VoteFastCom: + if vSet.best == types.SkipBlockHash || + vSet.best == types.NullBlockHash || + vSet.counts[vSet.best] < requiredVotes { + break + } + e, err = newDecideEvt(vSet) + case types.VotePreCom, types.VoteFast: + if vSet.counts[vSet.best] < requiredVotes { + break + } + e = newAgreementEvent( + agreementEventLock, vSet.votesByHash(vSet.best, requiredVotes)) + } + if err != nil { + return + } + if e == nil { + // TODO(mission): this threshold can be lowered to f+1, however, it + // might not be equal to better performance (In most + // cases we should be able to reach agreement in one + // period.) Need to benchmark before adjusting it. + if v.Type != types.VoteCom && + len(vSet.votesAsList) >= requiredVotes && + !vSet.isAgreedOnOneHashPossible(maxVotes, requiredVotes) { + vSet.setEvent(nil, true) + } + return + } + // Overwriting a triggered decide event is not valid. + if vSet.triggeredEvent != nil && + vSet.triggeredEvent.evtType == agreementEventDecide { + panic(fmt.Errorf("attempt to overwrite a decide signal: %s %s", + vSet.triggeredEvent, e)) + } + if v.Type == types.VoteCom && e.evtType == agreementEventForward { + // A group of commit votes might trigger a decide signal after + // triggering a forward signal. + vSet.setEvent( + e, !vSet.isAgreedOnOneHashPossible(maxVotes, requiredVotes)) + } else { + vSet.setEvent(e, true) + } + return +} + +func (c *agreementCache) updateReferenceEvent( + e *agreementEvent) (updated bool) { + refEvt := c.refEvts[e.evtType] + if refEvt != nil { + // Make sure we are raising signal forwarding. All signals from + // older position or older period of the same position should never + // happen, except fork votes. + if e.Position.Older(refEvt.Position) { + panic(fmt.Errorf("backward agreement event: %s %s", refEvt, e)) + } + if e.Position.Equal(refEvt.Position) { + switch e.evtType { + case agreementEventDecide: + panic(fmt.Errorf("duplicated decided event: %s %s", refEvt, e)) + case agreementEventLock: + if e.period > refEvt.period { + break + } + panic(fmt.Errorf("backward lock event: %s %s", refEvt, e)) + case agreementEventForward: + // It's possible for forward signal triggered in older period, + // we should not panic it. + if e.period <= refEvt.period { + return + } + } + } + } + updated = true + c.refEvts[e.evtType] = e + switch e.evtType { + case agreementEventLock: + // A lock signal by commit votes should trigger period forwarding, too. + eF := c.refEvts[agreementEventForward] + if eF != nil { + if eF.Position.Newer(e.Position) { + break + } + if eF.Position.Equal(e.Position) && eF.period <= e.period { + break + } + } + c.refEvts[agreementEventForward] = e + case agreementEventDecide: + clearRef := func(eType agreementEventType) { + if c.refEvts[eType] == nil { + return + } + if c.refEvts[eType].Position.Newer(e.Position) { + return + } + c.refEvts[eType] = nil + } + clearRef(agreementEventForward) + clearRef(agreementEventLock) + c.refPosition.Store(e.Position) + } + return +} + +func (c *agreementCache) purgeBy(e *agreementEvent) { + purgeByPeriod := func(vForPeriod []*voteSet) { + for vType, vSet := range vForPeriod { + switch types.VoteType(vType) { + case types.VotePreCom, types.VoteFast: + vSet.setEvent(nil, true) + } + } + } + switch e.evtType { + case agreementEventDecide: + for p := range c.vSets { + if !p.Newer(e.Position) { + delete(c.vSets, p) + } + } + // Purge notary set by position when decided. + firstRoundID := c.configs[0].RoundID() + if e.Position.Round > firstRoundID { + c.configs = c.configs[e.Position.Round-firstRoundID:] + } + case agreementEventLock: + // For older positions, purge all pre-commit/fast votes. + for p, vForPosition := range c.vSets { + if !p.Older(e.Position) { + continue + } + for _, vForPeriod := range vForPosition { + purgeByPeriod(vForPeriod) + } + } + // Only locked signal can be used to purge older periods in the same + // position. + vForPosition := c.vSets[e.Position] + for period, vForPeriod := range vForPosition { + if period >= e.period { + continue + } + // It's safe to purge votes in older periods, except for + // commit/fast-commit votes: a decide signal should be raised from + // any period even if we've locked on some later period. We can only + // purge those votes when it's impossible to trigger an decide + // signal from that period. + purgeByPeriod(vForPeriod) + } + } +} + +func newAgreementCache(recv agreementCacheReceiver) (c *agreementCache) { + c = &agreementCache{ + vSets: make(map[types.Position]map[uint64][]*voteSet), + refEvts: make([]*agreementEvent, maxAgreementEventType), + lastSnapshotCh: make(chan time.Time, 1), + recv: recv, + } + c.lastSnapshotCh <- time.Time{} + c.lastSnapshot.Store(&agreementSnapshot{}) + c.refPosition.Store(types.Position{}) + return +} + +func (c *agreementCache) notifyRoundEvents( + evts []utils.RoundEventParam) error { + apply := func(e utils.RoundEventParam) error { + if len(c.configs) > 0 { + lastCfg := c.configs[len(c.configs)-1] + if e.BeginHeight != lastCfg.RoundEndHeight() { + return ErrInvalidBlockHeight + } + if lastCfg.RoundID() == e.Round { + c.configs[len(c.configs)-1].ExtendLength() + } else if lastCfg.RoundID()+1 == e.Round { + notarySet, err := c.recv.GetNotarySet(e.Round) + if err != nil { + return err + } + c.configs = append(c.configs, newAgreementCacheConfig( + lastCfg, e.Config, notarySet)) + } else { + return ErrInvalidRoundID + } + } else { + notarySet, err := c.recv.GetNotarySet(e.Round) + if err != nil { + return err + } + cfg := agreementCacheConfig{} + cfg.from(e.Round, e.Config, notarySet) + cfg.SetRoundBeginHeight(e.BeginHeight) + c.configs = append(c.configs, cfg) + } + return nil + } + c.configsLock.Lock() + defer c.configsLock.Unlock() + for _, e := range evts { + if err := apply(e); err != nil { + return err + } + } + return nil +} + +func (c *agreementCache) processVote( + v types.Vote) (evts []*agreementEvent, err error) { + if c.isIgnorable(v.Position) { + return + } + config, found := c.config(v.Position.Round) + if !found { + err = ErrRoundOutOfRange + return + } + ok, forkEvt, err := c.checkVote(v, config) + if forkEvt != nil { + evts = append(evts, forkEvt) + } + if err != nil || !ok { + return + } + c.lock.Lock() + defer c.lock.Unlock() + // Although we've checked if this vote is ignorable in checkVote method, + // it might become ignorable before we acquire writer lock. + if c.isVoteIgnorable(v) { + return + } + vSet := c.votes(v, config, true) + switch { + case vSet.isPurged(): + case vSet.triggeredEvent != nil: + if v.Type == types.VotePreCom || v.Type == types.VoteFast { + break + } + fallthrough + default: + if oldV, forked := vSet.add(v); forked { + panic(fmt.Errorf("unexpected forked vote: %s %s", &oldV, &v)) + } + var evt *agreementEvent + evt, err = c.trigger(v, vSet, config) + if err != nil { + break + } + if evt != nil { + if c.updateReferenceEvent(evt) { + c.purgeBy(evt) + evts = append(evts, evt) + } + } + } + return +} + +func (c *agreementCache) processAgreementResult( + r *types.AgreementResult) (evts []*agreementEvent, err error) { + if c.isIgnorable(r.Position) { + return + } + config, found := c.config(r.Position.Round) + if !found { + err = ErrRoundOutOfRange + return + } + ok, err := c.checkResult(r, config) + if err != nil || !ok { + return + } + c.lock.Lock() + defer c.lock.Unlock() + if c.isIgnorableNoLock(r.Position) { + return + } + var evt *agreementEvent + if r.Position.Round >= DKGDelayRound { + evt = newAgreementEventFromTSIG( + r.Position, r.BlockHash, r.Randomness, r.IsEmptyBlock) + } else { + vSet := c.votes(r.Votes[0], config, true) + for _, v := range r.Votes { + if oldV, forked := vSet.add(v); forked { + evts = append(evts, newAgreementEvent( + agreementEventFork, []types.Vote{oldV, v})) + } + } + evt, err = c.trigger(r.Votes[0], vSet, config) + } + if err != nil { + return + } + if evt == nil { + err = ErrUnableToAgree + return + } + if c.updateReferenceEvent(evt) { + c.purgeBy(evt) + evts = append(evts, evt) + } else { + // It should be treated as error when unable to proceed via + // types.AgreementResult. + err = fmt.Errorf("unable to assign decide event: %s from %s", evt, r) + return + } + return +} + +func (c *agreementCache) processFinalizedBlock( + b *types.Block) (evts []*agreementEvent, err error) { + if c.isIgnorable(b.Position) { + return + } + ok, err := c.checkFinalizedBlock(b) + if err != nil || !ok { + return + } + c.lock.Lock() + defer c.lock.Unlock() + if c.isIgnorableNoLock(b.Position) { + return + } + evt := newAgreementEventFromTSIG( + b.Position, b.Hash, b.Randomness, b.IsEmpty()) + if c.updateReferenceEvent(evt) { + c.purgeBy(evt) + evts = append(evts, evt) + } else { + // It should be treated as error when unable to proceed via finalized + // blocks. + err = fmt.Errorf("unable to assign decide event: %s from %s", evt, b) + return + } + return +} + +func (c *agreementCache) snapshot(expect time.Time) *agreementSnapshot { + snapshoted := <-c.lastSnapshotCh + defer func() { + c.lastSnapshotCh <- snapshoted + }() + if !snapshoted.Before(expect) { + // Reuse current snapshot, someone else might perform snapshot right + // before us. + return c.lastSnapshot.Load().(*agreementSnapshot) + } + // NOTE: There is no clue to decide which round of config to use as snapshot + // refresh interval, simply pick the first one as workaround. + var refreshInterval = 200 * time.Millisecond + if config, found := c.firstConfig(); found { + // The interval to refresh the cache should be shorter than lambdaBA. + refreshInterval = config.lambdaBA * 4 / 7 + } + c.lock.RLock() + defer c.lock.RUnlock() + ss := &agreementSnapshot{ + evtDecide: c.refEvts[agreementEventDecide], + } + var positions []types.Position + for position := range c.vSets { + positions = append(positions, position) + } + sort.Slice(positions, func(i, j int) bool { + return positions[i].Older(positions[j]) + }) + // All votes in cache are useful votes, so just keep them all in snapshot. + // They would be orgainzed in acending order of (position, period) of votes. + for _, position := range positions { + vForPosition := c.vSets[position] + var periods []uint64 + for period := range vForPosition { + periods = append(periods, period) + } + sort.Slice(periods, func(i, j int) bool { + return periods[i] < periods[j] + }) + for _, period := range periods { + vForPeriod := vForPosition[period] + votes := []types.Vote{} + for t, vSet := range vForPeriod { + if t == int(types.VotePreCom) || t == int(types.VoteFast) { + votes = vSet.appendTo(votes) + } + } + ss.addPreCommitVotes(position, period, votes) + } + } + ss.markBoundary() + // It's the part for commit/fast-commit votes, they should be appended in + // reversed order. + for i := range positions { + position := positions[len(positions)-i-1] + vForPosition := c.vSets[position] + var periods []uint64 + for period := range vForPosition { + periods = append(periods, period) + } + sort.Slice(periods, func(i, j int) bool { + return periods[i] > periods[j] + }) + for _, period := range periods { + vForPeriod := vForPosition[period] + votes := []types.Vote{} + for t, vSet := range vForPeriod { + if t == int(types.VoteFastCom) || t == int(types.VoteCom) { + votes = vSet.appendTo(votes) + } + } + ss.addCommitVotes(position, period, votes) + } + } + ss.expired = time.Now().Add(refreshInterval) + c.lastSnapshot.Store(ss) + snapshoted = ss.expired + return ss +} + +// pull latest agreement results, and votes for ongoing BA. This method can be +// called concurrently. +func (c *agreementCache) pull(p types.Position, lockPeriod uint64) ( + r *types.AgreementResult, votes []types.Vote) { + snapshot := c.lastSnapshot.Load().(*agreementSnapshot) + now := time.Now() + if now.After(snapshot.expired) { + snapshot = c.snapshot(now) + } + return snapshot.get(p, lockPeriod) +} diff --git a/core/agreement-cache_test.go b/core/agreement-cache_test.go new file mode 100644 index 0000000..fa1c8dd --- /dev/null +++ b/core/agreement-cache_test.go @@ -0,0 +1,907 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "bytes" + "math/rand" + "sort" + "testing" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/crypto" + "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" + "github.com/stretchr/testify/suite" +) + +func countVotes(votes []types.Vote, p types.Position, period uint64, + t types.VoteType) (cnt int) { + for _, v := range votes { + if !v.Position.Equal(p) { + continue + } + if v.Period != period { + continue + } + if v.Type != t { + continue + } + cnt++ + } + return +} + +type testAgreementCacheReceiver struct { + s *AgreementCacheTestSuite + isTsigValid bool +} + +func (r *testAgreementCacheReceiver) GetNotarySet( + round uint64) (map[types.NodeID]struct{}, error) { + return r.s.notarySet, nil +} + +func (r *testAgreementCacheReceiver) RecoverTSIG( + blockHash common.Hash, votes []types.Vote) ([]byte, error) { + return []byte("OK"), nil +} + +func (r *testAgreementCacheReceiver) VerifyTSIG( + round uint64, hash common.Hash, tSig []byte) (bool, error) { + return r.isTsigValid, nil +} + +func (r *testAgreementCacheReceiver) VerifySignature(hash common.Hash, + sig crypto.Signature) bool { + return bytes.Compare(sig.Signature, []byte("OK")) == 0 +} + +type AgreementCacheTestSuite struct { + suite.Suite + notarySet map[types.NodeID]struct{} + requiredVotes int + f int + roundLength uint64 + signers []*utils.Signer + cache *agreementCache + recv *testAgreementCacheReceiver +} + +func (s *AgreementCacheTestSuite) SetupSuite() { + var ( + prvKeys []crypto.PrivateKey + pubKeys []crypto.PublicKey + count = 7 + ) + for i := 0; i < count; i++ { + prvKey, err := ecdsa.NewPrivateKey() + s.Require().NoError(err) + prvKeys = append(prvKeys, prvKey) + pubKeys = append(pubKeys, prvKey.PublicKey()) + } + for _, k := range prvKeys { + s.signers = append(s.signers, utils.NewSigner(k)) + } + s.f = count / 3 + s.requiredVotes = 2*s.f + 1 + s.roundLength = 100 + s.notarySet = make(map[types.NodeID]struct{}) + for _, k := range pubKeys { + s.notarySet[types.NewNodeID(k)] = struct{}{} + } + s.recv = &testAgreementCacheReceiver{s: s} +} + +func (s *AgreementCacheTestSuite) SetupTest() { + s.recv.isTsigValid = true + s.newCache(s.newRoundEvents(1)) +} + +func (s *AgreementCacheTestSuite) newVote(t types.VoteType, h common.Hash, + p types.Position, period uint64, signer *utils.Signer) *types.Vote { + v := types.NewVote(t, h, period) + v.Position = p + s.Require().NoError(signer.SignVote(v)) + return v +} + +func (s *AgreementCacheTestSuite) newVotes(t types.VoteType, h common.Hash, + p types.Position, period uint64) (votes []types.Vote) { + for _, signer := range s.signers { + votes = append(votes, *s.newVote(t, h, p, period, signer)) + } + return +} + +func (s *AgreementCacheTestSuite) testVotes( + votes []types.Vote, eType agreementEventType) (e *agreementEvent) { + for idx := range votes { + evts, err := s.cache.processVote(votes[idx]) + s.Require().NoError(err) + if idx+1 < s.requiredVotes { + s.Require().Empty(evts) + } + if idx+1 == s.requiredVotes { + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, eType) + s.Require().Equal(evts[0].Position, votes[0].Position) + if votes[0].Position.Round < DKGDelayRound || + eType != agreementEventDecide { + s.Require().Equal(evts[0].period, votes[0].Period) + s.Require().Len(evts[0].Votes, s.requiredVotes) + } + e = evts[0] + break + } + } + // Replay those votes again won't trigger another event. + for idx := range votes { + evts, err := s.cache.processVote(votes[idx]) + s.Require().NoError(err) + s.Require().Empty(evts) + if idx+1 == s.requiredVotes { + break + } + } + return +} + +func (s *AgreementCacheTestSuite) takeSubset(ss *agreementSnapshot, + p types.Position, period uint64, length int) ( + *types.AgreementResult, []types.Vote) { + s.Require().NotNil(ss) + r, votes := ss.get(p, period) + s.Require().Len(votes, length) + return r, votes +} + +func (s *AgreementCacheTestSuite) newRoundEvents( + round uint64) (rEvts []utils.RoundEventParam) { + h := types.GenesisHeight + for r := uint64(0); r <= round; r++ { + rEvts = append(rEvts, utils.RoundEventParam{ + Round: r, + BeginHeight: h, + Config: &types.Config{ + LambdaBA: 100 * time.Millisecond, + RoundLength: s.roundLength, + }, + }) + h += s.roundLength + } + return +} + +func (s *AgreementCacheTestSuite) newCache(rEvts []utils.RoundEventParam) { + s.cache = newAgreementCache(s.recv) + s.Require().NotNil(s.cache) + s.Require().NoError(s.cache.notifyRoundEvents(rEvts)) +} + +func (s *AgreementCacheTestSuite) generateTestData( + positionCount, periodCount uint64) ( + lastPosition types.Position, + votes []types.Vote, bs []*types.Block, rs []*types.AgreementResult) { + gen := func(p types.Position, period uint64) ( + vs []types.Vote, b *types.Block, r *types.AgreementResult) { + h := common.NewRandomHash() + v := s.newVotes(types.VotePreCom, h, p, period) + vs = append(vs, v...) + vCom := s.newVotes(types.VoteCom, h, p, period) + vs = append(vs, vCom...) + if period == 0 { + v = s.newVotes(types.VoteFast, h, p, period) + vs = append(vs, v...) + v = s.newVotes(types.VoteFastCom, h, p, period) + vs = append(vs, v...) + } + r = &types.AgreementResult{BlockHash: h, Position: p} + b = &types.Block{Hash: h, Position: p} + if p.Round >= DKGDelayRound { + r.Randomness = []byte("cold-freeze") + b.Randomness = []byte("blackhole-rocks") + } else { + r.Votes = vCom[:s.requiredVotes] + r.Randomness = NoRand + b.Randomness = NoRand + } + return + } + for i := uint64(0); i < positionCount; i++ { + lastPosition = types.Position{Height: types.GenesisHeight + i} + lastPosition.Round = uint64(i) / s.roundLength + for period := uint64(0); period < periodCount; period++ { + vs, b, r := gen(lastPosition, period) + votes = append(votes, vs...) + bs = append(bs, b) + rs = append(rs, r) + } + } + return +} + +func (s *AgreementCacheTestSuite) TestAgreementResult() { + var ( + hash = common.NewRandomHash() + position = types.Position{Round: 0, Height: 3} + ) + // An agreement result with fast-commit votes can trigger a decide event. + r := &types.AgreementResult{ + BlockHash: hash, + Position: position, + Votes: s.newVotes(types.VoteFastCom, hash, position, 1), + Randomness: NoRand, + } + evts, err := s.cache.processAgreementResult(r) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].period, uint64(1)) + s.Require().Len(evts[0].Votes, len(r.Votes)) + // An agreement result with commit votes can trigger a decide event. + position.Height++ + r = &types.AgreementResult{ + BlockHash: hash, + Position: position, + Votes: s.newVotes(types.VoteCom, hash, position, 1), + Randomness: NoRand, + } + evts, err = s.cache.processAgreementResult(r) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].period, uint64(1)) + s.Require().Len(evts[0].Votes, len(r.Votes)) + // An agreement result from older position would be ignored. + position.Height-- + evts, err = s.cache.processAgreementResult(&types.AgreementResult{ + BlockHash: hash, + Position: position, + Votes: s.newVotes(types.VoteCom, hash, position, 1), + Randomness: NoRand, + }) + s.Require().NoError(err) + s.Require().Empty(evts) + position.Height++ + // An agreement result contains fork votes should be detected, while still + // be able to trigger a decide event. + position.Height++ + forkedVote := s.newVote( + types.VoteCom, common.NewRandomHash(), position, 1, s.signers[0]) + evts, err = s.cache.processVote(*forkedVote) + s.Require().NoError(err) + s.Require().Empty(evts) + hash = common.NewRandomHash() + evts, err = s.cache.processAgreementResult(&types.AgreementResult{ + BlockHash: hash, + Position: position, + Votes: s.newVotes(types.VoteCom, hash, position, 1), + Randomness: NoRand, + }) + s.Require().NoError(err) + s.Require().Len(evts, 2) + s.Require().Equal(evts[0].evtType, agreementEventFork) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].period, uint64(1)) + s.Require().Equal(evts[1].evtType, agreementEventDecide) + s.Require().Equal(evts[1].Position, position) + s.Require().Equal(evts[1].period, uint64(1)) + // An agreement result with valid TSIG can trigger a decide event. + position.Round = 1 + position.Height++ + hash = common.NewRandomHash() + evts, err = s.cache.processAgreementResult(&types.AgreementResult{ + BlockHash: hash, + Position: position, + Randomness: []byte("OK-LA"), + }) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].Randomness, []byte("OK-LA")) + // An agreement result with invalid TSIG would raise error. + s.recv.isTsigValid = false + position.Height++ + hash = common.NewRandomHash() + evts, err = s.cache.processAgreementResult(&types.AgreementResult{ + BlockHash: hash, + Position: position, + Randomness: []byte("NOK"), + }) + s.Require().EqualError(err, ErrIncorrectBlockRandomness.Error()) + s.Require().Empty(evts) + s.recv.isTsigValid = true +} + +func (s *AgreementCacheTestSuite) TestBasicUsage() { + var ( + hash = common.NewRandomHash() + position = types.Position{Round: 0, Height: types.GenesisHeight} + ) + // If there are 2t+1 pre-commit votes for the same value, it should raise + // a lock event. + s.testVotes( + s.newVotes(types.VotePreCom, hash, position, 1), agreementEventLock) + // If there are 2t+1 commit votes for the same value, it should raise + // a decide event. + s.testVotes( + s.newVotes(types.VoteCom, hash, position, 1), agreementEventDecide) + // If there are 2t+1 fast votes for the same value, it should raise + // a lock event. + position.Height++ + hash = common.NewRandomHash() + s.testVotes( + s.newVotes(types.VoteFast, hash, position, 1), agreementEventLock) + // If there are 2t+1 commit votes for SKIP, it should raise a forward + // event. + position.Height++ + hash = types.SkipBlockHash + s.testVotes( + s.newVotes(types.VoteCom, hash, position, 1), agreementEventForward) + // If there are 2t+1 commit votes for different value, it should raise + // a forward event. + position.Height++ + hash = common.NewRandomHash() + votes01 := s.newVotes(types.VoteCom, hash, position, 1) + hash = common.NewRandomHash() + votes02 := s.newVotes(types.VoteCom, hash, position, 1) + votes := append([]types.Vote(nil), votes01[0]) + votes = append(votes, votes02[1:]...) + s.testVotes(votes, agreementEventForward) + // If a forked vote is detected, it should raise a fork event. + position.Height++ + hash = common.NewRandomHash() + votes01 = s.newVotes(types.VotePreCom, hash, position, 1) + hash = common.NewRandomHash() + votes02 = s.newVotes(types.VotePreCom, hash, position, 1) + evts, err := s.cache.processVote(votes01[0]) + s.Require().NoError(err) + s.Require().Empty(evts) + evts, err = s.cache.processVote(votes02[0]) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventFork) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].period, uint64(1)) + s.Require().Len(evts[0].Votes, 2) + s.Require().Equal(evts[0].Votes[0], votes01[0]) + s.Require().Equal(evts[0].Votes[1], votes02[0]) +} + +func (s *AgreementCacheTestSuite) TestDecideInOlderPeriod() { + var ( + hash = common.NewRandomHash() + position = types.Position{Round: 0, Height: types.GenesisHeight} + ) + // Trigger fast-forward via condition#3. + hash = common.NewRandomHash() + votes01 := s.newVotes(types.VoteCom, hash, position, 1) + hash = common.NewRandomHash() + votes02 := s.newVotes(types.VoteCom, hash, position, 1) + votes := append(votes01[:1], votes02[1:s.requiredVotes]...) + s.testVotes(votes, agreementEventForward) + // Trigger fast-forward by pre-commit votes in later period. + hash = common.NewRandomHash() + s.testVotes( + s.newVotes(types.VotePreCom, hash, position, 2), agreementEventLock) + // Process a commit vote in period#1, should still trigger a decide event. + evts, err := s.cache.processVote(votes02[s.requiredVotes]) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) + s.Require().Equal(evts[0].Position, position) + s.Require().Equal(evts[0].period, uint64(1)) +} + +func (s *AgreementCacheTestSuite) TestDecideAfterForward() { + var ( + hash = common.NewRandomHash() + position = types.Position{ + Round: 1, + Height: types.GenesisHeight + s.roundLength, + } + ) + // Trigger fast-forward via condition#3. + hash = common.NewRandomHash() + votes01 := s.newVotes(types.VoteCom, hash, position, 1) + hash = common.NewRandomHash() + votes02 := s.newVotes(types.VoteCom, hash, position, 1) + votes := append(votes01[:1], votes02[1:s.requiredVotes]...) + s.testVotes(votes, agreementEventForward) + // Send remain votes of one hash to see if a decide event can be triggered. + evts, err := s.cache.processVote(votes02[s.requiredVotes]) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) +} + +func (s *AgreementCacheTestSuite) TestDecideByFinalizedBlock() { + // A finalized block from round before DKGDelayRound won't trigger a decide + // event. + + // A finalized block from round after DKGDelayRound would trigger a decide + // event. + + // A finalized block from older position would be ignored. +} + +func (s *AgreementCacheTestSuite) TestFastBA() { + var ( + hash = common.NewRandomHash() + position = types.Position{Round: 0, Height: types.GenesisHeight} + ) + test := func() { + // Fast -> FastCom, successfuly confirmed by Fast mode. + s.testVotes(s.newVotes(types.VoteFast, hash, position, 1), + agreementEventLock) + s.testVotes(s.newVotes(types.VoteFastCom, hash, position, 1), + agreementEventDecide) + // Fast -> PreCom -> Com, confirmed by RBA. + position.Height++ + s.testVotes( + s.newVotes(types.VoteFast, hash, position, 1), agreementEventLock) + s.testVotes( + s.newVotes(types.VotePreCom, hash, position, 2), agreementEventLock) + s.testVotes( + s.newVotes(types.VoteCom, hash, position, 2), agreementEventDecide) + } + // The case for rounds before DKGDelayRound. + test() + // The case for rounds after DKGDelayRound. + position.Round = 1 + position.Height = types.GenesisHeight + s.roundLength + test() +} + +func (s *AgreementCacheTestSuite) TestSnapshot() { + var ( + hash = common.NewRandomHash() + count = s.requiredVotes - 1 + ) + process := func(vs []types.Vote, until int) []types.Vote { + for i := range vs[:until] { + evts, err := s.cache.processVote(vs[i]) + s.Require().NoError(err) + s.Require().Empty(evts) + } + return vs + } + // Process some votes without triggering any event. + // + // Below is the expected slice of votes when taking snapshot. + // |<- pre-commit/fast ->|<-commit/fast-commit ->| + // |P0,r1|P1,r0|P1,r1|P1,r2|P2,r1|P2,r1|P1,r2|P1,r0|P0,r1| + p0 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength} + process(s.newVotes(types.VoteFast, hash, p0, 1), count) + process(s.newVotes(types.VotePreCom, hash, p0, 1), count) + process(s.newVotes(types.VoteCom, hash, p0, 1), count) + process(s.newVotes(types.VoteFastCom, hash, p0, 1), count) + p1 := p0 + p1.Height++ + process(s.newVotes(types.VoteCom, hash, p1, 0), count) + process(s.newVotes(types.VoteFastCom, hash, p1, 0), count) + process(s.newVotes(types.VotePreCom, hash, p1, 0), count) + process(s.newVotes(types.VoteFast, hash, p1, 0), count) + process(s.newVotes(types.VoteFast, hash, p1, 1), count) + process(s.newVotes(types.VotePreCom, hash, p1, 1), count) + process(s.newVotes(types.VoteFast, hash, p1, 2), count) + process(s.newVotes(types.VotePreCom, hash, p1, 2), count) + process(s.newVotes(types.VoteCom, hash, p1, 2), count) + process(s.newVotes(types.VoteFastCom, hash, p1, 2), count) + p2 := p1 + p2.Height++ + process(s.newVotes(types.VoteFast, hash, p2, 1), count) + process(s.newVotes(types.VotePreCom, hash, p2, 1), count) + process(s.newVotes(types.VoteCom, hash, p2, 1), count) + process(s.newVotes(types.VoteFastCom, hash, p2, 1), count) + // The snapshot should contains all processed votes. + ss := s.cache.snapshot(time.Now()) + s.takeSubset(ss, types.Position{}, 0, 18*count) + // all votes in older position would be igonred. + _, vs := s.takeSubset(ss, p1, 1, 10*count) + s.Require().Equal(0, countVotes(vs, p0, 1, types.VotePreCom)) + s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteFast)) + s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteCom)) + s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteFastCom)) + // pre-commit/fast votes in older or equal period of the same position would + // be ignored. + s.Require().Equal(0, countVotes(vs, p1, 0, types.VotePreCom)) + s.Require().Equal(0, countVotes(vs, p1, 0, types.VoteFast)) + s.Require().Equal(0, countVotes(vs, p1, 1, types.VoteCom)) + s.Require().Equal(0, countVotes(vs, p1, 1, types.VoteFastCom)) + // pre-commit/fast votes in newer period of the same position would not be + // ignored. + s.Require().Equal(count, countVotes(vs, p1, 2, types.VotePreCom)) + s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteFast)) + // commit/fast-commit votes in the same position can't be ignored. + s.Require().Equal(count, countVotes(vs, p1, 0, types.VoteCom)) + s.Require().Equal(count, countVotes(vs, p1, 0, types.VoteFastCom)) + s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteCom)) + s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteFastCom)) + // all votes in newer position would be kept. + s.Require().Equal(count, countVotes(vs, p2, 1, types.VotePreCom)) + s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFast)) + s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteCom)) + s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFastCom)) + // Take an empty subset. + s.takeSubset(ss, types.Position{Round: 1, Height: 1000}, 0, 0) + // Take a subset contains only commit/fast-commit votes. + _, vs = s.takeSubset(ss, p2, 1, 2*count) + s.Require().Equal(0, countVotes(vs, p2, 1, types.VotePreCom)) + s.Require().Equal(0, countVotes(vs, p2, 1, types.VoteFast)) + s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteCom)) + s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFastCom)) + // Taks a subset contains only pre-commit/fast votes. + p3 := p2 + p3.Height++ + process(s.newVotes(types.VotePreCom, hash, p3, 1), count) + process(s.newVotes(types.VoteFast, hash, p3, 1), count) + ss = s.cache.snapshot(time.Now().Add(time.Second)) + _, vs = s.takeSubset(ss, p3, 0, 2*count) + s.Require().Equal(count, countVotes(vs, p3, 1, types.VotePreCom)) + s.Require().Equal(count, countVotes(vs, p3, 1, types.VoteFast)) +} + +func (s *AgreementCacheTestSuite) TestPurgeByDecideEvent() { + var ( + hash = common.NewRandomHash() + count = s.requiredVotes - 1 + ) + process := func(vs []types.Vote, until int) []types.Vote { + for i := range vs[:until] { + evts, err := s.cache.processVote(vs[i]) + s.Require().NoError(err) + s.Require().Empty(evts) + } + return vs + } + // There are some pre-commit/fast votes unable to trigger any signal. + p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength} + process(s.newVotes(types.VotePreCom, hash, p00, 1), count) + process(s.newVotes(types.VoteFast, hash, p00, 1), count) + // There are some commit/fast-commit votes unable to trigger any signal. + process(s.newVotes(types.VoteCom, hash, p00, 1), count) + process(s.newVotes(types.VoteFastCom, hash, p00, 1), count) + // There are some pre-commit/fast votes at later position unable to trigger + // any signal. + p01 := p00 + p01.Height++ + s.Require().True(p01.Round >= DKGDelayRound) + process(s.newVotes(types.VotePreCom, hash, p01, 3), count) + process(s.newVotes(types.VoteFast, hash, p01, 3), count) + // There are some commit/fast-commit votes at later position unable to + // trigger any signal. + process(s.newVotes(types.VoteCom, hash, p01, 1), count) + votesFC := process(s.newVotes(types.VoteFastCom, hash, p01, 1), count) + // There are some pre-commit/fast votes at the newest position unable to + // trigger any signal. + p02 := p01 + p02.Height++ + process(s.newVotes(types.VotePreCom, hash, p02, 1), count) + process(s.newVotes(types.VoteFast, hash, p02, 1), count) + // There are some commit/fast-commit votes at the newest position unable to + // trigger any signal. + process(s.newVotes(types.VoteCom, hash, p02, 1), count) + process(s.newVotes(types.VoteFastCom, hash, p02, 1), count) + // Check current snapshot: all votes are exists, no decide event triggered. + ss := s.cache.snapshot(time.Now()) + r, _ := s.takeSubset(ss, types.Position{}, 0, 12*count) + s.Require().Nil(r) + // We receive some commit votes position that can trigger some decide event, + // then those votes should be purged. + evts, err := s.cache.processVote(votesFC[count]) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventDecide) + // All votes in older/equal position should be purged. + ss = s.cache.snapshot(time.Now().Add(time.Second)) + r, votes := s.takeSubset(ss, types.Position{}, 0, 4*count) + s.Require().NotNil(r) + s.Require().Equal(r.Position, p01) + s.Require().NotEmpty(r.Randomness) + // All votes in later position should be kept. + s.Require().Equal(count, countVotes(votes, p02, 1, types.VotePreCom)) + s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteFast)) + s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteCom)) + s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteFastCom)) +} + +func (s *AgreementCacheTestSuite) TestPrugeByLockEvent() { + var ( + hash = common.NewRandomHash() + count = s.requiredVotes - 1 + ) + process := func(vs []types.Vote, until int) []types.Vote { + for i := range vs[:until] { + evts, err := s.cache.processVote(vs[i]) + s.Require().NoError(err) + s.Require().Empty(evts) + } + return vs + } + // There are some votes unable to trigger any signal. + p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength} + process(s.newVotes(types.VoteFast, hash, p00, 1), count) + process(s.newVotes(types.VotePreCom, hash, p00, 1), count) + process(s.newVotes(types.VoteFastCom, hash, p00, 1), count) + process(s.newVotes(types.VoteCom, hash, p00, 1), count) + p01 := p00 + p01.Height++ + process(s.newVotes(types.VoteFast, hash, p01, 0), count) + votes := process(s.newVotes(types.VotePreCom, hash, p01, 1), count) + process(s.newVotes(types.VoteCom, hash, p01, 1), count) + process(s.newVotes(types.VotePreCom, hash, p01, 2), count) + ss := s.cache.snapshot(time.Now()) + s.takeSubset(ss, types.Position{}, 0, 8*count) + // Receive some pre-commit votes position that can trigger locked event. + evts, err := s.cache.processVote(votes[count]) + s.Require().NoError(err) + s.Require().Len(evts, 1) + s.Require().Equal(evts[0].evtType, agreementEventLock) + s.Require().Equal(evts[0].Position, votes[count].Position) + s.Require().Equal(evts[0].period, votes[count].Period) + ss = s.cache.snapshot(time.Now().Add(time.Second)) + r, votes := s.takeSubset(ss, types.Position{}, 0, 5*count+1) + s.Require().Nil(r) + // Those pre-commit/fast votes in older position should be purged. + s.Require().Equal(0, countVotes(votes, p00, 1, types.VoteFast)) + s.Require().Equal(0, countVotes(votes, p00, 1, types.VotePreCom)) + // Those pre-commit/fast votes in older period should be purged. + s.Require().Equal(0, countVotes(votes, p01, 0, types.VoteFast)) + // Those pre-commit/fast votes in newer period should be included. + s.Require().Equal(count, countVotes(votes, p01, 2, types.VotePreCom)) + // Those votes triggering events should be included. + s.Require().Equal(count+1, countVotes(votes, p01, 1, types.VotePreCom)) + // We shouldn't purge commit votes by locked event. + s.Require().Equal(count, countVotes(votes, p00, 1, types.VoteCom)) + s.Require().Equal(count, countVotes(votes, p00, 1, types.VoteFastCom)) + s.Require().Equal(count, countVotes(votes, p01, 1, types.VoteCom)) +} + +func (s *AgreementCacheTestSuite) TestPurgeByImpossibleToAgreeOnOneHash() { + var ( + hash = common.NewRandomHash() + count = s.requiredVotes - 1 + ) + process := func(vs []types.Vote, until int) []types.Vote { + for i := range vs[:until] { + evts, err := s.cache.processVote(vs[i]) + s.Require().NoError(err) + s.Require().Empty(evts) + } + return vs + } + // 2f votes for one hash. + p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength} + process(s.newVotes(types.VotePreCom, hash, p00, 1), count) + s.takeSubset(s.cache.snapshot(time.Now()), types.Position{}, 0, count) + // f+1 votes for another hash. + hash = common.NewRandomHash() + otherVotes := s.newVotes(types.VotePreCom, hash, p00, 1) + process(otherVotes[count:], s.f+1) + s.takeSubset( + s.cache.snapshot(time.Now().Add(time.Second)), types.Position{}, 0, 0) +} + +func (s *AgreementCacheTestSuite) TestIgnoredVotes() { + isIgnored := func(v *types.Vote, ssTime time.Time) { + evts, err := s.cache.processVote(*v) + s.Require().NoError(err) + s.Require().Empty(evts) + ss := s.cache.snapshot(ssTime) + _, votes := ss.get(types.Position{}, 0) + s.Require().Empty(votes) + } + // Make sure the initial state is expected. + _, found := s.cache.config(0) + s.Require().True(found) + c1, found := s.cache.config(1) + s.Require().True(found) + _, found = s.cache.config(2) + s.Require().False(found) + endH := types.GenesisHeight + s.roundLength*2 + s.Require().True(c1.Contains(endH - 1)) + s.Require().False(c1.Contains(endH)) + // Votes from unknown height should be ignored. + h := common.NewRandomHash() + p := types.Position{Round: 1, Height: endH} + v := s.newVote(types.VoteCom, h, p, 1, s.signers[0]) + _, err := s.cache.processVote(*v) + s.Require().EqualError(err, ErrUnknownHeight.Error()) + p = types.Position{Round: 0, Height: 0} + v = s.newVote(types.VoteCom, h, p, 1, s.signers[0]) + isIgnored(v, time.Now()) + // Vote with type=init should be ignored. + p = types.Position{Round: 0, Height: types.GenesisHeight} + v = s.newVote(types.VoteInit, h, p, 1, s.signers[0]) + isIgnored(v, time.Now().Add(time.Second)) +} + +func (s *AgreementCacheTestSuite) TestAgreementSnapshotVotesIndex() { + i0 := agreementSnapshotVotesIndex{ + position: types.Position{Round: 0, Height: types.GenesisHeight}, + period: 0} + i1 := agreementSnapshotVotesIndex{ + position: types.Position{Round: 0, Height: types.GenesisHeight}, + period: 1} + i2 := agreementSnapshotVotesIndex{ + position: types.Position{Round: 0, Height: types.GenesisHeight + 1}, + period: 0} + i3 := agreementSnapshotVotesIndex{ + position: types.Position{Round: 1, Height: types.GenesisHeight}, + period: 0} + i4 := agreementSnapshotVotesIndex{ + position: types.Position{Round: 1, Height: types.GenesisHeight}, + period: 2} + is := agreementSnapshotVotesIndexes{i0, i1, i2, i3, i4} + s.Require().True(sort.SliceIsSorted(is, func(i, j int) bool { + return !is[i].Newer(is[j]) + })) + for i := range is[:len(is)-1] { + s.Require().True(is[i].Older(is[i+1])) + } + for i := range is[:len(is)-1] { + s.Require().True(is[i+1].Newer(is[i])) + } + for _, i := range is { + s.Require().False(i.Older(i)) + s.Require().False(i.Newer(i)) + } + for i := range is[:len(is)-1] { + iFound, found := is.nearestNewerIdx(is[i].position, is[i].period) + s.Require().True(found) + s.Require().Equal(iFound, is[i+1]) + } + _, found := is.nearestNewerIdx(i4.position, i4.period) + s.Require().False(found) + _, found = is.nearestNewerOrEqualIdx(i4.position, i4.period) + s.Require().True(found) +} + +func (s *AgreementCacheTestSuite) TestAgreementSnapshot() { + var ( + h = common.NewRandomHash() + count = s.requiredVotes - 1 + ) + newVotes := func(t types.VoteType, h common.Hash, p types.Position, + period uint64) []types.Vote { + votes := s.newVotes(t, h, p, period) + return votes[:count] + } + ss := agreementSnapshot{} + // |<-pre-commit ->|<- commit ->| + // | P0,1 | P2,2 | P3,1 | P2,2 | P2,1 | P1,0 | + p0 := types.Position{Round: 0, Height: types.GenesisHeight} + p1 := types.Position{Round: 0, Height: types.GenesisHeight + 1} + p2 := types.Position{Round: 0, Height: types.GenesisHeight + 2} + p3 := types.Position{Round: 0, Height: types.GenesisHeight + 3} + p4 := types.Position{Round: 0, Height: types.GenesisHeight + 4} + ss.addPreCommitVotes(p0, 1, newVotes(types.VotePreCom, h, p0, 1)) + ss.addPreCommitVotes(p2, 2, newVotes(types.VoteFast, h, p1, 2)) + ss.addPreCommitVotes(p3, 1, newVotes(types.VoteFast, h, p3, 1)) + // Add commit/fast-common votes in reversed order. + ss.markBoundary() + ss.addCommitVotes(p2, 2, newVotes(types.VoteCom, h, p2, 2)) + ss.addCommitVotes(p2, 1, newVotes(types.VoteFastCom, h, p2, 1)) + ss.addCommitVotes(p1, 0, newVotes(types.VoteCom, h, p1, 0)) + // Get with a very new position, should return empty votes. + r, votes := ss.get(types.Position{Round: 0, Height: 10}, 0) + s.Require().Nil(r) + s.Require().Empty(votes) + // Get with a very old position, should return all votes. + _, votes = ss.get(types.Position{}, 0) + s.Require().Nil(r) + s.Require().Len(votes, 6*count) + // Only the newest pre-commit votes returns. + _, votes = ss.get(p3, 0) + s.Require().Len(votes, count) + s.Require().Equal(count, countVotes(votes, p3, 1, types.VoteFast)) + // pre-commit votes in the same position and period is ignored, commit votes + // in the same position and period is included. + _, votes = ss.get(p2, 2) + s.Require().Len(votes, 3*count) + s.Require().Equal(count, countVotes(votes, p3, 1, types.VoteFast)) + s.Require().Equal(count, countVotes(votes, p2, 1, types.VoteFastCom)) + s.Require().Equal(count, countVotes(votes, p2, 2, types.VoteCom)) + // Only the newest commit votes is included. + ss = agreementSnapshot{} + ss.addPreCommitVotes(p0, 1, newVotes(types.VotePreCom, h, p0, 1)) + ss.markBoundary() + ss.addCommitVotes(p4, 1, newVotes(types.VoteFastCom, h, p4, 1)) + _, votes = ss.get(p4, 10) + s.Require().Len(votes, count) + s.Require().Equal(count, countVotes(votes, p4, 1, types.VoteFastCom)) +} + +func (s *AgreementCacheTestSuite) TestRandomly() { + var ( + positionCount uint64 = 300 + periodCount uint64 = 5 + iteration = 3 + ) + lastPosition, vs, bs, rs := s.generateTestData(positionCount, periodCount) + s.Require().NotEmpty(vs) + s.Require().NotEmpty(bs) + s.Require().NotEmpty(rs) + // Requirements in this scenario: + // - no panic, error should be raised. + // - the latest decide event should be identical the last position. + // - events should be incremental by type. + var lastEvts = make([]*agreementEvent, maxAgreementEventType) + chk := func(evts []*agreementEvent, err error) { + s.Require().NoError(err) + for _, e := range evts { + last := lastEvts[e.evtType] + if last != nil { + s.Require().True(e.Position.Newer(last.Position)) + } + lastEvts[e.evtType] = e + } + } + randObj := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := 0; i < iteration; i++ { + // Shuffle those slices. + randObj.Shuffle(len(vs), func(i, j int) { vs[i], vs[i] = vs[j], vs[i] }) + randObj.Shuffle(len(bs), func(i, j int) { bs[i], bs[j] = bs[j], bs[i] }) + randObj.Shuffle(len(rs), func(i, j int) { rs[i], rs[j] = rs[j], rs[i] }) + for i := range lastEvts { + lastEvts[i] = nil + } + s.newCache(s.newRoundEvents(positionCount/s.roundLength - 1)) + var vIdx, bIdx, rIdx int + for { + switch randObj.Int() % 3 { + case 0: + if vIdx >= len(vs) { + break + } + chk(s.cache.processVote(vs[vIdx])) + vIdx++ + case 1: + if bIdx >= len(bs) { + break + } + chk(s.cache.processFinalizedBlock(bs[bIdx])) + bIdx++ + case 2: + if rIdx >= len(rs) { + break + } + chk(s.cache.processAgreementResult(rs[rIdx])) + rIdx++ + } + if vIdx >= len(vs) && bIdx >= len(bs) && rIdx >= len(rs) { + // Make sure we reach agreement on the last position. + lastDecide := lastEvts[agreementEventDecide] + s.Require().NotNil(lastDecide) + s.Require().Equal(lastDecide.Position, lastPosition) + break + } + } + } +} + +func TestAgreementCache(t *testing.T) { + suite.Run(t, new(AgreementCacheTestSuite)) +} |