author     Mission Liao <mission.liao@dexon.org>  2019-04-03 16:14:26 +0800
committer  Mission Liao <mission.liao@dexon.org>  2019-04-15 14:41:19 +0800
commit     603410f66d9c0bdc23c8d647f5e732e88e32265e (patch)
tree       cc3a366fc559118db21115a1d5435b5c88ec1799
parent     46dab5936d7a9e52f42edaa926a1adb6dcafd81b (diff)
Add agreementCache
-rw-r--r--  core/agreement-cache.go       943
-rw-r--r--  core/agreement-cache_test.go  907
2 files changed, 1850 insertions, 0 deletions
diff --git a/core/agreement-cache.go b/core/agreement-cache.go
new file mode 100644
index 0000000..1fd33f6
--- /dev/null
+++ b/core/agreement-cache.go
@@ -0,0 +1,943 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+)
+
+// Errors raised from agreementCache.
+var (
+ ErrUnableToAgree = errors.New("unable to agree")
+ ErrUnknownHeight = errors.New("unknown height")
+)
+
+// voteSet is a place to keep one type of votes for one period in one position.
+type voteSet struct {
+ votes map[types.NodeID]types.Vote
+ counts map[common.Hash]int
+ best common.Hash
+ votesAsList []types.Vote
+ triggeredEvent *agreementEvent
+ purged bool
+}
+
+func newVoteSet(size int) *voteSet {
+ return &voteSet{
+ votes: make(map[types.NodeID]types.Vote, size),
+ votesAsList: make([]types.Vote, 0, size),
+ counts: make(map[common.Hash]int),
+ }
+}
+
+func (s *voteSet) add(v types.Vote) (oldV types.Vote, forked bool) {
+ if s.purged {
+ return
+ }
+ oldV, exist := s.votes[v.ProposerID]
+ if exist {
+ forked = oldV.BlockHash != v.BlockHash
+ return
+ }
+ s.votes[v.ProposerID] = v
+ s.votesAsList = append(s.votesAsList, v)
+ s.counts[v.BlockHash]++
+ if s.counts[v.BlockHash] > s.counts[s.best] {
+ s.best = v.BlockHash
+ }
+ return
+}
+
+func (s *voteSet) votesByHash(h common.Hash, estimatedLength int) []types.Vote {
+ votes := make([]types.Vote, 0, estimatedLength)
+ for _, v := range s.votesAsList {
+ if v.BlockHash == h {
+ votes = append(votes, v)
+ }
+ }
+ return votes
+}
+
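+// isAgreedOnOneHashPossible reports whether the threshold could still be
+// reached on a single hash. A worked example with illustrative numbers:
+// given maxCount = 7 and threshold = 5, if 4 votes have been collected and 3
+// of them are for the best hash, the 7-4 = 3 outstanding voters could still
+// raise the best hash to 3+3 = 6 >= 5, so agreement is still possible.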
+func (s *voteSet) isAgreedOnOneHashPossible(maxCount, threshold int) bool {
+ if len(s.votes) == maxCount {
+ return s.counts[s.best] >= threshold
+ } else if len(s.votes) > maxCount {
+ panic(fmt.Errorf("collecting votes exceeding max count: %v %v %v",
+ maxCount, threshold, len(s.votes)))
+ }
+ return maxCount-len(s.votes)+s.counts[s.best] >= threshold
+}
+
+func (s *voteSet) appendTo(src []types.Vote) []types.Vote {
+ if len(s.votesAsList) == 0 {
+ if s.triggeredEvent == nil {
+ return src
+ }
+ return append(src, s.triggeredEvent.Votes...)
+ }
+ return append(src, s.votesAsList...)
+}
+
+func (s *voteSet) setEvent(e *agreementEvent, purge bool) {
+ if purge {
+ s.votes = nil
+ s.counts = nil
+ s.votesAsList = nil
+ s.purged = true
+ }
+ s.triggeredEvent = e
+}
+
+func (s *voteSet) isPurged() bool {
+ return s.purged
+}
+
+type agreementCacheConfig struct {
+ utils.RoundBasedConfig
+
+ notarySet map[types.NodeID]struct{}
+ lambdaBA time.Duration
+}
+
+func (c *agreementCacheConfig) from(
+ round uint64,
+ config *types.Config,
+ notarySet map[types.NodeID]struct{}) {
+ c.SetupRoundBasedFields(round, config)
+ c.lambdaBA = config.LambdaBA
+ c.notarySet = notarySet
+}
+
+func newAgreementCacheConfig(
+ prev agreementCacheConfig,
+ config *types.Config,
+ notarySet map[types.NodeID]struct{}) (c agreementCacheConfig) {
+ c = agreementCacheConfig{}
+ c.from(prev.RoundID()+1, config, notarySet)
+ c.AppendTo(prev.RoundBasedConfig)
+ return
+}
+
+type agreementSnapshotVotesIndex struct {
+ position types.Position
+ period uint64
+ idx int
+}
+
+func (x agreementSnapshotVotesIndex) Older(o agreementSnapshotVotesIndex) bool {
+ return x.position.Older(o.position) ||
+ (x.position.Equal(o.position) && x.period < o.period)
+}
+
+func (x agreementSnapshotVotesIndex) Newer(o agreementSnapshotVotesIndex) bool {
+ return x.position.Newer(o.position) ||
+ (x.position.Equal(o.position) && x.period > o.period)
+}
+
+type agreementSnapshotVotesIndexes []agreementSnapshotVotesIndex
+
+func (x agreementSnapshotVotesIndexes) nearestNewerIdx(
+ position types.Position,
+ period uint64) (agreementSnapshotVotesIndex, bool) {
+ fakeIdx := agreementSnapshotVotesIndex{position: position, period: period}
+ i := sort.Search(len(x), func(i int) bool {
+ return x[i].Newer(fakeIdx)
+ })
+ if i < len(x) {
+ return x[i], true
+ }
+ return agreementSnapshotVotesIndex{}, false
+}
+
+func (x agreementSnapshotVotesIndexes) nearestNewerOrEqualIdx(
+ position types.Position,
+ period uint64) (agreementSnapshotVotesIndex, bool) {
+ fakeIdx := agreementSnapshotVotesIndex{position: position, period: period}
+ i := sort.Search(len(x), func(i int) bool {
+ return !x[i].Older(fakeIdx)
+ })
+ if i < len(x) {
+ return x[i], true
+ }
+ return agreementSnapshotVotesIndex{}, false
+}
+
+// agreementSnapshot represents a group of ongoing votes that could be pulled
+// to help other nodes reach consensus. It supports taking a subset of
+// snapshotted votes by providing the latest locked position P and period r.
+// (NOTE: P-1 is assumed to be decided already.)
+//
+// When a subset request (P, r) is made, all pre-commit/fast votes newer than
+// (P, r) would be returned, and all fast-commit/commit votes not older than
+// (P, 0) would be returned, too. To support this kind of access, the "votes"
+// slice in the snapshot is split into two parts and can be viewed like this:
+//
+// |<- pre-commit/fast ->|<- commit/fast-commit ->|
+// |P0,r0|P1,r1|P1,r2|P2,r1|P3,r1|P2,r2|P2,r1|P1,r2|P1,r0|P0,r0|
+//
+// When a pull request (P1,r2) is made, the slots between (P2,r1)->(P3,r1) in
+// the pre-commit/fast part and the slots between (P1,r0)->(P2,r2) in the
+// commit/fast-commit part would be taken; they are contiguous in the slice,
+// so we don't need to allocate a new slice for pulling.
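+//
+// A sketch of how a pull slices the layout above (the positions and periods
+// are the illustrative ones from the diagram):
+//
+//   r, votes := snapshot.get(P1, 2)
+//   // votes is one contiguous sub-slice: (P2,r1)..(P3,r1) from the
+//   // pre-commit/fast part plus (P1,r0)..(P2,r2) from the
+//   // commit/fast-commit part; r carries the decide event, if any, for a
+//   // position newer than P1.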
+type agreementSnapshot struct {
+ expired time.Time
+ votes []types.Vote
+ evtDecide *agreementEvent
+ preComIdxes agreementSnapshotVotesIndexes // pre-commit/fast parts.
+ comIdxes agreementSnapshotVotesIndexes // commit/fast-commit parts.
+ boundary int // middle line between parts.
+}
+
+func (s agreementSnapshot) get(position types.Position, lockPeriod uint64) (
+ r *types.AgreementResult, votes []types.Vote) {
+ if s.evtDecide != nil && s.evtDecide.Position.Newer(position) {
+ result := s.evtDecide.toAgreementResult()
+ r = &result
+ }
+ begin, end := s.boundary, s.boundary
+ if i, found := s.preComIdxes.nearestNewerIdx(position, lockPeriod); found {
+ begin = i.idx
+ }
+ if i, found := s.comIdxes.nearestNewerOrEqualIdx(position, 0); found {
+ end = i.idx
+ }
+ votes = s.votes[begin:end]
+ return
+}
+
+func (s *agreementSnapshot) addPreCommitVotes(
+ position types.Position, period uint64, votes []types.Vote) {
+ if len(votes) == 0 {
+ return
+ }
+ i := agreementSnapshotVotesIndex{
+ position: position,
+ period: period,
+ }
+ if len(s.preComIdxes) > 0 && !s.preComIdxes[len(s.preComIdxes)-1].Older(i) {
+ panic(fmt.Errorf(
+ "invalid snapshot index additional in pre-commit case: %+v,%+v",
+ i, s.preComIdxes[len(s.preComIdxes)-1]))
+ }
+ // In the pre-commit case, we record the index of the first vote.
+ i.idx = len(s.votes)
+ s.votes = append(s.votes, votes...)
+ if len(s.votes) == i.idx {
+ return
+ }
+ s.preComIdxes = append(s.preComIdxes, i)
+}
+
+func (s *agreementSnapshot) addCommitVotes(
+ position types.Position, period uint64, votes []types.Vote) {
+ if len(votes) == 0 {
+ return
+ }
+ i := agreementSnapshotVotesIndex{
+ position: position,
+ period: period,
+ }
+ if len(s.comIdxes) > 0 && !s.comIdxes[len(s.comIdxes)-1].Newer(i) {
+ panic(fmt.Errorf(
+ "invalid snapshot index additional in commit case: %+v,%+v",
+ i, s.comIdxes[len(s.comIdxes)-1]))
+ }
+ // In the commit case, we record the index just past the last vote, to be
+ // used as an exclusive end when slicing.
+ beforeAppend := len(s.votes)
+ s.votes = append(s.votes, votes...)
+ if len(s.votes) == beforeAppend {
+ return
+ }
+ i.idx = len(s.votes)
+ s.comIdxes = append(agreementSnapshotVotesIndexes{i}, s.comIdxes...)
+}
+
+func (s *agreementSnapshot) markBoundary() {
+ if len(s.comIdxes) > 0 {
+ panic(fmt.Errorf("attempt to mark boundary after commit votes added"))
+ }
+ s.boundary = len(s.votes)
+}
+
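+// agreementCacheReceiver defines the callbacks agreementCache relies on to
+// look up notary sets and to recover/verify threshold signatures.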
+type agreementCacheReceiver interface {
+ GetNotarySet(round uint64) (map[types.NodeID]struct{}, error)
+ RecoverTSIG(blockHash common.Hash, votes []types.Vote) ([]byte, error)
+ VerifyTSIG(round uint64, hash common.Hash, tSig []byte) (bool, error)
+}
+
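+// agreementCache caches votes, agreement results and finalized blocks of
+// ongoing BA instances, turns them into agreement events (forward, lock,
+// decide, fork) once enough evidence is collected, and serves snapshots of
+// cached votes for other nodes to pull.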
+type agreementCache struct {
+ recv agreementCacheReceiver
+ configs []agreementCacheConfig
+ configsLock sync.RWMutex
+ lock sync.RWMutex
+ vSets map[types.Position]map[uint64][]*voteSet // Position > Period > Type
+ refEvts []*agreementEvent
+ refPosition atomic.Value
+ lastSnapshot atomic.Value
+ lastSnapshotCh chan time.Time
+}
+
+func (c *agreementCache) votes(v types.Vote, config agreementCacheConfig,
+ createIfNotExist bool) *voteSet {
+ vForPosition, exist := c.vSets[v.Position]
+ if !exist {
+ if !createIfNotExist {
+ return nil
+ }
+ vForPosition = make(map[uint64][]*voteSet)
+ c.vSets[v.Position] = vForPosition
+ }
+ vForPeriod, exist := vForPosition[v.Period]
+ if !exist {
+ if !createIfNotExist {
+ return nil
+ }
+ reserved := len(config.notarySet)
+ vForPeriod = make([]*voteSet, types.MaxVoteType)
+ for idx := range vForPeriod {
+ if types.VoteType(idx) == types.VoteInit {
+ continue
+ }
+ vForPeriod[idx] = newVoteSet(reserved)
+ }
+ vForPosition[v.Period] = vForPeriod
+ }
+ return vForPeriod[v.Type]
+}
+
+func (c *agreementCache) config(
+ round uint64) (cfg agreementCacheConfig, found bool) {
+ c.configsLock.RLock()
+ defer c.configsLock.RUnlock()
+ if len(c.configs) == 0 {
+ return
+ }
+ firstRound := c.configs[0].RoundID()
+ if round < firstRound || round >= firstRound+uint64(len(c.configs)) {
+ return
+ }
+ cfg, found = c.configs[round-firstRound], true
+ return
+}
+
+func (c *agreementCache) firstConfig() (cfg agreementCacheConfig, found bool) {
+ c.configsLock.RLock()
+ defer c.configsLock.RUnlock()
+ if len(c.configs) == 0 {
+ return
+ }
+ return c.configs[0], true
+}
+
+// isIgnorable is the most trivial way to filter outdated messages.
+func (c *agreementCache) isIgnorable(p types.Position) bool {
+ return !p.Newer(c.refPosition.Load().(types.Position))
+}
+
+func (c *agreementCache) isIgnorableNoLock(p types.Position) bool {
+ evtDecide := c.refEvts[agreementEventDecide]
+ if evtDecide == nil {
+ return false
+ }
+ return !p.Newer(evtDecide.Position)
+}
+
+func (c *agreementCache) isVoteIgnorable(v types.Vote) bool {
+ if v.Type == types.VoteInit {
+ return true
+ }
+ if c.isIgnorableNoLock(v.Position) {
+ return true
+ }
+ switch v.Type {
+ case types.VotePreCom, types.VoteFast:
+ evtLock := c.refEvts[agreementEventLock]
+ if evtLock == nil || evtLock.Position.Older(v.Position) {
+ return false
+ }
+ if evtLock.Position.Newer(v.Position) {
+ return true
+ }
+ return evtLock.period >= v.Period
+ }
+ return false
+}
+
+func (c *agreementCache) checkVote(
+ v types.Vote, config agreementCacheConfig) (bool, *agreementEvent, error) {
+ if v.Type >= types.MaxVoteType {
+ return false, nil, ErrInvalidVote
+ }
+ if !config.Contains(v.Position.Height) {
+ return false, nil, ErrUnknownHeight
+ }
+ ok, err := utils.VerifyVoteSignature(&v)
+ if err != nil {
+ return false, nil, err
+ }
+ if !ok {
+ return false, nil, ErrIncorrectVoteSignature
+ }
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ if c.isVoteIgnorable(v) {
+ return false, nil, nil
+ }
+ if _, exist := config.notarySet[v.ProposerID]; !exist {
+ return false, nil, ErrNotInNotarySet
+ }
+ // Check for forked votes.
+ //
+ // NOTE: we won't be able to detect forked votes if they are purged.
+ if vSet := c.votes(v, config, false); vSet != nil {
+ if oldV, exist := vSet.votes[v.ProposerID]; exist {
+ if v.BlockHash != oldV.BlockHash {
+ return false, newAgreementEvent(
+ agreementEventFork, []types.Vote{oldV, v}), nil
+ }
+ }
+ }
+ return true, nil, nil
+}
+
+func (c *agreementCache) checkResult(r *types.AgreementResult,
+ config agreementCacheConfig) (bool, error) {
+ if r.Position.Round < DKGDelayRound {
+ if err := VerifyAgreementResult(r, config.notarySet); err != nil {
+ return false, err
+ }
+ if !bytes.Equal(r.Randomness, NoRand) {
+ return false, ErrIncorrectAgreementResult
+ }
+ } else {
+ ok, err := c.recv.VerifyTSIG(r.Position.Round, r.BlockHash, r.Randomness)
+ if err != nil {
+ return false, err
+ }
+ if !ok {
+ return false, ErrIncorrectBlockRandomness
+ }
+ }
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ if c.isIgnorableNoLock(r.Position) {
+ return false, nil
+ }
+ return true, nil
+}
+
+func (c *agreementCache) checkFinalizedBlock(b *types.Block) (bool, error) {
+ if b.Position.Round < DKGDelayRound {
+ // Finalized blocks from rounds before DKGDelayRound can't be the proof
+ // of any agreement.
+ return false, nil
+ }
+ ok, err := c.recv.VerifyTSIG(b.Position.Round, b.Hash, b.Randomness)
+ if err != nil {
+ return false, err
+ }
+ if !ok {
+ return false, ErrIncorrectBlockRandomness
+ }
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ return !c.isIgnorableNoLock(b.Position), nil
+}
+
+func (c *agreementCache) trigger(v types.Vote, vSet *voteSet,
+ config agreementCacheConfig) (e *agreementEvent, err error) {
+ var (
+ maxVotes = len(config.notarySet)
+ requiredVotes = maxVotes*2/3 + 1
+ )
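+ // With illustrative numbers: a notary set of 7 gives
+ // requiredVotes = 7*2/3 + 1 = 5, i.e. the usual 2t+1 threshold with t = 2.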
+ newDecideEvt := func(vSet *voteSet) (*agreementEvent, error) {
+ votes := vSet.votesByHash(vSet.best, requiredVotes)
+ if v.Position.Round < DKGDelayRound {
+ return newAgreementEvent(agreementEventDecide, votes), nil
+ }
+ tsig, err := c.recv.RecoverTSIG(vSet.best, votes)
+ if err != nil {
+ return nil, err
+ }
+ e := newAgreementEventFromTSIG(v.Position, vSet.best, tsig,
+ vSet.best == types.NullBlockHash)
+ return e, nil
+ }
+ switch v.Type {
+ case types.VoteCom:
+ // 2t+1 commit votes on SKIP should be handled by condition#3; other
+ // cases should trigger a "decide" event.
+ if vSet.best != types.SkipBlockHash &&
+ vSet.counts[vSet.best] >= requiredVotes {
+ e, err = newDecideEvt(vSet)
+ break
+ }
+ // It's condition#3: there are at least 2t+1 commit votes for
+ // different values or for SKIP.
+ if vSet.triggeredEvent == nil && len(vSet.votes) >= requiredVotes {
+ copiedVotes := make([]types.Vote, requiredVotes)
+ copy(copiedVotes, vSet.votesAsList)
+ e = newAgreementEvent(agreementEventForward, copiedVotes)
+ break
+ }
+ case types.VoteFastCom:
+ if vSet.best == types.SkipBlockHash ||
+ vSet.best == types.NullBlockHash ||
+ vSet.counts[vSet.best] < requiredVotes {
+ break
+ }
+ e, err = newDecideEvt(vSet)
+ case types.VotePreCom, types.VoteFast:
+ if vSet.counts[vSet.best] < requiredVotes {
+ break
+ }
+ e = newAgreementEvent(
+ agreementEventLock, vSet.votesByHash(vSet.best, requiredVotes))
+ }
+ if err != nil {
+ return
+ }
+ if e == nil {
+ // TODO(mission): this threshold can be lowered to f+1; however, it
+ //                might not lead to better performance (in most
+ //                cases we should be able to reach agreement in one
+ //                period). Need to benchmark before adjusting it.
+ if v.Type != types.VoteCom &&
+ len(vSet.votesAsList) >= requiredVotes &&
+ !vSet.isAgreedOnOneHashPossible(maxVotes, requiredVotes) {
+ vSet.setEvent(nil, true)
+ }
+ return
+ }
+ // Overwriting a triggered decide event is not valid.
+ if vSet.triggeredEvent != nil &&
+ vSet.triggeredEvent.evtType == agreementEventDecide {
+ panic(fmt.Errorf("attempt to overwrite a decide signal: %s %s",
+ vSet.triggeredEvent, e))
+ }
+ if v.Type == types.VoteCom && e.evtType == agreementEventForward {
+ // A group of commit votes might trigger a decide signal after
+ // triggering a forward signal.
+ vSet.setEvent(
+ e, !vSet.isAgreedOnOneHashPossible(maxVotes, requiredVotes))
+ } else {
+ vSet.setEvent(e, true)
+ }
+ return
+}
+
+func (c *agreementCache) updateReferenceEvent(
+ e *agreementEvent) (updated bool) {
+ refEvt := c.refEvts[e.evtType]
+ if refEvt != nil {
+ // Make sure the signal moves forward. Signals from an older
+ // position, or from an older period of the same position, should
+ // never happen, except for fork votes.
+ if e.Position.Older(refEvt.Position) {
+ panic(fmt.Errorf("backward agreement event: %s %s", refEvt, e))
+ }
+ if e.Position.Equal(refEvt.Position) {
+ switch e.evtType {
+ case agreementEventDecide:
+ panic(fmt.Errorf("duplicated decided event: %s %s", refEvt, e))
+ case agreementEventLock:
+ if e.period > refEvt.period {
+ break
+ }
+ panic(fmt.Errorf("backward lock event: %s %s", refEvt, e))
+ case agreementEventForward:
+ // It's possible for a forward signal to be triggered in an older
+ // period; we should not panic in that case.
+ if e.period <= refEvt.period {
+ return
+ }
+ }
+ }
+ }
+ updated = true
+ c.refEvts[e.evtType] = e
+ switch e.evtType {
+ case agreementEventLock:
+ // A lock signal by commit votes should trigger period forwarding, too.
+ eF := c.refEvts[agreementEventForward]
+ if eF != nil {
+ if eF.Position.Newer(e.Position) {
+ break
+ }
+ if eF.Position.Equal(e.Position) && eF.period <= e.period {
+ break
+ }
+ }
+ c.refEvts[agreementEventForward] = e
+ case agreementEventDecide:
+ clearRef := func(eType agreementEventType) {
+ if c.refEvts[eType] == nil {
+ return
+ }
+ if c.refEvts[eType].Position.Newer(e.Position) {
+ return
+ }
+ c.refEvts[eType] = nil
+ }
+ clearRef(agreementEventForward)
+ clearRef(agreementEventLock)
+ c.refPosition.Store(e.Position)
+ }
+ return
+}
+
+func (c *agreementCache) purgeBy(e *agreementEvent) {
+ purgeByPeriod := func(vForPeriod []*voteSet) {
+ for vType, vSet := range vForPeriod {
+ switch types.VoteType(vType) {
+ case types.VotePreCom, types.VoteFast:
+ vSet.setEvent(nil, true)
+ }
+ }
+ }
+ switch e.evtType {
+ case agreementEventDecide:
+ for p := range c.vSets {
+ if !p.Newer(e.Position) {
+ delete(c.vSets, p)
+ }
+ }
+ // Purge configs (and their notary sets) for rounds before the decided one.
+ firstRoundID := c.configs[0].RoundID()
+ if e.Position.Round > firstRoundID {
+ c.configs = c.configs[e.Position.Round-firstRoundID:]
+ }
+ case agreementEventLock:
+ // For older positions, purge all pre-commit/fast votes.
+ for p, vForPosition := range c.vSets {
+ if !p.Older(e.Position) {
+ continue
+ }
+ for _, vForPeriod := range vForPosition {
+ purgeByPeriod(vForPeriod)
+ }
+ }
+ // Only a lock signal can be used to purge older periods in the
+ // same position.
+ vForPosition := c.vSets[e.Position]
+ for period, vForPeriod := range vForPosition {
+ if period >= e.period {
+ continue
+ }
+ // It's safe to purge votes in older periods, except for
+ // commit/fast-commit votes: a decide signal should be raised from
+ // any period even if we've locked on some later period. We can only
+ // purge those votes when it's impossible to trigger a decide
+ // signal from that period.
+ purgeByPeriod(vForPeriod)
+ }
+ }
+}
+
+func newAgreementCache(recv agreementCacheReceiver) (c *agreementCache) {
+ c = &agreementCache{
+ vSets: make(map[types.Position]map[uint64][]*voteSet),
+ refEvts: make([]*agreementEvent, maxAgreementEventType),
+ lastSnapshotCh: make(chan time.Time, 1),
+ recv: recv,
+ }
+ c.lastSnapshotCh <- time.Time{}
+ c.lastSnapshot.Store(&agreementSnapshot{})
+ c.refPosition.Store(types.Position{})
+ return
+}
+
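+// notifyRoundEvents updates the cached round-based configurations from round
+// events: it either extends the last known round or appends a configuration
+// for the following round.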
+func (c *agreementCache) notifyRoundEvents(
+ evts []utils.RoundEventParam) error {
+ apply := func(e utils.RoundEventParam) error {
+ if len(c.configs) > 0 {
+ lastCfg := c.configs[len(c.configs)-1]
+ if e.BeginHeight != lastCfg.RoundEndHeight() {
+ return ErrInvalidBlockHeight
+ }
+ if lastCfg.RoundID() == e.Round {
+ c.configs[len(c.configs)-1].ExtendLength()
+ } else if lastCfg.RoundID()+1 == e.Round {
+ notarySet, err := c.recv.GetNotarySet(e.Round)
+ if err != nil {
+ return err
+ }
+ c.configs = append(c.configs, newAgreementCacheConfig(
+ lastCfg, e.Config, notarySet))
+ } else {
+ return ErrInvalidRoundID
+ }
+ } else {
+ notarySet, err := c.recv.GetNotarySet(e.Round)
+ if err != nil {
+ return err
+ }
+ cfg := agreementCacheConfig{}
+ cfg.from(e.Round, e.Config, notarySet)
+ cfg.SetRoundBeginHeight(e.BeginHeight)
+ c.configs = append(c.configs, cfg)
+ }
+ return nil
+ }
+ c.configsLock.Lock()
+ defer c.configsLock.Unlock()
+ for _, e := range evts {
+ if err := apply(e); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
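+// processVote verifies and caches a vote, returning the agreement events it
+// triggers, if any (fork detection, or a forward/lock/decide signal).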
+func (c *agreementCache) processVote(
+ v types.Vote) (evts []*agreementEvent, err error) {
+ if c.isIgnorable(v.Position) {
+ return
+ }
+ config, found := c.config(v.Position.Round)
+ if !found {
+ err = ErrRoundOutOfRange
+ return
+ }
+ ok, forkEvt, err := c.checkVote(v, config)
+ if forkEvt != nil {
+ evts = append(evts, forkEvt)
+ }
+ if err != nil || !ok {
+ return
+ }
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ // Although we've checked whether this vote is ignorable in the
+ // checkVote method, it might have become ignorable before we acquired
+ // the writer lock.
+ if c.isVoteIgnorable(v) {
+ return
+ }
+ vSet := c.votes(v, config, true)
+ switch {
+ case vSet.isPurged():
+ case vSet.triggeredEvent != nil:
+ if v.Type == types.VotePreCom || v.Type == types.VoteFast {
+ break
+ }
+ fallthrough
+ default:
+ if oldV, forked := vSet.add(v); forked {
+ panic(fmt.Errorf("unexpected forked vote: %s %s", &oldV, &v))
+ }
+ var evt *agreementEvent
+ evt, err = c.trigger(v, vSet, config)
+ if err != nil {
+ break
+ }
+ if evt != nil {
+ if c.updateReferenceEvent(evt) {
+ c.purgeBy(evt)
+ evts = append(evts, evt)
+ }
+ }
+ }
+ return
+}
+
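+// processAgreementResult verifies a types.AgreementResult and turns it into
+// a decide event; for rounds before DKGDelayRound the carried votes are
+// replayed through the cache and may surface fork events as well.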
+func (c *agreementCache) processAgreementResult(
+ r *types.AgreementResult) (evts []*agreementEvent, err error) {
+ if c.isIgnorable(r.Position) {
+ return
+ }
+ config, found := c.config(r.Position.Round)
+ if !found {
+ err = ErrRoundOutOfRange
+ return
+ }
+ ok, err := c.checkResult(r, config)
+ if err != nil || !ok {
+ return
+ }
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if c.isIgnorableNoLock(r.Position) {
+ return
+ }
+ var evt *agreementEvent
+ if r.Position.Round >= DKGDelayRound {
+ evt = newAgreementEventFromTSIG(
+ r.Position, r.BlockHash, r.Randomness, r.IsEmptyBlock)
+ } else {
+ vSet := c.votes(r.Votes[0], config, true)
+ for _, v := range r.Votes {
+ if oldV, forked := vSet.add(v); forked {
+ evts = append(evts, newAgreementEvent(
+ agreementEventFork, []types.Vote{oldV, v}))
+ }
+ }
+ evt, err = c.trigger(r.Votes[0], vSet, config)
+ }
+ if err != nil {
+ return
+ }
+ if evt == nil {
+ err = ErrUnableToAgree
+ return
+ }
+ if c.updateReferenceEvent(evt) {
+ c.purgeBy(evt)
+ evts = append(evts, evt)
+ } else {
+ // It should be treated as an error when we are unable to proceed via
+ // a types.AgreementResult.
+ err = fmt.Errorf("unable to assign decide event: %s from %s", evt, r)
+ return
+ }
+ return
+}
+
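+// processFinalizedBlock verifies the randomness of a finalized block and, if
+// it proves agreement on a position newer than the current reference, turns
+// it into a decide event.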
+func (c *agreementCache) processFinalizedBlock(
+ b *types.Block) (evts []*agreementEvent, err error) {
+ if c.isIgnorable(b.Position) {
+ return
+ }
+ ok, err := c.checkFinalizedBlock(b)
+ if err != nil || !ok {
+ return
+ }
+ c.lock.Lock()
+ defer c.lock.Unlock()
+ if c.isIgnorableNoLock(b.Position) {
+ return
+ }
+ evt := newAgreementEventFromTSIG(
+ b.Position, b.Hash, b.Randomness, b.IsEmpty())
+ if c.updateReferenceEvent(evt) {
+ c.purgeBy(evt)
+ evts = append(evts, evt)
+ } else {
+ // It should be treated as an error when we are unable to proceed via
+ // a finalized block.
+ err = fmt.Errorf("unable to assign decide event: %s from %s", evt, b)
+ return
+ }
+ return
+}
+
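+// snapshot rebuilds the pullable snapshot of cached votes when the last one
+// is older than expect; otherwise the existing snapshot is reused. Building
+// is serialized through lastSnapshotCh so only one snapshot is taken at a
+// time.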
+func (c *agreementCache) snapshot(expect time.Time) *agreementSnapshot {
+ snapshoted := <-c.lastSnapshotCh
+ defer func() {
+ c.lastSnapshotCh <- snapshoted
+ }()
+ if !snapshoted.Before(expect) {
+ // Reuse the current snapshot; someone else might have taken one
+ // right before us.
+ return c.lastSnapshot.Load().(*agreementSnapshot)
+ }
+ // NOTE: There is no obvious way to decide which round's config to use for
+ // the snapshot refresh interval, so simply pick the first one as a
+ // workaround.
+ var refreshInterval = 200 * time.Millisecond
+ if config, found := c.firstConfig(); found {
+ // The interval to refresh the cache should be shorter than lambdaBA.
+ refreshInterval = config.lambdaBA * 4 / 7
+ }
+ c.lock.RLock()
+ defer c.lock.RUnlock()
+ ss := &agreementSnapshot{
+ evtDecide: c.refEvts[agreementEventDecide],
+ }
+ var positions []types.Position
+ for position := range c.vSets {
+ positions = append(positions, position)
+ }
+ sort.Slice(positions, func(i, j int) bool {
+ return positions[i].Older(positions[j])
+ })
+ // All votes in the cache are useful votes, so just keep them all in the
+ // snapshot. They are organized in ascending order of (position, period).
+ for _, position := range positions {
+ vForPosition := c.vSets[position]
+ var periods []uint64
+ for period := range vForPosition {
+ periods = append(periods, period)
+ }
+ sort.Slice(periods, func(i, j int) bool {
+ return periods[i] < periods[j]
+ })
+ for _, period := range periods {
+ vForPeriod := vForPosition[period]
+ votes := []types.Vote{}
+ for t, vSet := range vForPeriod {
+ if t == int(types.VotePreCom) || t == int(types.VoteFast) {
+ votes = vSet.appendTo(votes)
+ }
+ }
+ ss.addPreCommitVotes(position, period, votes)
+ }
+ }
+ ss.markBoundary()
+ // This is the part for commit/fast-commit votes; they should be appended
+ // in reverse order.
+ for i := range positions {
+ position := positions[len(positions)-i-1]
+ vForPosition := c.vSets[position]
+ var periods []uint64
+ for period := range vForPosition {
+ periods = append(periods, period)
+ }
+ sort.Slice(periods, func(i, j int) bool {
+ return periods[i] > periods[j]
+ })
+ for _, period := range periods {
+ vForPeriod := vForPosition[period]
+ votes := []types.Vote{}
+ for t, vSet := range vForPeriod {
+ if t == int(types.VoteFastCom) || t == int(types.VoteCom) {
+ votes = vSet.appendTo(votes)
+ }
+ }
+ ss.addCommitVotes(position, period, votes)
+ }
+ }
+ ss.expired = time.Now().Add(refreshInterval)
+ c.lastSnapshot.Store(ss)
+ snapshoted = ss.expired
+ return ss
+}
+
+// pull returns the latest agreement result and the votes for ongoing BA.
+// This method can be called concurrently.
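+//
+// A minimal usage sketch (lockedPosition and lockedPeriod stand for whatever
+// the local BA instance currently holds; they are not defined here):
+//
+//   r, votes := cache.pull(lockedPosition, lockedPeriod)
+//   if r != nil {
+//       // A decision for a newer position exists; catch up with it first.
+//   }
+//   for _, v := range votes {
+//       // Replay ongoing votes into the local agreement module.
+//   }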
+func (c *agreementCache) pull(p types.Position, lockPeriod uint64) (
+ r *types.AgreementResult, votes []types.Vote) {
+ snapshot := c.lastSnapshot.Load().(*agreementSnapshot)
+ now := time.Now()
+ if now.After(snapshot.expired) {
+ snapshot = c.snapshot(now)
+ }
+ return snapshot.get(p, lockPeriod)
+}
diff --git a/core/agreement-cache_test.go b/core/agreement-cache_test.go
new file mode 100644
index 0000000..fa1c8dd
--- /dev/null
+++ b/core/agreement-cache_test.go
@@ -0,0 +1,907 @@
+// Copyright 2019 The dexon-consensus Authors
+// This file is part of the dexon-consensus library.
+//
+// The dexon-consensus library is free software: you can redistribute it
+// and/or modify it under the terms of the GNU Lesser General Public License as
+// published by the Free Software Foundation, either version 3 of the License,
+// or (at your option) any later version.
+//
+// The dexon-consensus library is distributed in the hope that it will be
+// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+// General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the dexon-consensus library. If not, see
+// <http://www.gnu.org/licenses/>.
+
+package core
+
+import (
+ "bytes"
+ "math/rand"
+ "sort"
+ "testing"
+ "time"
+
+ "github.com/dexon-foundation/dexon-consensus/common"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto"
+ "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa"
+ "github.com/dexon-foundation/dexon-consensus/core/types"
+ "github.com/dexon-foundation/dexon-consensus/core/utils"
+ "github.com/stretchr/testify/suite"
+)
+
+func countVotes(votes []types.Vote, p types.Position, period uint64,
+ t types.VoteType) (cnt int) {
+ for _, v := range votes {
+ if !v.Position.Equal(p) {
+ continue
+ }
+ if v.Period != period {
+ continue
+ }
+ if v.Type != t {
+ continue
+ }
+ cnt++
+ }
+ return
+}
+
+type testAgreementCacheReceiver struct {
+ s *AgreementCacheTestSuite
+ isTsigValid bool
+}
+
+func (r *testAgreementCacheReceiver) GetNotarySet(
+ round uint64) (map[types.NodeID]struct{}, error) {
+ return r.s.notarySet, nil
+}
+
+func (r *testAgreementCacheReceiver) RecoverTSIG(
+ blockHash common.Hash, votes []types.Vote) ([]byte, error) {
+ return []byte("OK"), nil
+}
+
+func (r *testAgreementCacheReceiver) VerifyTSIG(
+ round uint64, hash common.Hash, tSig []byte) (bool, error) {
+ return r.isTsigValid, nil
+}
+
+func (r *testAgreementCacheReceiver) VerifySignature(hash common.Hash,
+ sig crypto.Signature) bool {
+ return bytes.Equal(sig.Signature, []byte("OK"))
+}
+
+type AgreementCacheTestSuite struct {
+ suite.Suite
+ notarySet map[types.NodeID]struct{}
+ requiredVotes int
+ f int
+ roundLength uint64
+ signers []*utils.Signer
+ cache *agreementCache
+ recv *testAgreementCacheReceiver
+}
+
+func (s *AgreementCacheTestSuite) SetupSuite() {
+ var (
+ prvKeys []crypto.PrivateKey
+ pubKeys []crypto.PublicKey
+ count = 7
+ )
+ for i := 0; i < count; i++ {
+ prvKey, err := ecdsa.NewPrivateKey()
+ s.Require().NoError(err)
+ prvKeys = append(prvKeys, prvKey)
+ pubKeys = append(pubKeys, prvKey.PublicKey())
+ }
+ for _, k := range prvKeys {
+ s.signers = append(s.signers, utils.NewSigner(k))
+ }
+ s.f = count / 3
+ s.requiredVotes = 2*s.f + 1
+ s.roundLength = 100
+ s.notarySet = make(map[types.NodeID]struct{})
+ for _, k := range pubKeys {
+ s.notarySet[types.NewNodeID(k)] = struct{}{}
+ }
+ s.recv = &testAgreementCacheReceiver{s: s}
+}
+
+func (s *AgreementCacheTestSuite) SetupTest() {
+ s.recv.isTsigValid = true
+ s.newCache(s.newRoundEvents(1))
+}
+
+func (s *AgreementCacheTestSuite) newVote(t types.VoteType, h common.Hash,
+ p types.Position, period uint64, signer *utils.Signer) *types.Vote {
+ v := types.NewVote(t, h, period)
+ v.Position = p
+ s.Require().NoError(signer.SignVote(v))
+ return v
+}
+
+func (s *AgreementCacheTestSuite) newVotes(t types.VoteType, h common.Hash,
+ p types.Position, period uint64) (votes []types.Vote) {
+ for _, signer := range s.signers {
+ votes = append(votes, *s.newVote(t, h, p, period, signer))
+ }
+ return
+}
+
+func (s *AgreementCacheTestSuite) testVotes(
+ votes []types.Vote, eType agreementEventType) (e *agreementEvent) {
+ for idx := range votes {
+ evts, err := s.cache.processVote(votes[idx])
+ s.Require().NoError(err)
+ if idx+1 < s.requiredVotes {
+ s.Require().Empty(evts)
+ }
+ if idx+1 == s.requiredVotes {
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, eType)
+ s.Require().Equal(evts[0].Position, votes[0].Position)
+ if votes[0].Position.Round < DKGDelayRound ||
+ eType != agreementEventDecide {
+ s.Require().Equal(evts[0].period, votes[0].Period)
+ s.Require().Len(evts[0].Votes, s.requiredVotes)
+ }
+ e = evts[0]
+ break
+ }
+ }
+ // Replaying those votes again won't trigger another event.
+ for idx := range votes {
+ evts, err := s.cache.processVote(votes[idx])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ if idx+1 == s.requiredVotes {
+ break
+ }
+ }
+ return
+}
+
+func (s *AgreementCacheTestSuite) takeSubset(ss *agreementSnapshot,
+ p types.Position, period uint64, length int) (
+ *types.AgreementResult, []types.Vote) {
+ s.Require().NotNil(ss)
+ r, votes := ss.get(p, period)
+ s.Require().Len(votes, length)
+ return r, votes
+}
+
+func (s *AgreementCacheTestSuite) newRoundEvents(
+ round uint64) (rEvts []utils.RoundEventParam) {
+ h := types.GenesisHeight
+ for r := uint64(0); r <= round; r++ {
+ rEvts = append(rEvts, utils.RoundEventParam{
+ Round: r,
+ BeginHeight: h,
+ Config: &types.Config{
+ LambdaBA: 100 * time.Millisecond,
+ RoundLength: s.roundLength,
+ },
+ })
+ h += s.roundLength
+ }
+ return
+}
+
+func (s *AgreementCacheTestSuite) newCache(rEvts []utils.RoundEventParam) {
+ s.cache = newAgreementCache(s.recv)
+ s.Require().NotNil(s.cache)
+ s.Require().NoError(s.cache.notifyRoundEvents(rEvts))
+}
+
+func (s *AgreementCacheTestSuite) generateTestData(
+ positionCount, periodCount uint64) (
+ lastPosition types.Position,
+ votes []types.Vote, bs []*types.Block, rs []*types.AgreementResult) {
+ gen := func(p types.Position, period uint64) (
+ vs []types.Vote, b *types.Block, r *types.AgreementResult) {
+ h := common.NewRandomHash()
+ v := s.newVotes(types.VotePreCom, h, p, period)
+ vs = append(vs, v...)
+ vCom := s.newVotes(types.VoteCom, h, p, period)
+ vs = append(vs, vCom...)
+ if period == 0 {
+ v = s.newVotes(types.VoteFast, h, p, period)
+ vs = append(vs, v...)
+ v = s.newVotes(types.VoteFastCom, h, p, period)
+ vs = append(vs, v...)
+ }
+ r = &types.AgreementResult{BlockHash: h, Position: p}
+ b = &types.Block{Hash: h, Position: p}
+ if p.Round >= DKGDelayRound {
+ r.Randomness = []byte("cold-freeze")
+ b.Randomness = []byte("blackhole-rocks")
+ } else {
+ r.Votes = vCom[:s.requiredVotes]
+ r.Randomness = NoRand
+ b.Randomness = NoRand
+ }
+ return
+ }
+ for i := uint64(0); i < positionCount; i++ {
+ lastPosition = types.Position{Height: types.GenesisHeight + i}
+ lastPosition.Round = uint64(i) / s.roundLength
+ for period := uint64(0); period < periodCount; period++ {
+ vs, b, r := gen(lastPosition, period)
+ votes = append(votes, vs...)
+ bs = append(bs, b)
+ rs = append(rs, r)
+ }
+ }
+ return
+}
+
+func (s *AgreementCacheTestSuite) TestAgreementResult() {
+ var (
+ hash = common.NewRandomHash()
+ position = types.Position{Round: 0, Height: 3}
+ )
+ // An agreement result with fast-commit votes can trigger a decide event.
+ r := &types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Votes: s.newVotes(types.VoteFastCom, hash, position, 1),
+ Randomness: NoRand,
+ }
+ evts, err := s.cache.processAgreementResult(r)
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].period, uint64(1))
+ s.Require().Len(evts[0].Votes, len(r.Votes))
+ // An agreement result with commit votes can trigger a decide event.
+ position.Height++
+ r = &types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Votes: s.newVotes(types.VoteCom, hash, position, 1),
+ Randomness: NoRand,
+ }
+ evts, err = s.cache.processAgreementResult(r)
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].period, uint64(1))
+ s.Require().Len(evts[0].Votes, len(r.Votes))
+ // An agreement result from older position would be ignored.
+ position.Height--
+ evts, err = s.cache.processAgreementResult(&types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Votes: s.newVotes(types.VoteCom, hash, position, 1),
+ Randomness: NoRand,
+ })
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ position.Height++
+ // An agreement result that contains forked votes should be detected, while
+ // still being able to trigger a decide event.
+ position.Height++
+ forkedVote := s.newVote(
+ types.VoteCom, common.NewRandomHash(), position, 1, s.signers[0])
+ evts, err = s.cache.processVote(*forkedVote)
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ hash = common.NewRandomHash()
+ evts, err = s.cache.processAgreementResult(&types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Votes: s.newVotes(types.VoteCom, hash, position, 1),
+ Randomness: NoRand,
+ })
+ s.Require().NoError(err)
+ s.Require().Len(evts, 2)
+ s.Require().Equal(evts[0].evtType, agreementEventFork)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].period, uint64(1))
+ s.Require().Equal(evts[1].evtType, agreementEventDecide)
+ s.Require().Equal(evts[1].Position, position)
+ s.Require().Equal(evts[1].period, uint64(1))
+ // An agreement result with valid TSIG can trigger a decide event.
+ position.Round = 1
+ position.Height++
+ hash = common.NewRandomHash()
+ evts, err = s.cache.processAgreementResult(&types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Randomness: []byte("OK-LA"),
+ })
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].Randomness, []byte("OK-LA"))
+ // An agreement result with invalid TSIG would raise error.
+ s.recv.isTsigValid = false
+ position.Height++
+ hash = common.NewRandomHash()
+ evts, err = s.cache.processAgreementResult(&types.AgreementResult{
+ BlockHash: hash,
+ Position: position,
+ Randomness: []byte("NOK"),
+ })
+ s.Require().EqualError(err, ErrIncorrectBlockRandomness.Error())
+ s.Require().Empty(evts)
+ s.recv.isTsigValid = true
+}
+
+func (s *AgreementCacheTestSuite) TestBasicUsage() {
+ var (
+ hash = common.NewRandomHash()
+ position = types.Position{Round: 0, Height: types.GenesisHeight}
+ )
+ // If there are 2t+1 pre-commit votes for the same value, it should raise
+ // a lock event.
+ s.testVotes(
+ s.newVotes(types.VotePreCom, hash, position, 1), agreementEventLock)
+ // If there are 2t+1 commit votes for the same value, it should raise
+ // a decide event.
+ s.testVotes(
+ s.newVotes(types.VoteCom, hash, position, 1), agreementEventDecide)
+ // If there are 2t+1 fast votes for the same value, it should raise
+ // a lock event.
+ position.Height++
+ hash = common.NewRandomHash()
+ s.testVotes(
+ s.newVotes(types.VoteFast, hash, position, 1), agreementEventLock)
+ // If there are 2t+1 commit votes for SKIP, it should raise a forward
+ // event.
+ position.Height++
+ hash = types.SkipBlockHash
+ s.testVotes(
+ s.newVotes(types.VoteCom, hash, position, 1), agreementEventForward)
+ // If there are 2t+1 commit votes for different values, it should raise
+ // a forward event.
+ position.Height++
+ hash = common.NewRandomHash()
+ votes01 := s.newVotes(types.VoteCom, hash, position, 1)
+ hash = common.NewRandomHash()
+ votes02 := s.newVotes(types.VoteCom, hash, position, 1)
+ votes := append([]types.Vote(nil), votes01[0])
+ votes = append(votes, votes02[1:]...)
+ s.testVotes(votes, agreementEventForward)
+ // If a forked vote is detected, it should raise a fork event.
+ position.Height++
+ hash = common.NewRandomHash()
+ votes01 = s.newVotes(types.VotePreCom, hash, position, 1)
+ hash = common.NewRandomHash()
+ votes02 = s.newVotes(types.VotePreCom, hash, position, 1)
+ evts, err := s.cache.processVote(votes01[0])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ evts, err = s.cache.processVote(votes02[0])
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventFork)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].period, uint64(1))
+ s.Require().Len(evts[0].Votes, 2)
+ s.Require().Equal(evts[0].Votes[0], votes01[0])
+ s.Require().Equal(evts[0].Votes[1], votes02[0])
+}
+
+func (s *AgreementCacheTestSuite) TestDecideInOlderPeriod() {
+ var (
+ hash = common.NewRandomHash()
+ position = types.Position{Round: 0, Height: types.GenesisHeight}
+ )
+ // Trigger fast-forward via condition#3.
+ hash = common.NewRandomHash()
+ votes01 := s.newVotes(types.VoteCom, hash, position, 1)
+ hash = common.NewRandomHash()
+ votes02 := s.newVotes(types.VoteCom, hash, position, 1)
+ votes := append(votes01[:1], votes02[1:s.requiredVotes]...)
+ s.testVotes(votes, agreementEventForward)
+ // Trigger fast-forward by pre-commit votes in later period.
+ hash = common.NewRandomHash()
+ s.testVotes(
+ s.newVotes(types.VotePreCom, hash, position, 2), agreementEventLock)
+ // Process a commit vote in period#1, should still trigger a decide event.
+ evts, err := s.cache.processVote(votes02[s.requiredVotes])
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+ s.Require().Equal(evts[0].Position, position)
+ s.Require().Equal(evts[0].period, uint64(1))
+}
+
+func (s *AgreementCacheTestSuite) TestDecideAfterForward() {
+ var (
+ hash = common.NewRandomHash()
+ position = types.Position{
+ Round: 1,
+ Height: types.GenesisHeight + s.roundLength,
+ }
+ )
+ // Trigger fast-forward via condition#3.
+ hash = common.NewRandomHash()
+ votes01 := s.newVotes(types.VoteCom, hash, position, 1)
+ hash = common.NewRandomHash()
+ votes02 := s.newVotes(types.VoteCom, hash, position, 1)
+ votes := append(votes01[:1], votes02[1:s.requiredVotes]...)
+ s.testVotes(votes, agreementEventForward)
+ // Send one more vote for that hash to see if a decide event can be triggered.
+ evts, err := s.cache.processVote(votes02[s.requiredVotes])
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+}
+
+func (s *AgreementCacheTestSuite) TestDecideByFinalizedBlock() {
+ // A finalized block from round before DKGDelayRound won't trigger a decide
+ // event.
+
+ // A finalized block from round after DKGDelayRound would trigger a decide
+ // event.
+
+ // A finalized block from older position would be ignored.
+}
+
+func (s *AgreementCacheTestSuite) TestFastBA() {
+ var (
+ hash = common.NewRandomHash()
+ position = types.Position{Round: 0, Height: types.GenesisHeight}
+ )
+ test := func() {
+ // Fast -> FastCom, successfully confirmed by Fast mode.
+ s.testVotes(s.newVotes(types.VoteFast, hash, position, 1),
+ agreementEventLock)
+ s.testVotes(s.newVotes(types.VoteFastCom, hash, position, 1),
+ agreementEventDecide)
+ // Fast -> PreCom -> Com, confirmed by RBA.
+ position.Height++
+ s.testVotes(
+ s.newVotes(types.VoteFast, hash, position, 1), agreementEventLock)
+ s.testVotes(
+ s.newVotes(types.VotePreCom, hash, position, 2), agreementEventLock)
+ s.testVotes(
+ s.newVotes(types.VoteCom, hash, position, 2), agreementEventDecide)
+ }
+ // The case for rounds before DKGDelayRound.
+ test()
+ // The case for rounds after DKGDelayRound.
+ position.Round = 1
+ position.Height = types.GenesisHeight + s.roundLength
+ test()
+}
+
+func (s *AgreementCacheTestSuite) TestSnapshot() {
+ var (
+ hash = common.NewRandomHash()
+ count = s.requiredVotes - 1
+ )
+ process := func(vs []types.Vote, until int) []types.Vote {
+ for i := range vs[:until] {
+ evts, err := s.cache.processVote(vs[i])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ }
+ return vs
+ }
+ // Process some votes without triggering any event.
+ //
+ // Below is the expected slice of votes when taking snapshot.
+ // |<- pre-commit/fast ->|<-commit/fast-commit ->|
+ // |P0,r1|P1,r0|P1,r1|P1,r2|P2,r1|P2,r1|P1,r2|P1,r0|P0,r1|
+ p0 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength}
+ process(s.newVotes(types.VoteFast, hash, p0, 1), count)
+ process(s.newVotes(types.VotePreCom, hash, p0, 1), count)
+ process(s.newVotes(types.VoteCom, hash, p0, 1), count)
+ process(s.newVotes(types.VoteFastCom, hash, p0, 1), count)
+ p1 := p0
+ p1.Height++
+ process(s.newVotes(types.VoteCom, hash, p1, 0), count)
+ process(s.newVotes(types.VoteFastCom, hash, p1, 0), count)
+ process(s.newVotes(types.VotePreCom, hash, p1, 0), count)
+ process(s.newVotes(types.VoteFast, hash, p1, 0), count)
+ process(s.newVotes(types.VoteFast, hash, p1, 1), count)
+ process(s.newVotes(types.VotePreCom, hash, p1, 1), count)
+ process(s.newVotes(types.VoteFast, hash, p1, 2), count)
+ process(s.newVotes(types.VotePreCom, hash, p1, 2), count)
+ process(s.newVotes(types.VoteCom, hash, p1, 2), count)
+ process(s.newVotes(types.VoteFastCom, hash, p1, 2), count)
+ p2 := p1
+ p2.Height++
+ process(s.newVotes(types.VoteFast, hash, p2, 1), count)
+ process(s.newVotes(types.VotePreCom, hash, p2, 1), count)
+ process(s.newVotes(types.VoteCom, hash, p2, 1), count)
+ process(s.newVotes(types.VoteFastCom, hash, p2, 1), count)
+ // The snapshot should contain all processed votes.
+ ss := s.cache.snapshot(time.Now())
+ s.takeSubset(ss, types.Position{}, 0, 18*count)
+ // all votes in older positions would be ignored.
+ _, vs := s.takeSubset(ss, p1, 1, 10*count)
+ s.Require().Equal(0, countVotes(vs, p0, 1, types.VotePreCom))
+ s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteFast))
+ s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteCom))
+ s.Require().Equal(0, countVotes(vs, p0, 1, types.VoteFastCom))
+ // pre-commit/fast votes in older or equal period of the same position would
+ // be ignored.
+ s.Require().Equal(0, countVotes(vs, p1, 0, types.VotePreCom))
+ s.Require().Equal(0, countVotes(vs, p1, 0, types.VoteFast))
+ s.Require().Equal(0, countVotes(vs, p1, 1, types.VoteCom))
+ s.Require().Equal(0, countVotes(vs, p1, 1, types.VoteFastCom))
+ // pre-commit/fast votes in newer period of the same position would not be
+ // ignored.
+ s.Require().Equal(count, countVotes(vs, p1, 2, types.VotePreCom))
+ s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteFast))
+ // commit/fast-commit votes in the same position can't be ignored.
+ s.Require().Equal(count, countVotes(vs, p1, 0, types.VoteCom))
+ s.Require().Equal(count, countVotes(vs, p1, 0, types.VoteFastCom))
+ s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteCom))
+ s.Require().Equal(count, countVotes(vs, p1, 2, types.VoteFastCom))
+ // all votes in newer position would be kept.
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VotePreCom))
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFast))
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteCom))
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFastCom))
+ // Take an empty subset.
+ s.takeSubset(ss, types.Position{Round: 1, Height: 1000}, 0, 0)
+ // Take a subset that contains only commit/fast-commit votes.
+ _, vs = s.takeSubset(ss, p2, 1, 2*count)
+ s.Require().Equal(0, countVotes(vs, p2, 1, types.VotePreCom))
+ s.Require().Equal(0, countVotes(vs, p2, 1, types.VoteFast))
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteCom))
+ s.Require().Equal(count, countVotes(vs, p2, 1, types.VoteFastCom))
+ // Take a subset that contains only pre-commit/fast votes.
+ p3 := p2
+ p3.Height++
+ process(s.newVotes(types.VotePreCom, hash, p3, 1), count)
+ process(s.newVotes(types.VoteFast, hash, p3, 1), count)
+ ss = s.cache.snapshot(time.Now().Add(time.Second))
+ _, vs = s.takeSubset(ss, p3, 0, 2*count)
+ s.Require().Equal(count, countVotes(vs, p3, 1, types.VotePreCom))
+ s.Require().Equal(count, countVotes(vs, p3, 1, types.VoteFast))
+}
+
+func (s *AgreementCacheTestSuite) TestPurgeByDecideEvent() {
+ var (
+ hash = common.NewRandomHash()
+ count = s.requiredVotes - 1
+ )
+ process := func(vs []types.Vote, until int) []types.Vote {
+ for i := range vs[:until] {
+ evts, err := s.cache.processVote(vs[i])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ }
+ return vs
+ }
+ // There are some pre-commit/fast votes unable to trigger any signal.
+ p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength}
+ process(s.newVotes(types.VotePreCom, hash, p00, 1), count)
+ process(s.newVotes(types.VoteFast, hash, p00, 1), count)
+ // There are some commit/fast-commit votes unable to trigger any signal.
+ process(s.newVotes(types.VoteCom, hash, p00, 1), count)
+ process(s.newVotes(types.VoteFastCom, hash, p00, 1), count)
+ // There are some pre-commit/fast votes at later position unable to trigger
+ // any signal.
+ p01 := p00
+ p01.Height++
+ s.Require().True(p01.Round >= DKGDelayRound)
+ process(s.newVotes(types.VotePreCom, hash, p01, 3), count)
+ process(s.newVotes(types.VoteFast, hash, p01, 3), count)
+ // There are some commit/fast-commit votes at later position unable to
+ // trigger any signal.
+ process(s.newVotes(types.VoteCom, hash, p01, 1), count)
+ votesFC := process(s.newVotes(types.VoteFastCom, hash, p01, 1), count)
+ // There are some pre-commit/fast votes at the newest position unable to
+ // trigger any signal.
+ p02 := p01
+ p02.Height++
+ process(s.newVotes(types.VotePreCom, hash, p02, 1), count)
+ process(s.newVotes(types.VoteFast, hash, p02, 1), count)
+ // There are some commit/fast-commit votes at the newest position unable to
+ // trigger any signal.
+ process(s.newVotes(types.VoteCom, hash, p02, 1), count)
+ process(s.newVotes(types.VoteFastCom, hash, p02, 1), count)
+ // Check the current snapshot: all votes exist, no decide event triggered.
+ ss := s.cache.snapshot(time.Now())
+ r, _ := s.takeSubset(ss, types.Position{}, 0, 12*count)
+ s.Require().Nil(r)
+ // We receive one more fast-commit vote that triggers a decide event;
+ // older votes should then be purged.
+ evts, err := s.cache.processVote(votesFC[count])
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventDecide)
+ // All votes in older/equal position should be purged.
+ ss = s.cache.snapshot(time.Now().Add(time.Second))
+ r, votes := s.takeSubset(ss, types.Position{}, 0, 4*count)
+ s.Require().NotNil(r)
+ s.Require().Equal(r.Position, p01)
+ s.Require().NotEmpty(r.Randomness)
+ // All votes in later position should be kept.
+ s.Require().Equal(count, countVotes(votes, p02, 1, types.VotePreCom))
+ s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteFast))
+ s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteCom))
+ s.Require().Equal(count, countVotes(votes, p02, 1, types.VoteFastCom))
+}
+
+func (s *AgreementCacheTestSuite) TestPurgeByLockEvent() {
+ var (
+ hash = common.NewRandomHash()
+ count = s.requiredVotes - 1
+ )
+ process := func(vs []types.Vote, until int) []types.Vote {
+ for i := range vs[:until] {
+ evts, err := s.cache.processVote(vs[i])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ }
+ return vs
+ }
+ // There are some votes unable to trigger any signal.
+ p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength}
+ process(s.newVotes(types.VoteFast, hash, p00, 1), count)
+ process(s.newVotes(types.VotePreCom, hash, p00, 1), count)
+ process(s.newVotes(types.VoteFastCom, hash, p00, 1), count)
+ process(s.newVotes(types.VoteCom, hash, p00, 1), count)
+ p01 := p00
+ p01.Height++
+ process(s.newVotes(types.VoteFast, hash, p01, 0), count)
+ votes := process(s.newVotes(types.VotePreCom, hash, p01, 1), count)
+ process(s.newVotes(types.VoteCom, hash, p01, 1), count)
+ process(s.newVotes(types.VotePreCom, hash, p01, 2), count)
+ ss := s.cache.snapshot(time.Now())
+ s.takeSubset(ss, types.Position{}, 0, 8*count)
+ // Receive one more pre-commit vote that triggers a lock event.
+ evts, err := s.cache.processVote(votes[count])
+ s.Require().NoError(err)
+ s.Require().Len(evts, 1)
+ s.Require().Equal(evts[0].evtType, agreementEventLock)
+ s.Require().Equal(evts[0].Position, votes[count].Position)
+ s.Require().Equal(evts[0].period, votes[count].Period)
+ ss = s.cache.snapshot(time.Now().Add(time.Second))
+ r, votes := s.takeSubset(ss, types.Position{}, 0, 5*count+1)
+ s.Require().Nil(r)
+ // Those pre-commit/fast votes in older position should be purged.
+ s.Require().Equal(0, countVotes(votes, p00, 1, types.VoteFast))
+ s.Require().Equal(0, countVotes(votes, p00, 1, types.VotePreCom))
+ // Those pre-commit/fast votes in older period should be purged.
+ s.Require().Equal(0, countVotes(votes, p01, 0, types.VoteFast))
+ // Those pre-commit/fast votes in newer period should be included.
+ s.Require().Equal(count, countVotes(votes, p01, 2, types.VotePreCom))
+ // Those votes triggering events should be included.
+ s.Require().Equal(count+1, countVotes(votes, p01, 1, types.VotePreCom))
+ // We shouldn't purge commit votes by locked event.
+ s.Require().Equal(count, countVotes(votes, p00, 1, types.VoteCom))
+ s.Require().Equal(count, countVotes(votes, p00, 1, types.VoteFastCom))
+ s.Require().Equal(count, countVotes(votes, p01, 1, types.VoteCom))
+}
+
+func (s *AgreementCacheTestSuite) TestPurgeByImpossibleToAgreeOnOneHash() {
+ var (
+ hash = common.NewRandomHash()
+ count = s.requiredVotes - 1
+ )
+ process := func(vs []types.Vote, until int) []types.Vote {
+ for i := range vs[:until] {
+ evts, err := s.cache.processVote(vs[i])
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ }
+ return vs
+ }
+ // 2f votes for one hash.
+ p00 := types.Position{Round: 1, Height: types.GenesisHeight + s.roundLength}
+ process(s.newVotes(types.VotePreCom, hash, p00, 1), count)
+ s.takeSubset(s.cache.snapshot(time.Now()), types.Position{}, 0, count)
+ // f+1 votes for another hash.
+ hash = common.NewRandomHash()
+ otherVotes := s.newVotes(types.VotePreCom, hash, p00, 1)
+ process(otherVotes[count:], s.f+1)
+ s.takeSubset(
+ s.cache.snapshot(time.Now().Add(time.Second)), types.Position{}, 0, 0)
+}
+
+func (s *AgreementCacheTestSuite) TestIgnoredVotes() {
+ isIgnored := func(v *types.Vote, ssTime time.Time) {
+ evts, err := s.cache.processVote(*v)
+ s.Require().NoError(err)
+ s.Require().Empty(evts)
+ ss := s.cache.snapshot(ssTime)
+ _, votes := ss.get(types.Position{}, 0)
+ s.Require().Empty(votes)
+ }
+ // Make sure the initial state is expected.
+ _, found := s.cache.config(0)
+ s.Require().True(found)
+ c1, found := s.cache.config(1)
+ s.Require().True(found)
+ _, found = s.cache.config(2)
+ s.Require().False(found)
+ endH := types.GenesisHeight + s.roundLength*2
+ s.Require().True(c1.Contains(endH - 1))
+ s.Require().False(c1.Contains(endH))
+ // Votes from an unknown height should be rejected with an error, while
+ // votes from a position not newer than the reference should be ignored.
+ h := common.NewRandomHash()
+ p := types.Position{Round: 1, Height: endH}
+ v := s.newVote(types.VoteCom, h, p, 1, s.signers[0])
+ _, err := s.cache.processVote(*v)
+ s.Require().EqualError(err, ErrUnknownHeight.Error())
+ p = types.Position{Round: 0, Height: 0}
+ v = s.newVote(types.VoteCom, h, p, 1, s.signers[0])
+ isIgnored(v, time.Now())
+ // Vote with type=init should be ignored.
+ p = types.Position{Round: 0, Height: types.GenesisHeight}
+ v = s.newVote(types.VoteInit, h, p, 1, s.signers[0])
+ isIgnored(v, time.Now().Add(time.Second))
+}
+
+func (s *AgreementCacheTestSuite) TestAgreementSnapshotVotesIndex() {
+ i0 := agreementSnapshotVotesIndex{
+ position: types.Position{Round: 0, Height: types.GenesisHeight},
+ period: 0}
+ i1 := agreementSnapshotVotesIndex{
+ position: types.Position{Round: 0, Height: types.GenesisHeight},
+ period: 1}
+ i2 := agreementSnapshotVotesIndex{
+ position: types.Position{Round: 0, Height: types.GenesisHeight + 1},
+ period: 0}
+ i3 := agreementSnapshotVotesIndex{
+ position: types.Position{Round: 1, Height: types.GenesisHeight},
+ period: 0}
+ i4 := agreementSnapshotVotesIndex{
+ position: types.Position{Round: 1, Height: types.GenesisHeight},
+ period: 2}
+ is := agreementSnapshotVotesIndexes{i0, i1, i2, i3, i4}
+ s.Require().True(sort.SliceIsSorted(is, func(i, j int) bool {
+ return !is[i].Newer(is[j])
+ }))
+ for i := range is[:len(is)-1] {
+ s.Require().True(is[i].Older(is[i+1]))
+ }
+ for i := range is[:len(is)-1] {
+ s.Require().True(is[i+1].Newer(is[i]))
+ }
+ for _, i := range is {
+ s.Require().False(i.Older(i))
+ s.Require().False(i.Newer(i))
+ }
+ for i := range is[:len(is)-1] {
+ iFound, found := is.nearestNewerIdx(is[i].position, is[i].period)
+ s.Require().True(found)
+ s.Require().Equal(iFound, is[i+1])
+ }
+ _, found := is.nearestNewerIdx(i4.position, i4.period)
+ s.Require().False(found)
+ _, found = is.nearestNewerOrEqualIdx(i4.position, i4.period)
+ s.Require().True(found)
+}
+
+func (s *AgreementCacheTestSuite) TestAgreementSnapshot() {
+ var (
+ h = common.NewRandomHash()
+ count = s.requiredVotes - 1
+ )
+ newVotes := func(t types.VoteType, h common.Hash, p types.Position,
+ period uint64) []types.Vote {
+ votes := s.newVotes(t, h, p, period)
+ return votes[:count]
+ }
+ ss := agreementSnapshot{}
+ // |<-pre-commit ->|<- commit ->|
+ // | P0,1 | P2,2 | P3,1 | P2,2 | P2,1 | P1,0 |
+ p0 := types.Position{Round: 0, Height: types.GenesisHeight}
+ p1 := types.Position{Round: 0, Height: types.GenesisHeight + 1}
+ p2 := types.Position{Round: 0, Height: types.GenesisHeight + 2}
+ p3 := types.Position{Round: 0, Height: types.GenesisHeight + 3}
+ p4 := types.Position{Round: 0, Height: types.GenesisHeight + 4}
+ ss.addPreCommitVotes(p0, 1, newVotes(types.VotePreCom, h, p0, 1))
+ ss.addPreCommitVotes(p2, 2, newVotes(types.VoteFast, h, p1, 2))
+ ss.addPreCommitVotes(p3, 1, newVotes(types.VoteFast, h, p3, 1))
+ // Add commit/fast-commit votes in reverse order.
+ ss.markBoundary()
+ ss.addCommitVotes(p2, 2, newVotes(types.VoteCom, h, p2, 2))
+ ss.addCommitVotes(p2, 1, newVotes(types.VoteFastCom, h, p2, 1))
+ ss.addCommitVotes(p1, 0, newVotes(types.VoteCom, h, p1, 0))
+ // Get with a very new position, should return empty votes.
+ r, votes := ss.get(types.Position{Round: 0, Height: 10}, 0)
+ s.Require().Nil(r)
+ s.Require().Empty(votes)
+ // Get with a very old position, should return all votes.
+ _, votes = ss.get(types.Position{}, 0)
+ s.Require().Nil(r)
+ s.Require().Len(votes, 6*count)
+ // Only the newest pre-commit votes are returned.
+ _, votes = ss.get(p3, 0)
+ s.Require().Len(votes, count)
+ s.Require().Equal(count, countVotes(votes, p3, 1, types.VoteFast))
+ // Pre-commit votes in the same position and period are ignored; commit
+ // votes in the same position and period are included.
+ _, votes = ss.get(p2, 2)
+ s.Require().Len(votes, 3*count)
+ s.Require().Equal(count, countVotes(votes, p3, 1, types.VoteFast))
+ s.Require().Equal(count, countVotes(votes, p2, 1, types.VoteFastCom))
+ s.Require().Equal(count, countVotes(votes, p2, 2, types.VoteCom))
+ // Only the newest commit votes are included.
+ ss = agreementSnapshot{}
+ ss.addPreCommitVotes(p0, 1, newVotes(types.VotePreCom, h, p0, 1))
+ ss.markBoundary()
+ ss.addCommitVotes(p4, 1, newVotes(types.VoteFastCom, h, p4, 1))
+ _, votes = ss.get(p4, 10)
+ s.Require().Len(votes, count)
+ s.Require().Equal(count, countVotes(votes, p4, 1, types.VoteFastCom))
+}
+
+func (s *AgreementCacheTestSuite) TestRandomly() {
+ var (
+ positionCount uint64 = 300
+ periodCount uint64 = 5
+ iteration = 3
+ )
+ lastPosition, vs, bs, rs := s.generateTestData(positionCount, periodCount)
+ s.Require().NotEmpty(vs)
+ s.Require().NotEmpty(bs)
+ s.Require().NotEmpty(rs)
+ // Requirements in this scenario:
+ // - no panic or error should be raised.
+ // - the latest decide event should match the last position.
+ // - for each event type, event positions should be strictly increasing.
+ var lastEvts = make([]*agreementEvent, maxAgreementEventType)
+ chk := func(evts []*agreementEvent, err error) {
+ s.Require().NoError(err)
+ for _, e := range evts {
+ last := lastEvts[e.evtType]
+ if last != nil {
+ s.Require().True(e.Position.Newer(last.Position))
+ }
+ lastEvts[e.evtType] = e
+ }
+ }
+ randObj := rand.New(rand.NewSource(time.Now().UnixNano()))
+ for i := 0; i < iteration; i++ {
+ // Shuffle those slices.
+ randObj.Shuffle(len(vs), func(i, j int) { vs[i], vs[j] = vs[j], vs[i] })
+ randObj.Shuffle(len(bs), func(i, j int) { bs[i], bs[j] = bs[j], bs[i] })
+ randObj.Shuffle(len(rs), func(i, j int) { rs[i], rs[j] = rs[j], rs[i] })
+ for i := range lastEvts {
+ lastEvts[i] = nil
+ }
+ s.newCache(s.newRoundEvents(positionCount/s.roundLength - 1))
+ var vIdx, bIdx, rIdx int
+ for {
+ switch randObj.Int() % 3 {
+ case 0:
+ if vIdx >= len(vs) {
+ break
+ }
+ chk(s.cache.processVote(vs[vIdx]))
+ vIdx++
+ case 1:
+ if bIdx >= len(bs) {
+ break
+ }
+ chk(s.cache.processFinalizedBlock(bs[bIdx]))
+ bIdx++
+ case 2:
+ if rIdx >= len(rs) {
+ break
+ }
+ chk(s.cache.processAgreementResult(rs[rIdx]))
+ rIdx++
+ }
+ if vIdx >= len(vs) && bIdx >= len(bs) && rIdx >= len(rs) {
+ // Make sure we reach agreement on the last position.
+ lastDecide := lastEvts[agreementEventDecide]
+ s.Require().NotNil(lastDecide)
+ s.Require().Equal(lastDecide.Position, lastPosition)
+ break
+ }
+ }
+ }
+}
+
+func TestAgreementCache(t *testing.T) {
+ suite.Run(t, new(AgreementCacheTestSuite))
+}