diff options
Diffstat (limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/syncer')
-rw-r--r-- | vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go | 572 |
1 files changed, 128 insertions, 444 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go index 75c106793..618d90e8c 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/syncer/consensus.go @@ -63,19 +63,16 @@ type Consensus struct { nodeSetCache *utils.NodeSetCache tsigVerifier *core.TSigVerifierCache - lattice *core.Lattice - validatedChains map[uint32]struct{} - finalizedBlockHashes common.Hashes - latticeLastRound uint64 - randomnessResults map[common.Hash]*types.BlockRandomnessResult - blocks []types.ByPosition - agreements []*agreement - configs []*types.Config - roundBeginTimes []time.Time - agreementRoundCut uint64 + randomnessResults map[common.Hash]*types.BlockRandomnessResult + blocks types.BlocksByPosition + agreementModule *agreement + configs []*types.Config + roundBeginTimes []time.Time + agreementRoundCut uint64 // lock for accessing all fields. lock sync.RWMutex + duringBuffering bool moduleWaitGroup sync.WaitGroup agreementWaitGroup sync.WaitGroup pullChan chan common.Hash @@ -100,16 +97,15 @@ func NewConsensus( logger common.Logger) *Consensus { con := &Consensus{ - dMoment: dMoment, - app: app, - gov: gov, - db: db, - network: network, - nodeSetCache: utils.NewNodeSetCache(gov), - tsigVerifier: core.NewTSigVerifierCache(gov, 7), - prv: prv, - logger: logger, - validatedChains: make(map[uint32]struct{}), + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + tsigVerifier: core.NewTSigVerifierCache(gov, 7), + prv: prv, + logger: logger, configs: []*types.Config{ utils.GetConfigWithPanic(gov, 0, logger), }, @@ -119,294 +115,66 @@ func NewConsensus( randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.agreementModule = newAgreement( + con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) + con.agreementWaitGroup.Add(1) + go func() { + defer con.agreementWaitGroup.Done() + con.agreementModule.run() + }() return con } -func (con *Consensus) initConsensusObj(initBlock *types.Block) { - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.latticeLastRound = initBlock.Position.Round - debugApp, _ := con.app.(core.Debug) - con.lattice = core.NewLattice( - con.roundBeginTimes[con.latticeLastRound], - con.latticeLastRound, - con.configs[con.latticeLastRound], - utils.NewSigner(con.prv), - con.app, - debugApp, - con.db, - con.logger, - ) - }() +func (con *Consensus) assureBuffering() { + if func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + return con.duringBuffering + }() { + return + } + con.lock.Lock() + defer con.lock.Unlock() + if con.duringBuffering { + return + } + con.duringBuffering = true con.startAgreement() con.startNetwork() con.startCRSMonitor() } -func (con *Consensus) checkIfValidated() (validated bool) { - con.lock.RLock() - defer con.lock.RUnlock() - var ( - round = con.blocks[0][0].Position.Round - numChains = con.configs[round].NumChains - validatedChainCount uint32 - ) - // Make sure we validate some block in all chains. - for chainID := range con.validatedChains { - if chainID < numChains { - validatedChainCount++ - } - } - validated = validatedChainCount == numChains - con.logger.Debug("syncer chain-validation status", - "validated-chain", validatedChainCount, - "round", round, - "valid", validated) - return -} - func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { con.lock.RLock() defer con.lock.RUnlock() - var ( - round = con.blocks[0][0].Position.Round - numChains = con.configs[round].NumChains - compactionTips = make([]*types.Block, numChains) - overlapCount = uint32(0) - ) defer func() { con.logger.Debug("syncer synced status", - "overlap-count", overlapCount, - "num-chain", numChains, "last-block", blocks[len(blocks)-1], - "synced", synced) + "synced", synced, + ) }() - // Find tips (newset blocks) of each chain in compaction chain. - b := blocks[len(blocks)-1] - for tipCount := uint32(0); tipCount < numChains; { - if compactionTips[b.Position.ChainID] == nil { - // Check chainID for config change. - if b.Position.ChainID < numChains { - compactionTips[b.Position.ChainID] = b - tipCount++ - } - } - if (b.Finalization.ParentHash == common.Hash{}) { - return - } - b1, err := con.db.GetBlock(b.Finalization.ParentHash) - if err != nil { - panic(err) - } - b = &b1 - } - // Check if chain tips of compaction chain and current cached confirmed - // blocks are overlapped on each chain, numChains is decided by the round - // of last block we seen on compaction chain. - for chainID, b := range compactionTips { - if len(con.blocks[chainID]) > 0 { - if !b.Position.Older(&con.blocks[chainID][0].Position) { - overlapCount++ - } - } + if len(con.blocks) == 0 || len(blocks) == 0 { + return } - synced = overlapCount == numChains + synced = !blocks[len(blocks)-1].Position.Older(con.blocks[0].Position) return } -// ensureAgreementOverlapRound ensures the oldest blocks in each chain in -// con.blocks are all in the same round, for avoiding config change while -// syncing. -func (con *Consensus) ensureAgreementOverlapRound() bool { +func (con *Consensus) buildAllEmptyBlocks() { con.lock.Lock() defer con.lock.Unlock() - defer func() { - con.logger.Debug("ensureAgreementOverlapRound returned", - "round", con.agreementRoundCut) - }() - if con.agreementRoundCut > 0 { - return true - } // Clean empty blocks on tips of chains. - for idx, bs := range con.blocks { - for len(bs) > 0 && con.isEmptyBlock(bs[0]) { - bs = bs[1:] - } - con.blocks[idx] = bs + for len(con.blocks) > 0 && con.isEmptyBlock(con.blocks[0]) { + con.blocks = con.blocks[1:] } // Build empty blocks. - for _, bs := range con.blocks { - for i := range bs { - if con.isEmptyBlock(bs[i]) { - if bs[i-1].Position.Height == bs[i].Position.Height-1 { - con.buildEmptyBlock(bs[i], bs[i-1]) - } + for i, b := range con.blocks { + if con.isEmptyBlock(b) { + if con.blocks[i-1].Position.Height+1 == b.Position.Height { + con.buildEmptyBlock(b, con.blocks[i-1]) } } } - var tipRoundMap map[uint64]uint32 - for { - tipRoundMap = make(map[uint64]uint32) - for _, bs := range con.blocks { - if len(bs) > 0 { - tipRoundMap[bs[0].Position.Round]++ - } - } - if len(tipRoundMap) <= 1 { - break - } - // Make all tips in same round. - var maxRound uint64 - for r := range tipRoundMap { - if r > maxRound { - maxRound = r - } - } - for idx, bs := range con.blocks { - for len(bs) > 0 && bs[0].Position.Round < maxRound { - bs = bs[1:] - } - con.blocks[idx] = bs - } - } - if len(tipRoundMap) == 1 { - var r uint64 - for r = range tipRoundMap { - break - } - con.logger.Debug("check agreement round cut", - "tip-round", r, - "configs", len(con.configs)) - if tipRoundMap[r] == con.configs[r].NumChains { - con.agreementRoundCut = r - return true - } - } - return false -} - -func (con *Consensus) findLatticeSyncBlock( - blocks []*types.Block) (*types.Block, error) { - lastBlock := blocks[len(blocks)-1] - round := lastBlock.Position.Round - isConfigChanged := func(prev, cur *types.Config) bool { - return prev.K != cur.K || - prev.NumChains != cur.NumChains || - prev.PhiRatio != cur.PhiRatio - } - for { - // Find round r which r-1, r, r+1 are all in same total ordering config. - for { - sameAsPrevRound := round == 0 || !isConfigChanged( - con.configs[round-1], con.configs[round]) - sameAsNextRound := !isConfigChanged( - con.configs[round], con.configs[round+1]) - if sameAsPrevRound && sameAsNextRound { - break - } - if round == 0 { - // Unable to find a safe round, wait for new rounds. - return nil, nil - } - round-- - } - // Find the newset block which round is "round". - for lastBlock.Position.Round != round { - if (lastBlock.Finalization.ParentHash == common.Hash{}) { - return nil, ErrGenesisBlockReached - } - b, err := con.db.GetBlock(lastBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - lastBlock = &b - } - // Find the deliver set by hash for two times. Blocks in a deliver set - // returned by total ordering is sorted by hash. If a block's parent - // hash is greater than its hash means there is a cut between deliver - // sets. - var curBlock, prevBlock *types.Block - var deliverSetFirstBlock, deliverSetLastBlock *types.Block - curBlock = lastBlock - for { - if (curBlock.Finalization.ParentHash == common.Hash{}) { - return nil, ErrGenesisBlockReached - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - prevBlock = &b - if !prevBlock.Hash.Less(curBlock.Hash) { - break - } - curBlock = prevBlock - } - deliverSetLastBlock = prevBlock - curBlock = prevBlock - for { - if (curBlock.Finalization.ParentHash == common.Hash{}) { - break - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - prevBlock = &b - if !prevBlock.Hash.Less(curBlock.Hash) { - break - } - curBlock = prevBlock - } - deliverSetFirstBlock = curBlock - // Check if all blocks from deliverSetFirstBlock to deliverSetLastBlock - // are in the same round. - ok := true - curBlock = deliverSetLastBlock - for { - if curBlock.Position.Round != round { - ok = false - break - } - b, err := con.db.GetBlock(curBlock.Finalization.ParentHash) - if err != nil { - return nil, err - } - curBlock = &b - if curBlock.Hash == deliverSetFirstBlock.Hash { - break - } - } - if ok { - return deliverSetFirstBlock, nil - } - if round == 0 { - return nil, nil - } - round-- - } -} - -func (con *Consensus) processFinalizedBlock(block *types.Block) error { - if con.lattice == nil { - return nil - } - delivered, err := con.lattice.ProcessFinalizedBlock(block) - if err != nil { - return err - } - con.lock.Lock() - defer con.lock.Unlock() - con.finalizedBlockHashes = append(con.finalizedBlockHashes, block.Hash) - for idx, b := range delivered { - if con.finalizedBlockHashes[idx] != b.Hash { - return ErrMismatchBlockHashSequence - } - con.validatedChains[b.Position.ChainID] = struct{}{} - } - con.finalizedBlockHashes = con.finalizedBlockHashes[len(delivered):] - return nil } // SyncBlocks syncs blocks from compaction chain, latest is true if the caller @@ -420,7 +188,8 @@ func (con *Consensus) SyncBlocks( con.logger.Debug("SyncBlocks returned", "synced", synced, "error", err, - "last-block", con.syncedLastBlock) + "last-block", con.syncedLastBlock, + ) }() if con.syncedLastBlock != nil { synced, err = true, ErrAlreadySynced @@ -442,7 +211,8 @@ func (con *Consensus) SyncBlocks( if blocks[0].Finalization.Height != tipHeight+1 { con.logger.Error("mismatched finalization height", "now", blocks[0].Finalization.Height, - "expected", tipHeight+1) + "expected", tipHeight+1, + ) err = ErrInvalidSyncingFinalizationHeight return } @@ -454,7 +224,6 @@ func (con *Consensus) SyncBlocks( ) con.setupConfigs(blocks) for _, b := range blocks { - // TODO(haoping) remove this if lattice puts blocks into db. if err = con.db.PutBlock(*b); err != nil { // A block might be put into db when confirmed by BA, but not // finalized yet. @@ -469,60 +238,15 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } - if err = con.processFinalizedBlock(b); err != nil { - return - } - } - if latest && con.lattice == nil { - // New Lattice and find the deliver set of total ordering when "latest" - // is true for first time. Deliver set is found by block hashes. - var syncBlock *types.Block - syncBlock, err = con.findLatticeSyncBlock(blocks) - if err != nil { - if err == ErrGenesisBlockReached { - con.logger.Debug("SyncBlocks skip error", "error", err) - err = nil - } - return - } - if syncBlock != nil { - con.logger.Debug("deliver set found", "block", syncBlock) - // New lattice with the round of syncBlock. - con.initConsensusObj(syncBlock) - con.setupConfigs(blocks) - // Process blocks from syncBlock to blocks' last block. - b := blocks[len(blocks)-1] - blocksCount := - b.Finalization.Height - syncBlock.Finalization.Height + 1 - blocksToProcess := make([]*types.Block, blocksCount) - for { - blocksToProcess[blocksCount-1] = b - blocksCount-- - if b.Hash == syncBlock.Hash { - break - } - var b1 types.Block - b1, err = con.db.GetBlock(b.Finalization.ParentHash) - if err != nil { - return - } - b = &b1 - } - for _, b := range blocksToProcess { - if err = con.processFinalizedBlock(b); err != nil { - return - } - } - } } - if latest && con.ensureAgreementOverlapRound() { + if latest { + con.assureBuffering() + con.buildAllEmptyBlocks() // Check if compaction and agreements' blocks are overlapped. The // overlapping of compaction chain and BA's oldest blocks means the // syncing is done. - if con.checkIfValidated() && con.checkIfSynced(blocks) { - if err = con.Stop(); err != nil { - return - } + if con.checkIfSynced(blocks) { + con.stopBuffering() con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( context.Background(), con.network.ReceiveChan(), func(msg interface{}) { @@ -547,10 +271,6 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { } // flush all blocks in con.blocks into core.Consensus, and build // core.Consensus from syncer. - confirmedBlocks := make([][]*types.Block, len(con.blocks)) - for i, bs := range con.blocks { - confirmedBlocks[i] = []*types.Block(bs) - } randomnessResults := []*types.BlockRandomnessResult{} for _, r := range con.randomnessResults { randomnessResults = append(randomnessResults, r) @@ -566,19 +286,31 @@ func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { con.db, con.network, con.prv, - con.lattice, - confirmedBlocks, + con.blocks, randomnessResults, con.dummyMsgBuffer, con.logger) return con.syncedConsensus, err } -// Stop the syncer. +// stopBuffering stops the syncer buffering routines. // // This method is mainly for caller to stop the syncer before synced, the syncer // would call this method automatically after being synced. -func (con *Consensus) Stop() error { +func (con *Consensus) stopBuffering() { + if func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + return !con.duringBuffering + }() { + return + } + con.lock.Lock() + defer con.lock.Unlock() + if !con.duringBuffering { + return + } + con.duringBuffering = false con.logger.Trace("syncer is about to stop") // Stop network and CRS routines, wait until they are all stoped. con.ctxCancel() @@ -588,7 +320,7 @@ func (con *Consensus) Stop() error { con.logger.Trace("stop syncer agreement modules") con.stopAgreement() con.logger.Trace("syncer stopped") - return nil + return } // isEmptyBlock checks if a block is an empty block by both its hash and parent @@ -607,41 +339,6 @@ func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { b.Acks = common.NewSortedHashes(common.Hashes{parent.Hash}) } -func (con *Consensus) setupConfigsUntilRound(round uint64) { - curMaxNumChains := uint32(0) - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.logger.Debug("syncer setupConfigs", - "until-round", round, - "length", len(con.configs), - "lattice", con.latticeLastRound) - for r := uint64(len(con.configs)); r <= round; r++ { - cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) - con.configs = append(con.configs, cfg) - con.roundBeginTimes = append( - con.roundBeginTimes, - con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) - if cfg.NumChains >= curMaxNumChains { - curMaxNumChains = cfg.NumChains - } - } - // Notify core.Lattice for new configs. - if con.lattice != nil { - for con.latticeLastRound+1 <= round { - con.latticeLastRound++ - if err := con.lattice.AppendConfig( - con.latticeLastRound, - con.configs[con.latticeLastRound]); err != nil { - panic(err) - } - } - } - }() - con.resizeByNumChains(curMaxNumChains) - con.logger.Trace("setupConfgis finished", "round", round) -} - // setupConfigs is called by SyncBlocks with blocks from compaction chain. In // the first time, setupConfigs setups from round 0. func (con *Consensus) setupConfigs(blocks []*types.Block) { @@ -661,25 +358,19 @@ func (con *Consensus) setupConfigs(blocks []*types.Block) { con.setupConfigsUntilRound(maxRound + core.ConfigRoundShift - 1) } -// resizeByNumChains resizes fake lattice and agreement if numChains increases. -// Notice the decreasing case is neglected. -func (con *Consensus) resizeByNumChains(numChains uint32) { +func (con *Consensus) setupConfigsUntilRound(round uint64) { con.lock.Lock() defer con.lock.Unlock() - if numChains > uint32(len(con.blocks)) { - for i := uint32(len(con.blocks)); i < numChains; i++ { - // Resize the pool of blocks. - con.blocks = append(con.blocks, types.ByPosition{}) - // Resize agreement modules. - a := newAgreement( - con.receiveChan, con.pullChan, con.nodeSetCache, con.logger) - con.agreements = append(con.agreements, a) - con.agreementWaitGroup.Add(1) - go func() { - defer con.agreementWaitGroup.Done() - a.run() - }() - } + con.logger.Debug("syncer setupConfigs", + "until-round", round, + "length", len(con.configs), + ) + for r := uint64(len(con.configs)); r <= round; r++ { + cfg := utils.GetConfigWithPanic(con.gov, r, con.logger) + con.configs = append(con.configs, cfg) + con.roundBeginTimes = append( + con.roundBeginTimes, + con.roundBeginTimes[r-1].Add(con.configs[r-1].RoundInterval)) } } @@ -693,17 +384,15 @@ func (con *Consensus) startAgreement() { if !ok { return } - chainID := b.Position.ChainID func() { con.lock.Lock() defer con.lock.Unlock() - // If round is cut in agreements, do not add blocks with - // round less then cut round. - if b.Position.Round < con.agreementRoundCut { + if len(con.blocks) > 0 && + !b.Position.Newer(con.blocks[0].Position) { return } - con.blocks[chainID] = append(con.blocks[chainID], b) - sort.Sort(con.blocks[chainID]) + con.blocks = append(con.blocks, b) + sort.Sort(con.blocks) }() case h, ok := <-con.pullChan: if !ok { @@ -721,18 +410,14 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { return } // We only have to cache randomness result after cutting round. - if r.Position.Round < func() uint64 { - con.lock.RLock() - defer con.lock.RUnlock() - return con.agreementRoundCut - }() { - return - } - if func() (exists bool) { + if func() bool { con.lock.RLock() defer con.lock.RUnlock() - _, exists = con.randomnessResults[r.BlockHash] - return + if len(con.blocks) > 0 && r.Position.Older(con.blocks[0].Position) { + return true + } + _, exists := con.randomnessResults[r.BlockHash] + return exists }() { return } @@ -740,8 +425,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { if err != nil { con.logger.Error("Unable to get tsig verifier", "hash", r.BlockHash.String()[:6], - "position", &r.Position, - "error", err) + "position", r.Position, + "error", err, + ) return } if !ok { @@ -752,8 +438,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { Type: "bls", Signature: r.Randomness}) { con.logger.Info("Block randomness is not valid", - "position", &r.Position, - "hash", r.BlockHash.String()[:6]) + "position", r.Position, + "hash", r.BlockHash.String()[:6], + ) return } con.lock.Lock() @@ -785,18 +472,19 @@ func (con *Consensus) startNetwork() { if func() bool { con.lock.RLock() defer con.lock.RUnlock() - if pos.ChainID >= uint32(len(con.agreements)) { + if pos.ChainID > 0 { // This error might be easily encountered when the // "latest" parameter of SyncBlocks is turned on too // early. con.logger.Error( "Unknown chainID message received (syncer)", - "position", &pos) + "position", pos, + ) return false } return true }() { - con.agreements[pos.ChainID].inputChan <- val + con.agreementModule.inputChan <- val } case <-con.ctx.Done(): return @@ -817,23 +505,25 @@ func (con *Consensus) startCRSMonitor() { } con.logger.Debug("CRS is ready", "round", round) lastNotifiedRound = round - con.lock.Lock() - defer con.lock.Unlock() - for idx, a := range con.agreements { - loop: - for { - select { - case <-con.ctx.Done(): - break loop - case a.inputChan <- round: - break loop - case <-time.After(500 * time.Millisecond): - con.logger.Debug( - "agreement input channel is full when putting CRS", - "chainID", idx, - "round", round) - } + for func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + if !con.duringBuffering { + return false + } + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Debug( + "agreement input channel is full when putting CRS", + "round", round, + ) + return true } + }() { } } con.moduleWaitGroup.Add(1) @@ -860,16 +550,10 @@ func (con *Consensus) startCRSMonitor() { } func (con *Consensus) stopAgreement() { - func() { - con.lock.Lock() - defer con.lock.Unlock() - for _, a := range con.agreements { - if a.inputChan != nil { - close(a.inputChan) - a.inputChan = nil - } - } - }() + if con.agreementModule.inputChan != nil { + close(con.agreementModule.inputChan) + con.agreementModule.inputChan = nil + } con.agreementWaitGroup.Wait() close(con.receiveChan) close(con.pullChan) |