diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-01-24 11:49:25 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-01-24 11:49:25 +0800 |
commit | b6ca251bcb6e1a19a7276afe68bf37a4372670fa (patch) | |
tree | b1aac8e9759f150d58009020cd92b579e766895c | |
parent | 779f63a9f6fc3f4c628f0b97c822546ac51d0eb6 (diff) | |
download | dexon-consensus-b6ca251bcb6e1a19a7276afe68bf37a4372670fa.tar.gz dexon-consensus-b6ca251bcb6e1a19a7276afe68bf37a4372670fa.tar.zst dexon-consensus-b6ca251bcb6e1a19a7276afe68bf37a4372670fa.zip |
core: Add vote filter (#430)
* core: ignore usless vote
* core: export SkipBlockHash and NullBlockHash
* core: add VoteFilter
* Add test
* New VoteFilter for each round
-rw-r--r-- | core/agreement-mgr.go | 15 | ||||
-rw-r--r-- | core/agreement-state.go | 18 | ||||
-rw-r--r-- | core/agreement-state_test.go | 8 | ||||
-rw-r--r-- | core/agreement.go | 28 | ||||
-rw-r--r-- | core/agreement_test.go | 8 | ||||
-rw-r--r-- | core/consensus.go | 2 | ||||
-rw-r--r-- | core/types/vote.go | 12 | ||||
-rw-r--r-- | core/utils/vote-filter.go | 61 | ||||
-rw-r--r-- | core/utils/vote-filter_test.go | 92 |
9 files changed, 214 insertions, 30 deletions
diff --git a/core/agreement-mgr.go b/core/agreement-mgr.go index 7410977..a8fab7c 100644 --- a/core/agreement-mgr.go +++ b/core/agreement-mgr.go @@ -94,6 +94,7 @@ type agreementMgr struct { initRound uint64 configs []*agreementMgrConfig baModules []*agreement + voteFilters []*utils.VoteFilter waitGroup sync.WaitGroup pendingVotes map[uint64][]*types.Vote pendingBlocks map[uint64][]*types.Block @@ -201,6 +202,7 @@ func (mgr *agreementMgr) appendConfig( // Hacky way to make agreement module self contained. recv.agreementModule = agrModule mgr.baModules = append(mgr.baModules, agrModule) + mgr.voteFilters = append(mgr.voteFilters, utils.NewVoteFilter()) if mgr.isRunning { mgr.waitGroup.Add(1) go func(idx uint32) { @@ -213,7 +215,6 @@ func (mgr *agreementMgr) appendConfig( } func (mgr *agreementMgr) processVote(v *types.Vote) error { - v = v.Clone() mgr.lock.RLock() defer mgr.lock.RUnlock() if v.Position.ChainID >= uint32(len(mgr.baModules)) { @@ -224,7 +225,16 @@ func (mgr *agreementMgr) processVote(v *types.Vote) error { "initRound", mgr.initRound) return utils.ErrInvalidChainID } - return mgr.baModules[v.Position.ChainID].processVote(v) + filter := mgr.voteFilters[v.Position.ChainID] + if filter.Filter(v) { + return nil + } + v = v.Clone() + err := mgr.baModules[v.Position.ChainID].processVote(v) + if err == nil { + mgr.baModules[v.Position.ChainID].updateFilter(filter) + } + return err } func (mgr *agreementMgr) processBlock(b *types.Block) error { @@ -423,6 +433,7 @@ Loop: Round: setting.recv.round(), ChainID: math.MaxUint32, } + mgr.voteFilters[chainID] = utils.NewVoteFilter() if err := mgr.baRoutineForOneRound(&setting); err != nil { mgr.logger.Error("BA routine failed", "error", err, diff --git a/core/agreement-state.go b/core/agreement-state.go index 5b2ce52..73d7b7a 100644 --- a/core/agreement-state.go +++ b/core/agreement-state.go @@ -20,7 +20,6 @@ package core import ( "fmt" - "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/types" ) @@ -45,15 +44,6 @@ const ( stateSleep ) -var nullBlockHash common.Hash -var skipBlockHash common.Hash - -func init() { - for idx := range skipBlockHash { - skipBlockHash[idx] = 0xff - } -} - type agreementState interface { state() agreementStateType nextState() (agreementState, error) @@ -78,7 +68,7 @@ func (s *fastState) nextState() (agreementState, error) { return s.a.isLeader }() { hash := s.a.recv.ProposeBlock() - if hash != nullBlockHash { + if hash != types.NullBlockHash { s.a.lock.Lock() defer s.a.lock.Unlock() s.a.recv.ProposeVote(types.NewVote(types.VoteFast, hash, s.a.period)) @@ -143,7 +133,7 @@ func (s *preCommitState) nextState() (agreementState, error) { s.a.lock.RLock() defer s.a.lock.RUnlock() hash := s.a.lockValue - if hash == nullBlockHash { + if hash == types.NullBlockHash { hash = s.a.leader.leaderBlockHash() } s.a.recv.ProposeVote(types.NewVote(types.VotePreCom, hash, s.a.period)) @@ -165,13 +155,13 @@ func (s *commitState) nextState() (agreementState, error) { s.a.lock.Lock() defer s.a.lock.Unlock() hash, ok := s.a.countVoteNoLock(s.a.period, types.VotePreCom) - if ok && hash != skipBlockHash { + if ok && hash != types.SkipBlockHash { if s.a.period > s.a.lockIter { s.a.lockValue = hash s.a.lockIter = s.a.period } } else { - hash = skipBlockHash + hash = types.SkipBlockHash } s.a.recv.ProposeVote(types.NewVote(types.VoteCom, hash, s.a.period)) return newForwardState(s.a), nil diff --git a/core/agreement-state_test.go b/core/agreement-state_test.go index 557193d..1b7d41b 100644 --- a/core/agreement-state_test.go +++ b/core/agreement-state_test.go @@ -210,7 +210,7 @@ func (s *AgreementStateTestSuite) TestPreCommitState() { } // If lockvalue == null, propose preCom-vote for the leader block. - a.data.lockValue = nullBlockHash + a.data.lockValue = types.NullBlockHash a.data.period = 1 newState, err := state.nextState() s.Require().NoError(err) @@ -265,13 +265,13 @@ func (s *AgreementStateTestSuite) TestCommitState() { s.Require().Len(s.voteChan, 1) vote = <-s.voteChan s.Equal(types.VoteCom, vote.Type) - s.Equal(skipBlockHash, vote.BlockHash) + s.Equal(types.SkipBlockHash, vote.BlockHash) s.Equal(stateForward, newState.state()) // If there are 2f+1 preCom-votes for SKIP, it's same as the 'else' condition. a.data.period = 3 for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, 3) + vote := s.prepareVote(nID, types.VotePreCom, types.SkipBlockHash, 3) s.Require().NoError(a.processVote(vote)) } newState, err = state.nextState() @@ -279,7 +279,7 @@ func (s *AgreementStateTestSuite) TestCommitState() { s.Require().Len(s.voteChan, 1) vote = <-s.voteChan s.Equal(types.VoteCom, vote.Type) - s.Equal(skipBlockHash, vote.BlockHash) + s.Equal(types.SkipBlockHash, vote.BlockHash) s.Equal(stateForward, newState.state()) } diff --git a/core/agreement.go b/core/agreement.go index ebb9b02..c08518a 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -176,7 +176,7 @@ func (a *agreement) restart( a.data.blocks = make(map[types.NodeID]*types.Block) a.data.requiredVote = len(notarySet)/3*2 + 1 a.data.leader.restart(crs) - a.data.lockValue = nullBlockHash + a.data.lockValue = types.NullBlockHash a.data.lockIter = 0 a.data.isLeader = a.data.ID == leader if a.doneChan != nil { @@ -352,6 +352,17 @@ func (a *agreement) prepareVote(vote *types.Vote) (err error) { return } +func (a *agreement) updateFilter(filter *utils.VoteFilter) { + a.lock.RLock() + defer a.lock.RUnlock() + a.data.lock.RLock() + defer a.data.lock.RUnlock() + filter.Confirm = a.hasOutput + filter.LockIter = a.data.lockIter + filter.Period = a.data.period + filter.Height = a.agreementID().Height +} + // processVote is the entry point for processing Vote. func (a *agreement) processVote(vote *types.Vote) error { a.lock.Lock() @@ -394,13 +405,16 @@ func (a *agreement) processVote(vote *types.Vote) error { if _, exist := a.data.votes[vote.Period]; !exist { a.data.votes[vote.Period] = newVoteListMap() } + if _, exist := a.data.votes[vote.Period][vote.Type][vote.ProposerID]; exist { + return nil + } a.data.votes[vote.Period][vote.Type][vote.ProposerID] = vote if !a.hasOutput && (vote.Type == types.VoteCom || vote.Type == types.VoteFast || vote.Type == types.VoteFastCom) { if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && - hash != skipBlockHash { + hash != types.SkipBlockHash { if vote.Type == types.VoteFast { if !a.hasVoteFast { a.data.recv.ProposeVote( @@ -427,8 +441,12 @@ func (a *agreement) processVote(vote *types.Vote) error { return nil } if vote.Type == types.VotePreCom { + if vote.Period < a.data.lockIter { + // This PreCom is useless for us. + return nil + } if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && - hash != skipBlockHash { + hash != types.SkipBlockHash { // Condition 1. if a.data.period >= vote.Period && vote.Period > a.data.lockIter && vote.BlockHash != a.data.lockValue { @@ -453,7 +471,8 @@ func (a *agreement) processVote(vote *types.Vote) error { hashes := common.Hashes{} addPullBlocks := func(voteType types.VoteType) { for _, vote := range a.data.votes[vote.Period][voteType] { - if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash { + if vote.BlockHash == types.NullBlockHash || + vote.BlockHash == types.SkipBlockHash { continue } if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found { @@ -461,7 +480,6 @@ func (a *agreement) processVote(vote *types.Vote) error { } } } - addPullBlocks(types.VoteInit) addPullBlocks(types.VotePreCom) addPullBlocks(types.VoteCom) if len(hashes) > 0 { diff --git a/core/agreement_test.go b/core/agreement_test.go index 6b74255..4dbbe6e 100644 --- a/core/agreement_test.go +++ b/core/agreement_test.go @@ -370,9 +370,9 @@ func (s *AgreementTestSuite) TestFastForwardCond1() { // No fast forward if vote.BlockHash == SKIP a.data.lockIter = 6 a.data.period = 8 - a.data.lockValue = nullBlockHash + a.data.lockValue = types.NullBlockHash for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, uint64(7)) + vote := s.prepareVote(nID, types.VotePreCom, types.SkipBlockHash, uint64(7)) s.Require().NoError(a.processVote(vote)) } @@ -425,7 +425,7 @@ func (s *AgreementTestSuite) TestFastForwardCond2() { // No fast forward if vote.BlockHash == SKIP a.data.period = 6 for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VotePreCom, skipBlockHash, uint64(7)) + vote := s.prepareVote(nID, types.VotePreCom, types.SkipBlockHash, uint64(7)) s.Require().NoError(a.processVote(vote)) } @@ -475,7 +475,7 @@ func (s *AgreementTestSuite) TestDecide() { // No decide if com-vote on SKIP. for nID := range a.notarySet { - vote := s.prepareVote(nID, types.VoteCom, skipBlockHash, uint64(2)) + vote := s.prepareVote(nID, types.VoteCom, types.SkipBlockHash, uint64(2)) s.Require().NoError(a.processVote(vote)) if votes++; votes == 3 { break diff --git a/core/consensus.go b/core/consensus.go index e8d1d61..3a27b5f 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -99,7 +99,7 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { block := recv.consensus.proposeBlock(recv.chainID, recv.round()) if block == nil { recv.consensus.logger.Error("unable to propose block") - return nullBlockHash + return types.NullBlockHash } go func() { if err := recv.consensus.preProcessBlock(block); err != nil { diff --git a/core/types/vote.go b/core/types/vote.go index ae86e51..46ea1df 100644 --- a/core/types/vote.go +++ b/core/types/vote.go @@ -38,6 +38,18 @@ const ( MaxVoteType ) +// NullBlockHash is the blockHash for ⊥ value. +var NullBlockHash common.Hash + +// SkipBlockHash is the blockHash for SKIP value. +var SkipBlockHash common.Hash + +func init() { + for idx := range SkipBlockHash { + SkipBlockHash[idx] = 0xff + } +} + // VoteHeader is the header for vote, which can be used as map keys. type VoteHeader struct { ProposerID NodeID `json:"proposer_id"` diff --git a/core/utils/vote-filter.go b/core/utils/vote-filter.go new file mode 100644 index 0000000..a199027 --- /dev/null +++ b/core/utils/vote-filter.go @@ -0,0 +1,61 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package utils + +import ( + "github.com/dexon-foundation/dexon-consensus/core/types" +) + +// VoteFilter filters votes that are useless for now. +// To maximize performance, this structure is not thread-safe and will never be. +type VoteFilter struct { + Height uint64 + LockIter uint64 + Period uint64 + Confirm bool +} + +// NewVoteFilter creates a new vote filter instance. +func NewVoteFilter() *VoteFilter { + return &VoteFilter{} +} + +// Filter checks if the vote should be filtered out. +func (vf *VoteFilter) Filter(vote *types.Vote) bool { + if vote.Type == types.VoteInit { + return true + } + if vote.Position.Height < vf.Height { + return true + } else if vote.Position.Height > vf.Height { + // It's impossible to check the vote of other height. + return false + } + if vf.Confirm { + return true + } + if vote.Type == types.VotePreCom && vote.Period < vf.LockIter { + return true + } + if vote.Type == types.VoteCom && + vote.Period < vf.Period && + vote.BlockHash == types.SkipBlockHash { + return true + } + return false +} diff --git a/core/utils/vote-filter_test.go b/core/utils/vote-filter_test.go new file mode 100644 index 0000000..88050e1 --- /dev/null +++ b/core/utils/vote-filter_test.go @@ -0,0 +1,92 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package utils + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" +) + +type VoteFilterTestSuite struct { + suite.Suite +} + +func (s *VoteFilterTestSuite) TestFilterVotePass() { + filter := NewVoteFilter() + filter.Height = uint64(6) + filter.Period = uint64(3) + filter.LockIter = uint64(3) + // Pass with higher Height. + vote := types.NewVote(types.VotePreCom, common.NewRandomHash(), uint64(1)) + vote.Position.Height = filter.Height + 1 + s.Require().False(filter.Filter(vote)) + // Pass with VotePreCom. + vote = types.NewVote(types.VotePreCom, common.NewRandomHash(), + filter.LockIter) + vote.Position.Height = filter.Height + s.Require().False(filter.Filter(vote)) + // Pass with VoteCom. + vote = types.NewVote(types.VoteCom, common.NewRandomHash(), + filter.Period) + vote.Position.Height = filter.Height + s.Require().False(filter.Filter(vote)) + vote.Period-- + s.Require().False(filter.Filter(vote)) +} + +func (s *VoteFilterTestSuite) TestFilterVoteInit() { + filter := NewVoteFilter() + vote := types.NewVote(types.VoteInit, common.NewRandomHash(), uint64(1)) + s.True(filter.Filter(vote)) +} + +func (s *VoteFilterTestSuite) TestFilterVotePreCom() { + filter := NewVoteFilter() + filter.LockIter = uint64(3) + vote := types.NewVote(types.VotePreCom, common.NewRandomHash(), uint64(1)) + s.True(filter.Filter(vote)) +} + +func (s *VoteFilterTestSuite) TestFilterVoteCom() { + filter := NewVoteFilter() + filter.Period = uint64(3) + vote := types.NewVote(types.VoteCom, types.SkipBlockHash, uint64(1)) + s.True(filter.Filter(vote)) +} + +func (s *VoteFilterTestSuite) TestFilterConfirm() { + filter := NewVoteFilter() + filter.Confirm = true + vote := types.NewVote(types.VoteCom, common.NewRandomHash(), uint64(1)) + s.True(filter.Filter(vote)) +} +func (s *VoteFilterTestSuite) TestFilterLowerHeight() { + filter := NewVoteFilter() + filter.Height = uint64(10) + vote := types.NewVote(types.VoteCom, common.NewRandomHash(), uint64(1)) + vote.Position.Height = filter.Height - 1 + s.True(filter.Filter(vote)) +} + +func TestVoteFilter(t *testing.T) { + suite.Run(t, new(VoteFilterTestSuite)) +} |