From 7ef2bd6a1544d3b92cf2b2119935b425279b4e59 Mon Sep 17 00:00:00 2001 From: Jimmy Hu Date: Fri, 2 Nov 2018 10:44:15 +0800 Subject: vendor: sync to latest core --- .../dexon-consensus-core/core/agreement-state.go | 2 +- .../dexon-consensus-core/core/agreement.go | 54 ++++--- .../dexon-consensus-core/core/consensus.go | 42 ++--- .../dexon-consensus-core/core/lattice.go | 4 +- .../core/total-ordering-syncer.go | 174 +++++++++++++++++++++ 5 files changed, 234 insertions(+), 42 deletions(-) create mode 100644 vendor/github.com/dexon-foundation/dexon-consensus-core/core/total-ordering-syncer.go (limited to 'vendor/github.com/dexon-foundation/dexon-consensus-core') diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement-state.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement-state.go index 426b0629c..4997ddcf3 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement-state.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement-state.go @@ -69,9 +69,9 @@ func newInitialState(a *agreementData) *initialState { func (s *initialState) state() agreementStateType { return stateInitial } func (s *initialState) clocks() int { return 0 } func (s *initialState) nextState() (agreementState, error) { + hash := s.a.recv.ProposeBlock() s.a.lock.Lock() defer s.a.lock.Unlock() - hash := s.a.recv.ProposeBlock() s.a.recv.ProposeVote(&types.Vote{ Type: types.VoteInit, BlockHash: hash, diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go index 3162b2e57..72aefc6b2 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/agreement.go @@ -67,6 +67,8 @@ func newVoteListMap() []map[types.NodeID]*types.Vote { type agreementReceiver interface { ProposeVote(vote *types.Vote) ProposeBlock() common.Hash + // ConfirmBlock is called with lock hold. User can safely use all data within + // agreement module. ConfirmBlock(common.Hash, map[types.NodeID]*types.Vote) PullBlocks(common.Hashes) } @@ -242,12 +244,7 @@ func (a *agreement) nextState() (err error) { } func (a *agreement) sanityCheck(vote *types.Vote) error { - if exist := func() bool { - a.lock.RLock() - defer a.lock.RUnlock() - _, exist := a.notarySet[vote.ProposerID] - return exist - }(); !exist { + if _, exist := a.notarySet[vote.ProposerID]; !exist { return ErrNotInNotarySet } ok, err := verifyVoteSignature(vote) @@ -287,19 +284,18 @@ func (a *agreement) prepareVote(vote *types.Vote) (err error) { // processVote is the entry point for processing Vote. func (a *agreement) processVote(vote *types.Vote) error { + a.lock.Lock() + defer a.lock.Unlock() if err := a.sanityCheck(vote); err != nil { return err } - aID := a.agreementID() - if vote.Position != aID { + if vote.Position != a.aID { // Agreement module has stopped. - if !isStop(aID) { - if aID.Newer(&vote.Position) { + if !isStop(a.aID) { + if a.aID.Newer(&vote.Position) { return nil } } - a.lock.Lock() - defer a.lock.Unlock() a.pendingVote = append(a.pendingVote, pendingVote{ vote: vote, receivedTime: time.Now().UTC(), @@ -329,6 +325,9 @@ func (a *agreement) processVote(vote *types.Vote) error { } // Check if the agreement requires fast-forwarding. + if len(a.fastForward) > 0 { + return nil + } if vote.Type == types.VotePreCom { if hash, ok := a.data.countVoteNoLock(vote.Period, vote.Type); ok && hash != skipBlockHash { @@ -358,7 +357,7 @@ func (a *agreement) processVote(vote *types.Vote) error { if vote.BlockHash == nullBlockHash || vote.BlockHash == skipBlockHash { continue } - if _, found := a.findCandidateBlock(vote.BlockHash); !found { + if _, found := a.findCandidateBlockNoLock(vote.BlockHash); !found { hashes = append(hashes, vote.BlockHash) } } @@ -366,7 +365,9 @@ func (a *agreement) processVote(vote *types.Vote) error { addPullBlocks(types.VoteInit) addPullBlocks(types.VotePreCom) addPullBlocks(types.VoteCom) - a.data.recv.PullBlocks(hashes) + if len(hashes) > 0 { + a.data.recv.PullBlocks(hashes) + } a.fastForward <- vote.Period + 1 return nil } @@ -374,6 +375,10 @@ func (a *agreement) processVote(vote *types.Vote) error { } func (a *agreement) done() <-chan struct{} { + a.lock.Lock() + defer a.lock.Unlock() + a.data.lock.Lock() + defer a.data.lock.Unlock() ch := make(chan struct{}, 1) if a.hasOutput { ch <- struct{}{} @@ -394,14 +399,15 @@ func (a *agreement) done() <-chan struct{} { // processBlock is the entry point for processing Block. func (a *agreement) processBlock(block *types.Block) error { + a.lock.Lock() + defer a.lock.Unlock() a.data.blocksLock.Lock() defer a.data.blocksLock.Unlock() - aID := a.agreementID() - if block.Position != aID { + if block.Position != a.aID { // Agreement module has stopped. - if !isStop(aID) { - if aID.Newer(&block.Position) { + if !isStop(a.aID) { + if a.aID.Newer(&block.Position) { return nil } } @@ -421,23 +427,31 @@ func (a *agreement) processBlock(block *types.Block) error { return err } a.data.blocks[block.ProposerID] = block - a.addCandidateBlock(block) + a.addCandidateBlockNoLock(block) return nil } func (a *agreement) addCandidateBlock(block *types.Block) { a.lock.Lock() defer a.lock.Unlock() + a.addCandidateBlockNoLock(block) +} + +func (a *agreement) addCandidateBlockNoLock(block *types.Block) { a.candidateBlock[block.Hash] = block } func (a *agreement) findCandidateBlock(hash common.Hash) (*types.Block, bool) { a.lock.RLock() defer a.lock.RUnlock() + return a.findCandidateBlockNoLock(hash) +} + +func (a *agreement) findCandidateBlockNoLock( + hash common.Hash) (*types.Block, bool) { b, e := a.candidateBlock[hash] return b, e } - func (a *agreementData) countVote(period uint64, voteType types.VoteType) ( blockHash common.Hash, ok bool) { a.lock.RLock() diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go index 15ecf67c7..525616892 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/consensus.go @@ -71,11 +71,11 @@ type consensusBAReceiver struct { } func (recv *consensusBAReceiver) ProposeVote(vote *types.Vote) { - if err := recv.agreementModule.prepareVote(vote); err != nil { - recv.consensus.logger.Error("Failed to prepare vote", "error", err) - return - } go func() { + if err := recv.agreementModule.prepareVote(vote); err != nil { + recv.consensus.logger.Error("Failed to prepare vote", "error", err) + return + } if err := recv.agreementModule.processVote(vote); err != nil { recv.consensus.logger.Error("Failed to process vote", "error", err) return @@ -92,7 +92,6 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { recv.consensus.logger.Error("unable to propose block") return nullBlockHash } - recv.consensus.baModules[recv.chainID].addCandidateBlock(block) if err := recv.consensus.preProcessBlock(block); err != nil { recv.consensus.logger.Error("Failed to pre-process block", "error", err) return common.Hash{} @@ -117,7 +116,7 @@ func (recv *consensusBAReceiver) ConfirmBlock( } else { var exist bool block, exist = recv.consensus.baModules[recv.chainID]. - findCandidateBlock(hash) + findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", "hash", hash, @@ -129,10 +128,15 @@ func (recv *consensusBAReceiver) ConfirmBlock( recv.consensus.baConfirmedBlock[hash] = ch }() recv.consensus.network.PullBlocks(common.Hashes{hash}) - block = <-ch - recv.consensus.logger.Info("Receive unknown block", - "hash", hash, - "chainID", recv.chainID) + go func() { + block = <-ch + recv.consensus.logger.Info("Receive unknown block", + "hash", hash, + "chainID", recv.chainID) + recv.agreementModule.addCandidateBlock(block) + recv.ConfirmBlock(block.Hash, votes) + }() + return } } recv.consensus.ccModule.registerBlock(block) @@ -668,14 +672,16 @@ MessageLoop: continue MessageLoop } } - con.lock.Lock() - defer con.lock.Unlock() - // In case of multiple delivered block. - if _, exist := con.baConfirmedBlock[val.Hash]; !exist { - continue MessageLoop - } - delete(con.baConfirmedBlock, val.Hash) - ch <- val + func() { + con.lock.Lock() + defer con.lock.Unlock() + // In case of multiple delivered block. + if _, exist := con.baConfirmedBlock[val.Hash]; !exist { + return + } + delete(con.baConfirmedBlock, val.Hash) + ch <- val + }() } else if val.IsFinalized() { // For sync mode. if err := con.processFinalizedBlock(val); err != nil { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go index 68b05c2e6..c1339beed 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/lattice.go @@ -36,7 +36,6 @@ var ( type Lattice struct { lock sync.RWMutex authModule *Authenticator - chainNum uint32 app Application debug Debug pool blockPool @@ -61,7 +60,6 @@ func NewLattice( toConfig := newGenesisTotalOrderingConfig(dMoment, cfg) s = &Lattice{ authModule: authModule, - chainNum: cfg.NumChains, app: app, debug: debug, pool: newBlockPool(cfg.NumChains), @@ -180,7 +178,7 @@ func (s *Lattice) addBlockToLattice( // Replay tips in pool to check their validity. for { hasOutput := false - for i := uint32(0); i < s.chainNum; i++ { + for i := uint32(0); i < uint32(len(s.pool)); i++ { var tip *types.Block if tip = s.pool.tip(i); tip == nil { continue diff --git a/vendor/github.com/dexon-foundation/dexon-consensus-core/core/total-ordering-syncer.go b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/total-ordering-syncer.go new file mode 100644 index 000000000..270e637e0 --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus-core/core/total-ordering-syncer.go @@ -0,0 +1,174 @@ +// Copyright 2018 The dexon-consensus-core Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package core + +import ( + "sort" + "sync" + + "github.com/dexon-foundation/dexon-consensus-core/common" + "github.com/dexon-foundation/dexon-consensus-core/core/types" +) + +type totalOrderingSyncer struct { + lock sync.RWMutex + + numChains uint32 + syncHeight map[uint32]uint64 + syncDeliverySetIdx int + pendingBlocks []*types.Block + inPendingBlocks map[common.Hash]struct{} + + bootstrapChain map[uint32]struct{} + + // Data to restore delivery set. + pendingDeliveryBlocks []*types.Block + deliverySet map[int][]*types.Block + mapToDeliverySet map[common.Hash]int +} + +func newTotalOrderingSyncer(numChains uint32) *totalOrderingSyncer { + return &totalOrderingSyncer{ + numChains: numChains, + syncHeight: make(map[uint32]uint64), + syncDeliverySetIdx: -1, + inPendingBlocks: make(map[common.Hash]struct{}), + bootstrapChain: make(map[uint32]struct{}), + deliverySet: make(map[int][]*types.Block), + mapToDeliverySet: make(map[common.Hash]int), + } +} + +func (tos *totalOrderingSyncer) synced() bool { + tos.lock.RLock() + defer tos.lock.RUnlock() + return tos.syncDeliverySetIdx != -1 +} + +func (tos *totalOrderingSyncer) processBlock( + block *types.Block) (delivered []*types.Block) { + if tos.synced() { + if tos.syncHeight[block.Position.ChainID] >= block.Position.Height { + return + } + delivered = append(delivered, block) + return + } + tos.lock.Lock() + defer tos.lock.Unlock() + tos.inPendingBlocks[block.Hash] = struct{}{} + tos.pendingBlocks = append(tos.pendingBlocks, block) + if block.Position.Height == 0 { + tos.bootstrapChain[block.Position.ChainID] = struct{}{} + } + if uint32(len(tos.bootstrapChain)) == tos.numChains { + // Bootstrap mode. + delivered = tos.pendingBlocks + tos.syncDeliverySetIdx = 0 + for i := uint32(0); i < tos.numChains; i++ { + tos.syncHeight[i] = uint64(0) + } + } else { + maxDeliverySetIdx := -1 + // TODO(jimmy-dexon): below for loop can be optimized. + PendingBlockLoop: + for i, block := range tos.pendingBlocks { + idx, exist := tos.mapToDeliverySet[block.Hash] + if !exist { + continue + } + deliverySet := tos.deliverySet[idx] + // Check if all the blocks in deliverySet are in the pendingBlocks. + for _, dBlock := range deliverySet { + if _, exist := tos.inPendingBlocks[dBlock.Hash]; !exist { + continue PendingBlockLoop + } + } + if idx > maxDeliverySetIdx { + maxDeliverySetIdx = idx + } + // Check if all of the chains have delivered. + for _, dBlock := range deliverySet { + if h, exist := tos.syncHeight[dBlock.Position.ChainID]; exist { + if dBlock.Position.Height < h { + continue + } + } + tos.syncHeight[dBlock.Position.ChainID] = dBlock.Position.Height + } + if uint32(len(tos.syncHeight)) != tos.numChains { + continue + } + // Core is fully synced, it can start delivering blocks from idx. + tos.syncDeliverySetIdx = maxDeliverySetIdx + delivered = make([]*types.Block, 0, i) + break + } + if tos.syncDeliverySetIdx == -1 { + return + } + // Generating delivering blocks. + for i := maxDeliverySetIdx; i < len(tos.deliverySet); i++ { + deliverySet := tos.deliverySet[i] + sort.Sort(types.ByHash(deliverySet)) + for _, block := range deliverySet { + if block.Position.Height > tos.syncHeight[block.Position.ChainID] { + tos.syncHeight[block.Position.ChainID] = block.Position.Height + } + delivered = append(delivered, block) + } + } + // Flush remaining blocks. + for _, block := range tos.pendingBlocks { + if _, exist := tos.mapToDeliverySet[block.Hash]; exist { + continue + } + if block.Position.Height > tos.syncHeight[block.Position.ChainID] { + tos.syncHeight[block.Position.ChainID] = block.Position.Height + } + delivered = append(delivered, block) + } + } + // Clean internal data model to save memory. + tos.pendingBlocks = nil + tos.inPendingBlocks = nil + tos.bootstrapChain = nil + tos.pendingDeliveryBlocks = nil + tos.deliverySet = nil + tos.mapToDeliverySet = nil + return +} + +// The finalized block should be passed by the order of consensus height. +func (tos *totalOrderingSyncer) processFinalizedBlock(block *types.Block) { + tos.lock.Lock() + defer tos.lock.Unlock() + if len(tos.pendingDeliveryBlocks) > 0 { + if block.Hash.Less( + tos.pendingDeliveryBlocks[len(tos.pendingDeliveryBlocks)-1].Hash) { + // pendingDeliveryBlocks forms a deliverySet. + idx := len(tos.deliverySet) + tos.deliverySet[idx] = tos.pendingDeliveryBlocks + for _, block := range tos.pendingDeliveryBlocks { + tos.mapToDeliverySet[block.Hash] = idx + } + tos.pendingDeliveryBlocks = []*types.Block{} + } + } + tos.pendingDeliveryBlocks = append(tos.pendingDeliveryBlocks, block) +} -- cgit