diff options
Diffstat (limited to 'vendor/github.com/tangerine-network/tangerine-consensus/core/agreement.go')
-rw-r--r-- | vendor/github.com/tangerine-network/tangerine-consensus/core/agreement.go | 797 |
1 files changed, 797 insertions, 0 deletions
diff --git a/vendor/github.com/tangerine-network/tangerine-consensus/core/agreement.go b/vendor/github.com/tangerine-network/tangerine-consensus/core/agreement.go new file mode 100644 index 000000000..62d19670c --- /dev/null +++ b/vendor/github.com/tangerine-network/tangerine-consensus/core/agreement.go @@ -0,0 +1,797 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "fmt" + "math" + "sync" + "sync/atomic" + "time" + + "github.com/tangerine-network/tangerine-consensus/common" + "github.com/tangerine-network/tangerine-consensus/core/types" + "github.com/tangerine-network/tangerine-consensus/core/utils" +) + +// closedchan is a reusable closed channel. +var closedchan = make(chan struct{}) + +func init() { + close(closedchan) +} + +// Errors for agreement module. +var ( + ErrInvalidVote = fmt.Errorf("invalid vote") + ErrNotInNotarySet = fmt.Errorf("not in notary set") + ErrIncorrectVoteSignature = fmt.Errorf("incorrect vote signature") + ErrIncorrectVotePartialSignature = fmt.Errorf("incorrect vote psig") + ErrMismatchBlockPosition = fmt.Errorf("mismatch block position") +) + +// ErrFork for fork error in agreement. +type ErrFork struct { + nID types.NodeID + old, new common.Hash +} + +func (e *ErrFork) Error() string { + return fmt.Sprintf("fork is found for %s, old %s, new %s", + e.nID.String(), e.old, e.new) +} + +// ErrForkVote for fork vote error in agreement. +type ErrForkVote struct { + nID types.NodeID + old, new *types.Vote +} + +func (e *ErrForkVote) Error() string { + return fmt.Sprintf("fork vote is found for %s, old %s, new %s", + e.nID.String(), e.old, e.new) +} + +func newVoteListMap() []map[types.NodeID]*types.Vote { + listMap := make([]map[types.NodeID]*types.Vote, types.MaxVoteType) + for idx := range listMap { + listMap[idx] = make(map[types.NodeID]*types.Vote) + } + return listMap +} + +// agreementReceiver is the interface receiving agreement event. +type agreementReceiver interface { + ProposeVote(vote *types.Vote) + ProposeBlock() common.Hash + // ConfirmBlock is called with lock hold. User can safely use all data within + // agreement module. + ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) + PullBlocks(common.Hashes) + ReportForkVote(v1, v2 *types.Vote) + ReportForkBlock(b1, b2 *types.Block) + VerifyPartialSignature(vote *types.Vote) (bool, bool) +} + +type pendingBlock struct { + block *types.Block + receivedTime time.Time +} + +type pendingVote struct { + vote *types.Vote + receivedTime time.Time +} + +// agreementData is the data for agreementState. +type agreementData struct { + recv agreementReceiver + + ID types.NodeID + isLeader bool + leader *leaderSelector + lockValue common.Hash + lockIter uint64 + period uint64 + requiredVote int + votes map[uint64][]map[types.NodeID]*types.Vote + lock sync.RWMutex + blocks map[types.NodeID]*types.Block + blocksLock sync.Mutex +} + +// agreement is the agreement protocal describe in the Crypto Shuffle Algorithm. +type agreement struct { + state agreementState + data *agreementData + aID *atomic.Value + doneChan chan struct{} + notarySet map[types.NodeID]struct{} + hasVoteFast bool + hasOutput bool + lock sync.RWMutex + pendingBlock []pendingBlock + pendingVote []pendingVote + pendingAgreementResult map[types.Position]*types.AgreementResult + candidateBlock map[common.Hash]*types.Block + fastForward chan uint64 + signer *utils.Signer + logger common.Logger +} + +// newAgreement creates a agreement instance. +func newAgreement( + ID types.NodeID, + recv agreementReceiver, + leader *leaderSelector, + signer *utils.Signer, + logger common.Logger) *agreement { + agreement := &agreement{ + data: &agreementData{ + recv: recv, + ID: ID, + leader: leader, + }, + aID: &atomic.Value{}, + pendingAgreementResult: make(map[types.Position]*types.AgreementResult), + candidateBlock: make(map[common.Hash]*types.Block), + fastForward: make(chan uint64, 1), + signer: signer, + logger: logger, + } + agreement.stop() + return agreement +} + +// restart the agreement +func (a *agreement) restart( + notarySet map[types.NodeID]struct{}, + threshold int, aID types.Position, leader types.NodeID, + crs common.Hash) { + if !func() bool { + a.lock.Lock() + defer a.lock.Unlock() + if !isStop(aID) { + oldAID := a.agreementID() + if !isStop(oldAID) && !aID.Newer(oldAID) { + return false + } + } + a.logger.Debug("Restarting BA", + "notarySet", notarySet, "position", aID, "leader", leader) + a.data.lock.Lock() + defer a.data.lock.Unlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() + a.data.votes = make(map[uint64][]map[types.NodeID]*types.Vote) + a.data.votes[1] = newVoteListMap() + a.data.period = 2 + a.data.blocks = make(map[types.NodeID]*types.Block) + a.data.requiredVote = threshold + a.data.leader.restart(crs) + a.data.lockValue = types.SkipBlockHash + a.data.lockIter = 0 + a.data.isLeader = a.data.ID == leader + if a.doneChan != nil { + close(a.doneChan) + } + a.doneChan = make(chan struct{}) + a.fastForward = make(chan uint64, 1) + a.hasVoteFast = false + a.hasOutput = false + a.state = newFastState(a.data) + a.notarySet = notarySet + a.candidateBlock = make(map[common.Hash]*types.Block) + a.aID.Store(struct { + pos types.Position + leader types.NodeID + }{aID, leader}) + return true + }() { + return + } + + if isStop(aID) { + return + } + + var result *types.AgreementResult + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingAgreementResult := make( + map[types.Position]*types.AgreementResult) + for pos, agr := range a.pendingAgreementResult { + if pos.Newer(aID) { + newPendingAgreementResult[pos] = agr + } else if pos == aID { + result = agr + } + } + a.pendingAgreementResult = newPendingAgreementResult + }() + + expireTime := time.Now().Add(-10 * time.Second) + replayBlock := make([]*types.Block, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingBlock := make([]pendingBlock, 0) + for _, pending := range a.pendingBlock { + if aID.Newer(pending.block.Position) { + continue + } else if pending.block.Position == aID { + if result == nil || + result.Position.Round < DKGDelayRound || + result.BlockHash == pending.block.Hash { + replayBlock = append(replayBlock, pending.block) + } + } else if pending.receivedTime.After(expireTime) { + newPendingBlock = append(newPendingBlock, pending) + } + } + a.pendingBlock = newPendingBlock + }() + + replayVote := make([]*types.Vote, 0) + func() { + a.lock.Lock() + defer a.lock.Unlock() + newPendingVote := make([]pendingVote, 0) + for _, pending := range a.pendingVote { + if aID.Newer(pending.vote.Position) { + continue + } else if pending.vote.Position == aID { + if result == nil || result.Position.Round < DKGDelayRound { + replayVote = append(replayVote, pending.vote) + } + } else if pending.receivedTime.After(expireTime) { + newPendingVote = append(newPendingVote, pending) + } + } + a.pendingVote = newPendingVote + }() + + for _, block := range replayBlock { + if err := a.processBlock(block); err != nil { + a.logger.Error("Failed to process block when restarting agreement", + "block", block) + } + } + + if result != nil { + if err := a.processAgreementResult(result); err != nil { + a.logger.Error("Failed to process agreement result when retarting", + "result", result) + } + } + + for _, vote := range replayVote { + if err := a.processVote(vote); err != nil { + a.logger.Error("Failed to process vote when restarting agreement", + "vote", vote) + } + } +} + +func (a *agreement) stop() { + a.restart(make(map[types.NodeID]struct{}), int(math.MaxInt32), + types.Position{ + Height: math.MaxUint64, + }, + types.NodeID{}, common.Hash{}) +} + +func isStop(aID types.Position) bool { + return aID.Height == math.MaxUint64 +} + +// clocks returns how many time this state is required. +func (a *agreement) clocks() int { + a.data.lock.RLock() + defer a.data.lock.RUnlock() + scale := int(a.data.period) - 1 + if a.state.state() == stateForward { + scale = 1 + } + if scale < 1 { + // just in case. + scale = 1 + } + // 10 is a magic number derived from many years of experience. + if scale > 10 { + scale = 10 + } + return a.state.clocks() * scale +} + +// pullVotes returns if current agreement requires more votes to continue. +func (a *agreement) pullVotes() bool { + a.data.lock.RLock() + defer a.data.lock.RUnlock() + return a.state.state() == statePullVote || + a.state.state() == stateInitial || + (a.state.state() == statePreCommit && (a.data.period%3) == 0) +} + +// agreementID returns the current agreementID. +func (a *agreement) agreementID() types.Position { + return a.aID.Load().(struct { + pos types.Position + leader types.NodeID + }).pos +} + +// leader returns the current leader. +func (a *agreement) leader() types.NodeID { + return a.aID.Load().(struct { + pos types.Position + leader types.NodeID + }).leader +} + +// nextState is called at the specific clock time. +func (a *agreement) nextState() (err error) { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { + a.state = newSleepState(a.data) + return + } + a.state, err = a.state.nextState() + return +} + +func (a *agreement) sanityCheck(vote *types.Vote) error { + if vote.Type >= types.MaxVoteType { + return ErrInvalidVote + } + ok, err := utils.VerifyVoteSignature(vote) + if err != nil { + return err + } + if !ok { + return ErrIncorrectVoteSignature + } + if vote.Position.Round != a.agreementID().Round { + // TODO(jimmy): maybe we can verify partial signature at agreement-mgr. + return nil + } + if ok, report := a.data.recv.VerifyPartialSignature(vote); !ok { + if report { + return ErrIncorrectVotePartialSignature + } + return ErrSkipButNoError + } + return nil +} + +func (a *agreement) checkForkVote(vote *types.Vote) ( + alreadyExist bool, err error) { + a.data.lock.RLock() + defer a.data.lock.RUnlock() + if votes, exist := a.data.votes[vote.Period]; exist { + if oldVote, exist := votes[vote.Type][vote.ProposerID]; exist { + alreadyExist = true + if vote.BlockHash != oldVote.BlockHash { + a.data.recv.ReportForkVote(oldVote, vote) + err = &ErrForkVote{vote.ProposerID, oldVote, vote} + return + } + } + } + return +} + +// prepareVote prepares a vote. +func (a *agreement) prepareVote(vote *types.Vote) (err error) { + vote.Position = a.agreementID() + err = a.signer.SignVote(vote) + return +} + +func (a *agreement) updateFilter(filter *utils.VoteFilter) { + if isStop(a.agreementID()) { + return + } + a.lock.RLock() + defer a.lock.RUnlock() + a.data.lock.RLock() + defer a.data.lock.RUnlock() + filter.Confirm = a.hasOutput + filter.LockIter = a.data.lockIter + filter.Period = a.data.period + filter.Position.Height = a.agreementID().Height +} + +// processVote is the entry point for processing Vote. +func (a *agreement) processVote(vote *types.Vote) error { + a.lock.Lock() + defer a.lock.Unlock() + if err := a.sanityCheck(vote); err != nil { + return err + } + aID := a.agreementID() + + // Agreement module has stopped. + if isStop(aID) { + // Hacky way to not drop first votes when round just begins. + if vote.Position.Round == aID.Round { + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + return nil + } + return ErrSkipButNoError + } + if vote.Position != aID { + if aID.Newer(vote.Position) { + return nil + } + a.pendingVote = append(a.pendingVote, pendingVote{ + vote: vote, + receivedTime: time.Now().UTC(), + }) + return nil + } + exist, err := a.checkForkVote(vote) + if err != nil { + return err + } + if exist { + return nil + } + + a.data.lock.Lock() + defer a.data.lock.Unlock() + if _, exist := a.data.votes[vote.Period]; !exist { + a.data.votes[vote.Period] = newVoteListMap() + } + if _, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { + return nil + } + a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote + if !a.hasOutput && + (vote.Type == types.VoteCom || + vote.Type == types.VoteFast || + vote.Type == types.VoteFastCom) { + if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && + hash != types.SkipBlockHash { + if vote.Type == types.VoteFast { + if !a.hasVoteFast { + if a.state.state() == stateFast || + a.state.state() == stateFastVote { + a.data.recv.ProposeVote( + types.NewVote(types.VoteFastCom, hash, vote.Period)) + a.hasVoteFast = true + + } + if a.data.lockIter == 0 { + a.data.lockValue = hash + a.data.lockIter = 1 + } + } + } else { + a.hasOutput = true + a.data.recv.ConfirmBlock(hash, + a.data.votes[vote.Period][vote.Type]) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + } + return nil + } + } else if a.hasOutput { + return nil + } + + // Check if the agreement requires fast-forwarding. + if len(a.fastForward) > 0 { + return nil + } + if vote.Type == types.VotePreCom { + if vote.Period < a.data.lockIter { + // This PreCom is useless for us. + return nil + } + if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && + hash != types.SkipBlockHash { + // Condition 1. + if vote.Period > a.data.lockIter { + a.data.lockValue = hash + a.data.lockIter = vote.Period + } + // Condition 2. + if vote.Period > a.data.period { + a.fastForward <- vote.Period + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + return nil + } + } + } + // Condition 3. + if vote.Type == types.VoteCom && vote.Period >= a.data.period && + len(a.data.votes[vote.Period][types.VoteCom]) >= a.data.requiredVote { + hashes := common.Hashes{} + addPullBlocks := func(voteType types.VoteType) { + for _, vote := range a.data.votes[vote.Period][voteType] { + if vote.BlockHash == types.NullBlockHash || + vote.BlockHash == types.SkipBlockHash { + continue + } + if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found { + hashes = append(hashes, vote.BlockHash) + } + } + } + addPullBlocks(types.VotePreCom) + addPullBlocks(types.VoteCom) + if len(hashes) > 0 { + a.data.recv.PullBlocks(hashes) + } + a.fastForward <- vote.Period + 1 + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + return nil + } + return nil +} + +func (a *agreement) processFinalizedBlock(block *types.Block) { + a.lock.Lock() + defer a.lock.Unlock() + if a.hasOutput { + return + } + aID := a.agreementID() + if aID.Older(block.Position) { + return + } + a.addCandidateBlockNoLock(block) + a.hasOutput = true + a.data.lock.Lock() + defer a.data.lock.Unlock() + a.data.recv.ConfirmBlock(block.Hash, nil) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } +} + +func (a *agreement) processAgreementResult(result *types.AgreementResult) error { + a.lock.Lock() + defer a.lock.Unlock() + aID := a.agreementID() + if result.Position.Older(aID) { + return nil + } else if result.Position.Newer(aID) { + a.pendingAgreementResult[result.Position] = result + return nil + } + if a.hasOutput { + return nil + } + a.data.lock.Lock() + defer a.data.lock.Unlock() + if _, exist := a.findCandidateBlockNoLock(result.BlockHash); !exist { + a.data.recv.PullBlocks(common.Hashes{result.BlockHash}) + } + a.hasOutput = true + a.data.recv.ConfirmBlock(result.BlockHash, nil) + if a.doneChan != nil { + close(a.doneChan) + a.doneChan = nil + } + return nil +} + +func (a *agreement) done() <-chan struct{} { + a.lock.Lock() + defer a.lock.Unlock() + select { + case period := <-a.fastForward: + a.data.lock.Lock() + defer a.data.lock.Unlock() + if period <= a.data.period { + break + } + a.data.setPeriod(period) + a.state = newPreCommitState(a.data) + a.doneChan = make(chan struct{}) + return closedchan + default: + } + if a.doneChan == nil { + return closedchan + } + return a.doneChan +} + +func (a *agreement) confirmed() bool { + a.lock.RLock() + defer a.lock.RUnlock() + return a.confirmedNoLock() +} + +func (a *agreement) confirmedNoLock() bool { + return a.hasOutput +} + +// processBlock is the entry point for processing Block. +func (a *agreement) processBlock(block *types.Block) error { + checkSkip := func() bool { + aID := a.agreementID() + if block.Position != aID { + // Agreement module has stopped. + if !isStop(aID) { + if aID.Newer(block.Position) { + return true + } + } + } + return false + } + if checkSkip() { + return nil + } + if err := utils.VerifyBlockSignature(block); err != nil { + return err + } + + a.lock.Lock() + defer a.lock.Unlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() + aID := a.agreementID() + // a.agreementID might change during lock, so we need to checkSkip again. + if checkSkip() { + return nil + } else if aID != block.Position { + a.pendingBlock = append(a.pendingBlock, pendingBlock{ + block: block, + receivedTime: time.Now().UTC(), + }) + return nil + } else if a.confirmedNoLock() { + return nil + } + if b, exist := a.data.blocks[block.ProposerID]; exist { + if b.Hash != block.Hash { + a.data.recv.ReportForkBlock(b, block) + return &ErrFork{block.ProposerID, b.Hash, block.Hash} + } + return nil + } + if err := a.data.leader.processBlock(block); err != nil { + return err + } + a.data.blocks[block.ProposerID] = block + a.addCandidateBlockNoLock(block) + if block.ProposerID != a.data.ID && + (a.state.state() == stateFast || a.state.state() == stateFastVote) && + block.ProposerID == a.leader() { + go func() { + for func() bool { + if aID != a.agreementID() { + return false + } + a.lock.RLock() + defer a.lock.RUnlock() + if a.state.state() != stateFast && a.state.state() != stateFastVote { + return false + } + a.data.lock.RLock() + defer a.data.lock.RUnlock() + a.data.blocksLock.Lock() + defer a.data.blocksLock.Unlock() + block, exist := a.data.blocks[a.leader()] + if !exist { + return true + } + ok, err := a.data.leader.validLeader(block, a.data.leader.hashCRS) + if err != nil { + fmt.Println("Error checking validLeader for Fast BA", + "error", err, "block", block) + return false + } + if ok { + a.data.recv.ProposeVote( + types.NewVote(types.VoteFast, block.Hash, a.data.period)) + return false + } + return true + }() { + // TODO(jimmy): retry interval should be related to configurations. + time.Sleep(250 * time.Millisecond) + } + }() + } + return nil +} + +func (a *agreement) addCandidateBlock(block *types.Block) { + a.lock.Lock() + defer a.lock.Unlock() + a.addCandidateBlockNoLock(block) +} + +func (a *agreement) addCandidateBlockNoLock(block *types.Block) { + a.candidateBlock[block.Hash] = block +} + +func (a *agreement) findCandidateBlockNoLock( + hash common.Hash) (*types.Block, bool) { + b, e := a.candidateBlock[hash] + return b, e +} + +// find a block in both candidate blocks and pending blocks in leader-selector. +// A block might be confirmed by others while we can't verify its validity. +func (a *agreement) findBlockNoLock(hash common.Hash) (*types.Block, bool) { + b, e := a.findCandidateBlockNoLock(hash) + if !e { + b, e = a.data.leader.findPendingBlock(hash) + } + return b, e +} + +func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( + blockHash common.Hash, ok bool) { + a.lock.RLock() + defer a.lock.RUnlock() + return a.countVoteNoLock(period, voteType) +} + +func (a *agreementData) countVoteNoLock( + period uint64, voteType types.VoteType) (blockHash common.Hash, ok bool) { + votes, exist := a.votes[period] + if !exist { + return + } + candidate := make(map[common.Hash]int) + for _, vote := range votes[voteType] { + if _, exist := candidate[vote.BlockHash]; !exist { + candidate[vote.BlockHash] = 0 + } + candidate[vote.BlockHash]++ + } + for candidateHash, votes := range candidate { + if votes >= a.requiredVote { + blockHash = candidateHash + ok = true + return + } + } + return +} + +func (a *agreementData) setPeriod(period uint64) { + for i := a.period + 1; i <= period; i++ { + if _, exist := a.votes[i]; !exist { + a.votes[i] = newVoteListMap() + } + } + a.period = period +} |