diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-11-27 10:13:05 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-12 17:27:19 +0800 |
commit | 4c520e06a04c0b734dcc703dd007bb89cfb15a9d (patch) | |
tree | 77b471cb9a9db4df532df82de300fb796576e950 /vendor/github.com | |
parent | 2b4a537e21e3c2d0e3f31dec6cea60e31125db66 (diff) | |
download | go-tangerine-4c520e06a04c0b734dcc703dd007bb89cfb15a9d.tar.gz go-tangerine-4c520e06a04c0b734dcc703dd007bb89cfb15a9d.tar.zst go-tangerine-4c520e06a04c0b734dcc703dd007bb89cfb15a9d.zip |
vendor: sync to latest core (#52)
Diffstat (limited to 'vendor/github.com')
8 files changed, 727 insertions, 309 deletions
diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go new file mode 100644 index 000000000..f695e36cc --- /dev/null +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement-mgr.go @@ -0,0 +1,437 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package core + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/dexon-foundation/dexon-consensus/common" + "github.com/dexon-foundation/dexon-consensus/core/types" + "github.com/dexon-foundation/dexon-consensus/core/utils" +) + +// Errors returned from BA modules +var ( + ErrPreviousRoundIsNotFinished = errors.New("previous round is not finished") +) + +// genValidLeader generate a validLeader function for agreement modules. +func genValidLeader( + mgr *agreementMgr) func(*types.Block) (bool, error) { + return func(block *types.Block) (bool, error) { + if block.Timestamp.After(time.Now()) { + return false, nil + } + if err := mgr.lattice.SanityCheck(block); err != nil { + if err == ErrRetrySanityCheckLater { + return false, nil + } + return false, err + } + mgr.logger.Debug("Calling Application.VerifyBlock", "block", block) + switch mgr.app.VerifyBlock(block) { + case types.VerifyInvalidBlock: + return false, ErrInvalidBlock + case types.VerifyRetryLater: + return false, nil + default: + } + return true, nil + } +} + +type agreementMgrConfig struct { + beginTime time.Time + numChains uint32 + roundInterval time.Duration + notarySetSize uint32 + lambdaBA time.Duration + crs common.Hash +} + +type baRoundSetting struct { + chainID uint32 + notarySet map[types.NodeID]struct{} + agr *agreement + recv *consensusBAReceiver + ticker Ticker + crs common.Hash +} + +type agreementMgr struct { + // TODO(mission): unbound Consensus instance from this module. + con *Consensus + ID types.NodeID + app Application + gov Governance + network Network + logger common.Logger + cache *utils.NodeSetCache + auth *Authenticator + lattice *Lattice + ctx context.Context + lastEndTime time.Time + configs []*agreementMgrConfig + baModules []*agreement + waitGroup sync.WaitGroup + pendingVotes map[uint64][]*types.Vote + pendingBlocks map[uint64][]*types.Block + + // This lock should be used when attempting to: + // - add a new baModule. + // - remove all baModules when stopping. In this case, the cleaner need + // to wait for all routines runnning baModules finished. + // - access a method of baModule. + // - append a config from new round. + // The routine running corresponding baModule, however, doesn't have to + // acquire this lock. + lock sync.RWMutex +} + +func newAgreementMgr(con *Consensus, dMoment time.Time) *agreementMgr { + return &agreementMgr{ + con: con, + ID: con.ID, + app: con.app, + gov: con.gov, + network: con.network, + logger: con.logger, + cache: con.nodeSetCache, + auth: con.authModule, + lattice: con.lattice, + ctx: con.ctx, + lastEndTime: dMoment, + } +} + +func (mgr *agreementMgr) appendConfig( + round uint64, config *types.Config, crs common.Hash) (err error) { + mgr.lock.Lock() + defer mgr.lock.Unlock() + // TODO(mission): initiate this module from some round > 0. + if round != uint64(len(mgr.configs)) { + return ErrRoundNotIncreasing + } + newConfig := &agreementMgrConfig{ + beginTime: mgr.lastEndTime, + numChains: config.NumChains, + roundInterval: config.RoundInterval, + notarySetSize: config.NotarySetSize, + lambdaBA: config.LambdaBA, + crs: crs, + } + mgr.configs = append(mgr.configs, newConfig) + mgr.lastEndTime = mgr.lastEndTime.Add(config.RoundInterval) + // Create baModule for newly added chain. + for i := uint32(len(mgr.baModules)); i < newConfig.numChains; i++ { + // Prepare modules. + recv := &consensusBAReceiver{ + consensus: mgr.con, + chainID: i, + restartNotary: make(chan bool, 1), + } + agrModule := newAgreement( + mgr.con.ID, + recv, + newLeaderSelector(genValidLeader(mgr), mgr.logger), + mgr.auth) + // Hacky way to make agreement module self contained. + recv.agreementModule = agrModule + mgr.baModules = append(mgr.baModules, agrModule) + go mgr.runBA(round, i) + } + return nil +} + +func (mgr *agreementMgr) processVote(v *types.Vote) error { + v = v.Clone() + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if v.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process vote for unknown chain to BA", + "position", &v.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + return mgr.baModules[v.Position.ChainID].processVote(v) +} + +func (mgr *agreementMgr) processBlock(b *types.Block) error { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if b.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process block for unknown chain to BA", + "position", &b.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + return mgr.baModules[b.Position.ChainID].processBlock(b) +} + +func (mgr *agreementMgr) processAgreementResult( + result *types.AgreementResult) error { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if result.Position.ChainID >= uint32(len(mgr.baModules)) { + mgr.logger.Error("Process unknown result for unknown chain to BA", + "position", &result.Position, + "baChain", len(mgr.baModules), + "baRound", len(mgr.configs)) + return utils.ErrInvalidChainID + } + agreement := mgr.baModules[result.Position.ChainID] + aID := agreement.agreementID() + if isStop(aID) { + return nil + } + if result.Position.Newer(&aID) { + mgr.logger.Info("Syncing BA", "position", &result.Position) + nodes, err := mgr.cache.GetNodeSet(result.Position.Round) + if err != nil { + return err + } + mgr.logger.Debug("Calling Network.PullBlocks for syncing BA", + "hash", result.BlockHash) + mgr.network.PullBlocks(common.Hashes{result.BlockHash}) + mgr.logger.Debug("Calling Governance.CRS", "round", result.Position.Round) + crs := mgr.gov.CRS(result.Position.Round) + nIDs := nodes.GetSubSet( + int(mgr.gov.Configuration(result.Position.Round).NotarySetSize), + types.NewNotarySetTarget(crs, result.Position.ChainID)) + for _, vote := range result.Votes { + agreement.processVote(&vote) + } + agreement.restart(nIDs, result.Position, crs) + } + return nil +} + +func (mgr *agreementMgr) stop() { + // Stop all running agreement modules. + func() { + mgr.lock.Lock() + defer mgr.lock.Unlock() + for _, agr := range mgr.baModules { + agr.stop() + } + }() + // Block until all routines are done. + mgr.waitGroup.Wait() +} + +func (mgr *agreementMgr) runBA(initRound uint64, chainID uint32) { + mgr.waitGroup.Add(1) + defer mgr.waitGroup.Done() + // Acquire agreement module. + agr, recv := func() (*agreement, *consensusBAReceiver) { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + agr := mgr.baModules[chainID] + return agr, agr.data.recv.(*consensusBAReceiver) + }() + // These are round based variables. + var ( + currentRound uint64 + nextRound = initRound + setting = baRoundSetting{ + chainID: chainID, + agr: agr, + recv: recv, + } + roundBeginTime time.Time + roundEndTime time.Time + tickDuration time.Duration + ) + + // Check if this routine needs to awake in this round and prepare essential + // variables when yes. + checkRound := func() (awake bool) { + defer func() { + currentRound = nextRound + nextRound++ + }() + // Wait until the configuartion for next round is ready. + var config *agreementMgrConfig + for { + config = func() *agreementMgrConfig { + mgr.lock.RLock() + defer mgr.lock.RUnlock() + if nextRound < uint64(len(mgr.configs)) { + return mgr.configs[nextRound] + } + return nil + }() + if config != nil { + break + } else { + mgr.logger.Info("round is not ready", "round", nextRound) + time.Sleep(1 * time.Second) + } + } + // Set next checkpoint. + roundBeginTime = config.beginTime + roundEndTime = config.beginTime.Add(config.roundInterval) + // Check if this chain handled by this routine included in this round. + if chainID >= config.numChains { + return false + } + // Check if this node in notary set of this chain in this round. + nodeSet, err := mgr.cache.GetNodeSet(nextRound) + if err != nil { + panic(err) + } + setting.crs = config.crs + setting.notarySet = nodeSet.GetSubSet( + int(config.notarySetSize), + types.NewNotarySetTarget(config.crs, chainID)) + _, awake = setting.notarySet[mgr.ID] + // Setup ticker + if tickDuration != config.lambdaBA { + if setting.ticker != nil { + setting.ticker.Stop() + } + setting.ticker = newTicker(mgr.gov, nextRound, TickerBA) + tickDuration = config.lambdaBA + } + return + } +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + now := time.Now().UTC() + if !checkRound() { + if now.After(roundEndTime) { + // That round is passed. + continue Loop + } + // Sleep until next checkpoint. + select { + case <-mgr.ctx.Done(): + break Loop + case <-time.After(roundEndTime.Sub(now)): + continue Loop + } + } + // Sleep until round begin. Here a biased round begin time would be + // used instead of the one in config. The reason it to disperse the load + // of fullnodes to verify confirmed blocks from each chain. + if now.Before(pickBiasedTime(roundBeginTime, 4*tickDuration)) { + select { + case <-mgr.ctx.Done(): + break Loop + case <-time.After(roundBeginTime.Sub(now)): + } + // Clean the tick channel after awake: the tick would be queued in + // channel, thus the first few ticks would not tick on expected + // interval. + <-setting.ticker.Tick() + <-setting.ticker.Tick() + } + // Run BA for this round. + recv.round = currentRound + recv.changeNotaryTime = roundEndTime + recv.restartNotary <- false + if err := mgr.baRoutineForOneRound(&setting); err != nil { + mgr.logger.Error("BA routine failed", + "error", err, + "nodeID", mgr.ID, + "chain", chainID) + break Loop + } + } +} + +func (mgr *agreementMgr) baRoutineForOneRound( + setting *baRoundSetting) (err error) { + agr := setting.agr + recv := setting.recv +Loop: + for { + select { + case <-mgr.ctx.Done(): + break Loop + default: + } + select { + case newNotary := <-recv.restartNotary: + if newNotary { + // This round is finished. + break Loop + } + oldPos := agr.agreementID() + var nextHeight uint64 + for { + nextHeight, err = mgr.lattice.NextHeight(recv.round, setting.chainID) + if err != nil { + panic(err) + } + if isStop(oldPos) || nextHeight == 0 { + break + } + if nextHeight > oldPos.Height { + break + } + time.Sleep(100 * time.Millisecond) + mgr.logger.Debug("Lattice not ready!!!", + "old", &oldPos, "next", nextHeight) + } + nextPos := types.Position{ + Round: recv.round, + ChainID: setting.chainID, + Height: nextHeight, + } + agr.restart(setting.notarySet, nextPos, setting.crs) + default: + } + if agr.pullVotes() { + pos := agr.agreementID() + mgr.logger.Debug("Calling Network.PullVotes for syncing votes", + "position", &pos) + mgr.network.PullVotes(pos) + } + if err = agr.nextState(); err != nil { + mgr.logger.Error("Failed to proceed to next state", + "nodeID", mgr.ID.String(), + "error", err) + break Loop + } + for i := 0; i < agr.clocks(); i++ { + // Priority select for agreement.done(). + select { + case <-agr.done(): + continue Loop + default: + } + select { + case <-agr.done(): + continue Loop + case <-setting.ticker.Tick(): + } + } + } + return nil +} diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go index ff1c71a7c..4fb0deaa8 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/agreement.go @@ -311,12 +311,13 @@ func (a *agreement) processVote(vote *types.Vote) error { return err } aID := a.agreementID() + // Agreement module has stopped. + if isStop(aID) { + return nil + } if vote.Position != aID { - // Agreement module has stopped. - if !isStop(aID) { - if aID.Newer(&vote.Position) { - return nil - } + if aID.Newer(&vote.Position) { + return nil } a.pendingVote = append(a.pendingVote, pendingVote{ vote: vote, diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go index f6bc0149d..20a7bdd4a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/compaction-chain.go @@ -21,6 +21,7 @@ import ( "container/heap" "fmt" "sync" + "time" "github.com/dexon-foundation/dexon-consensus/common" "github.com/dexon-foundation/dexon-consensus/core/crypto" @@ -39,6 +40,13 @@ var ( "incorrect block randomness result") ) +const maxPendingPeriod = 3 * time.Second + +type pendingRandomnessResult struct { + receivedTime time.Time + randResult *types.BlockRandomnessResult +} + type finalizedBlockHeap = types.ByFinalizationHeight type compactionChain struct { @@ -47,6 +55,7 @@ type compactionChain struct { tsigVerifier *TSigVerifierCache blocks map[common.Hash]*types.Block blockRandomness map[common.Hash][]byte + pendingRandomness map[common.Hash]pendingRandomnessResult pendingBlocks []*types.Block pendingFinalizedBlocks *finalizedBlockHeap lock sync.RWMutex @@ -61,6 +70,7 @@ func newCompactionChain(gov Governance) *compactionChain { tsigVerifier: NewTSigVerifierCache(gov, 7), blocks: make(map[common.Hash]*types.Block), blockRandomness: make(map[common.Hash][]byte), + pendingRandomness: make(map[common.Hash]pendingRandomnessResult), pendingFinalizedBlocks: pendingFinalizedBlocks, } } @@ -83,6 +93,10 @@ func (cc *compactionChain) registerBlock(block *types.Block) { cc.lock.Lock() defer cc.lock.Unlock() cc.blocks[block.Hash] = block + if rand, exist := cc.pendingRandomness[block.Hash]; exist { + cc.blockRandomness[rand.randResult.BlockHash] = rand.randResult.Randomness + delete(cc.pendingRandomness, block.Hash) + } } func (cc *compactionChain) blockRegistered(hash common.Hash) bool { @@ -286,13 +300,6 @@ func (cc *compactionChain) extractFinalizedBlocks() []*types.Block { func (cc *compactionChain) processBlockRandomnessResult( rand *types.BlockRandomnessResult) error { - cc.lock.Lock() - defer cc.lock.Unlock() - if !cc.blockRegisteredNoLock(rand.BlockHash) { - // If the randomness result is discarded here, it'll later be processed by - //finalized block - return ErrBlockNotRegistered - } ok, err := cc.verifyRandomness( rand.BlockHash, rand.Position.Round, rand.Randomness) if err != nil { @@ -301,10 +308,29 @@ func (cc *compactionChain) processBlockRandomnessResult( if !ok { return ErrIncorrectBlockRandomnessResult } + cc.lock.Lock() + defer cc.lock.Unlock() + if !cc.blockRegisteredNoLock(rand.BlockHash) { + cc.purgePending() + cc.pendingRandomness[rand.BlockHash] = pendingRandomnessResult{ + receivedTime: time.Now(), + randResult: rand, + } + return ErrBlockNotRegistered + } cc.blockRandomness[rand.BlockHash] = rand.Randomness return nil } +func (cc *compactionChain) purgePending() { + now := time.Now() + for key, rand := range cc.pendingRandomness { + if now.After(rand.receivedTime.Add(maxPendingPeriod)) { + delete(cc.pendingRandomness, key) + } + } +} + func (cc *compactionChain) lastBlock() *types.Block { cc.lock.RLock() defer cc.lock.RUnlock() diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go index e09ee2579..af4041766 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go @@ -56,6 +56,8 @@ var ( "incorrect vote position") ErrIncorrectVoteProposer = fmt.Errorf( "incorrect vote proposer") + ErrCRSNotReady = fmt.Errorf( + "CRS not ready") ) // consensusBAReceiver implements agreementReceiver. @@ -103,20 +105,20 @@ func (recv *consensusBAReceiver) ProposeBlock() common.Hash { func (recv *consensusBAReceiver) ConfirmBlock( hash common.Hash, votes map[types.NodeID]*types.Vote) { var block *types.Block - if (hash == common.Hash{}) { + isEmptyBlockConfirmed := hash == common.Hash{} + if isEmptyBlockConfirmed { aID := recv.agreementModule.agreementID() recv.consensus.logger.Info("Empty block is confirmed", "position", &aID) var err error - block, err = recv.consensus.proposeEmptyBlock(recv.chainID) + block, err = recv.consensus.proposeEmptyBlock(recv.round, recv.chainID) if err != nil { recv.consensus.logger.Error("Propose empty block failed", "error", err) return } } else { var exist bool - block, exist = recv.consensus.baModules[recv.chainID]. - findCandidateBlockNoLock(hash) + block, exist = recv.agreementModule.findCandidateBlockNoLock(hash) if !exist { recv.consensus.logger.Error("Unknown block confirmed", "hash", hash, @@ -142,6 +144,54 @@ func (recv *consensusBAReceiver) ConfirmBlock( } } recv.consensus.ccModule.registerBlock(block) + if block.Position.Height != 0 && + !recv.consensus.lattice.Exist(block.ParentHash) { + go func(hash common.Hash) { + parentHash := hash + for { + recv.consensus.logger.Warn("Parent block not confirmed", + "hash", parentHash, + "chainID", recv.chainID) + ch := make(chan *types.Block) + if !func() bool { + recv.consensus.lock.Lock() + defer recv.consensus.lock.Unlock() + if _, exist := recv.consensus.baConfirmedBlock[parentHash]; exist { + return false + } + recv.consensus.baConfirmedBlock[parentHash] = ch + return true + }() { + return + } + var block *types.Block + PullBlockLoop: + for { + recv.consensus.logger.Debug("Calling Network.PullBlock for parent", + "hash", parentHash) + recv.consensus.network.PullBlocks(common.Hashes{parentHash}) + select { + case block = <-ch: + break PullBlockLoop + case <-time.After(1 * time.Second): + } + } + recv.consensus.logger.Info("Receive parent block", + "hash", block.ParentHash, + "chainID", recv.chainID) + recv.consensus.ccModule.registerBlock(block) + if err := recv.consensus.processBlock(block); err != nil { + recv.consensus.logger.Error("Failed to process block", "error", err) + return + } + parentHash = block.ParentHash + if block.Position.Height == 0 || + recv.consensus.lattice.Exist(parentHash) { + return + } + } + }(block.ParentHash) + } voteList := make([]types.Vote, 0, len(votes)) for _, vote := range votes { if vote.BlockHash != hash { @@ -150,11 +200,12 @@ func (recv *consensusBAReceiver) ConfirmBlock( voteList = append(voteList, *vote) } result := &types.AgreementResult{ - BlockHash: block.Hash, - Position: block.Position, - Votes: voteList, + BlockHash: block.Hash, + Position: block.Position, + Votes: voteList, + IsEmptyBlock: isEmptyBlockConfirmed, } - recv.consensus.logger.Debug("Calling Network.BroadcastAgreementResult", + recv.consensus.logger.Debug("Propose AgreementResult", "result", result) recv.consensus.network.BroadcastAgreementResult(result) if err := recv.consensus.processBlock(block); err != nil { @@ -273,8 +324,7 @@ type Consensus struct { authModule *Authenticator // BA. - baModules []*agreement - receivers []*consensusBAReceiver + baMgr *agreementMgr baConfirmedBlock map[common.Hash]chan<- *types.Block // DKG. @@ -365,49 +415,8 @@ func NewConsensus( event: common.NewEvent(), logger: logger, } - - validLeader := func(block *types.Block) (bool, error) { - if block.Timestamp.After(time.Now()) { - return false, nil - } - if err := lattice.SanityCheck(block); err != nil { - if err == ErrRetrySanityCheckLater { - return false, nil - } - return false, err - } - logger.Debug("Calling Application.VerifyBlock", "block", block) - switch app.VerifyBlock(block) { - case types.VerifyInvalidBlock: - return false, ErrInvalidBlock - case types.VerifyRetryLater: - return false, nil - default: - } - return true, nil - } - - con.baModules = make([]*agreement, config.NumChains) - con.receivers = make([]*consensusBAReceiver, config.NumChains) - for i := uint32(0); i < config.NumChains; i++ { - chainID := i - recv := &consensusBAReceiver{ - consensus: con, - chainID: chainID, - restartNotary: make(chan bool, 1), - } - agreementModule := newAgreement( - con.ID, - recv, - newLeaderSelector(validLeader, logger), - con.authModule, - ) - // Hacky way to make agreement module self contained. - recv.agreementModule = agreementModule - recv.changeNotaryTime = dMoment - con.baModules[chainID] = agreementModule - con.receivers[chainID] = recv - } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + con.baMgr = newAgreementMgr(con, dMoment) return con } @@ -420,14 +429,27 @@ func (con *Consensus) Run(initBlock *types.Block) { con.logger.Debug("Calling Governance.Configuration", "round", initRound) initConfig := con.gov.Configuration(initRound) // Setup context. - con.ctx, con.ctxCancel = context.WithCancel(context.Background()) con.ccModule.init(initBlock) // TODO(jimmy-dexon): change AppendConfig to add config for specific round. - for i := uint64(0); i <= initRound; i++ { - con.logger.Debug("Calling Governance.Configuration", "round", i+1) - cfg := con.gov.Configuration(i + 1) - if err := con.lattice.AppendConfig(i+1, cfg); err != nil { - panic(err) + for i := uint64(0); i <= initRound+1; i++ { + con.logger.Debug("Calling Governance.Configuration", "round", i) + cfg := con.gov.Configuration(i) + // 0 round is already given to core.Lattice module when constructing. + if i > 0 { + if err := con.lattice.AppendConfig(i, cfg); err != nil { + panic(err) + } + } + // Corresponding CRS might not be ready for next round to initRound. + if i < initRound+1 { + con.logger.Debug("Calling Governance.CRS", "round", i) + crs := con.gov.CRS(i) + if (crs == common.Hash{}) { + panic(ErrCRSNotReady) + } + if err := con.baMgr.appendConfig(i, cfg, crs); err != nil { + panic(err) + } } } dkgSet, err := con.nodeSetCache.GetDKGSet(initRound) @@ -447,103 +469,9 @@ func (con *Consensus) Run(initBlock *types.Block) { }) } con.initialRound(con.dMoment, initRound, initConfig) - ticks := make([]chan struct{}, 0, initConfig.NumChains) - for i := uint32(0); i < initConfig.NumChains; i++ { - tick := make(chan struct{}) - ticks = append(ticks, tick) - // TODO(jimmy-dexon): this is a temporary solution to offset BA time. - // The complelete solution should be delivered along with config change. - offset := time.Duration(i*uint32(4)/initConfig.NumChains) * - initConfig.LambdaBA - go func(chainID uint32, offset time.Duration) { - time.Sleep(offset) - con.runBA(chainID, tick) - }(i, offset) - } - - // Reset ticker. - <-con.tickerObj.Tick() - <-con.tickerObj.Tick() - for { - <-con.tickerObj.Tick() - for _, tick := range ticks { - select { - case tick <- struct{}{}: - default: - } - } - } -} - -func (con *Consensus) runBA(chainID uint32, tick <-chan struct{}) { - // TODO(jimmy-dexon): move this function inside agreement. - agreement := con.baModules[chainID] - recv := con.receivers[chainID] - recv.restartNotary <- true - nIDs := make(map[types.NodeID]struct{}) - crs := common.Hash{} - // Reset ticker - <-tick -BALoop: - for { - select { - case <-con.ctx.Done(): - break BALoop - default: - } - select { - case newNotary := <-recv.restartNotary: - if newNotary { - con.logger.Debug("Calling Governance.CRS", "round", recv.round) - crs = con.gov.CRS(recv.round) - if (crs == common.Hash{}) { - // Governance is out-of-sync. - continue BALoop - } - configForNewRound := con.gov.Configuration(recv.round) - recv.changeNotaryTime = - recv.changeNotaryTime.Add(configForNewRound.RoundInterval) - nodes, err := con.nodeSetCache.GetNodeSet(recv.round) - if err != nil { - panic(err) - } - con.logger.Debug("Calling Governance.Configuration", - "round", recv.round) - nIDs = nodes.GetSubSet( - int(configForNewRound.NotarySetSize), - types.NewNotarySetTarget(crs, chainID)) - } - nextPos := con.lattice.NextPosition(chainID) - nextPos.Round = recv.round - agreement.restart(nIDs, nextPos, crs) - default: - } - if agreement.pullVotes() { - pos := agreement.agreementID() - con.logger.Debug("Calling Network.PullVotes for syncing votes", - "position", &pos) - con.network.PullVotes(pos) - } - err := agreement.nextState() - if err != nil { - con.logger.Error("Failed to proceed to next state", - "nodeID", con.ID.String(), - "error", err) - break BALoop - } - for i := 0; i < agreement.clocks(); i++ { - // Priority select for agreement.done(). - select { - case <-agreement.done(): - continue BALoop - default: - } - select { - case <-agreement.done(): - continue BALoop - case <-tick: - } - } + // Block until done. + select { + case <-con.ctx.Done(): } } @@ -622,6 +550,7 @@ func (con *Consensus) initialRound( con.logger.Error("Error getting DKG set", "round", round, "error", err) curDkgSet = make(map[types.NodeID]struct{}) } + // Initiate CRS routine. if _, exist := curDkgSet[con.ID]; exist { con.event.RegisterTime(startTime.Add(config.RoundInterval/2), func(time.Time) { @@ -630,7 +559,31 @@ func (con *Consensus) initialRound( }() }) } - + // Initiate BA modules. + con.event.RegisterTime( + startTime.Add(config.RoundInterval/2+config.LambdaDKG), + func(time.Time) { + go func(nextRound uint64) { + for (con.gov.CRS(nextRound) == common.Hash{}) { + con.logger.Info("CRS is not ready yet. Try again later...", + "nodeID", con.ID, + "round", nextRound) + time.Sleep(500 * time.Millisecond) + } + // Notify BA for new round. + con.logger.Debug("Calling Governance.Configuration", + "round", nextRound) + nextConfig := con.gov.Configuration(nextRound) + con.logger.Debug("Calling Governance.CRS", + "round", nextRound) + nextCRS := con.gov.CRS(nextRound) + if err := con.baMgr.appendConfig( + nextRound, nextConfig, nextCRS); err != nil { + panic(err) + } + }(round + 1) + }) + // Initiate DKG for this round. con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG), func(time.Time) { go func(nextRound uint64) { @@ -670,6 +623,7 @@ func (con *Consensus) initialRound( }) }(round + 1) }) + // Prepare lattice module for next round and next "initialRound" routine. con.event.RegisterTime(startTime.Add(config.RoundInterval), func(time.Time) { // Change round. @@ -685,9 +639,7 @@ func (con *Consensus) initialRound( // Stop the Consensus core. func (con *Consensus) Stop() { - for _, a := range con.baModules { - a.stop() - } + con.baMgr.stop() con.event.Reset() con.ctxCancel() } @@ -785,9 +737,10 @@ func (con *Consensus) proposeBlock(chainID uint32, round uint64) *types.Block { } func (con *Consensus) proposeEmptyBlock( - chainID uint32) (*types.Block, error) { + round uint64, chainID uint32) (*types.Block, error) { block := &types.Block{ Position: types.Position{ + Round: round, ChainID: chainID, }, } @@ -799,15 +752,9 @@ func (con *Consensus) proposeEmptyBlock( // ProcessVote is the entry point to submit ont vote to a Consensus instance. func (con *Consensus) ProcessVote(vote *types.Vote) (err error) { - if vote.Position.ChainID >= uint32(len(con.baModules)) { - return nil - } - if isStop(con.baModules[vote.Position.ChainID].agreementID()) { - return nil - } v := vote.Clone() - err = con.baModules[v.Position.ChainID].processVote(v) - return err + err = con.baMgr.processVote(v) + return } // ProcessAgreementResult processes the randomness request. @@ -826,8 +773,14 @@ func (con *Consensus) ProcessAgreementResult( return ErrIncorrectVoteProposer } for _, vote := range rand.Votes { - if vote.BlockHash != rand.BlockHash { - return ErrIncorrectVoteBlockHash + if rand.IsEmptyBlock { + if (vote.BlockHash != common.Hash{}) { + return ErrIncorrectVoteBlockHash + } + } else { + if vote.BlockHash != rand.BlockHash { + return ErrIncorrectVoteBlockHash + } } if vote.Type != types.VoteCom { return ErrIncorrectVoteType @@ -847,29 +800,8 @@ func (con *Consensus) ProcessAgreementResult( } } // Syncing BA Module. - agreement := con.baModules[rand.Position.ChainID] - aID := agreement.agreementID() - if isStop(aID) { - return nil - } - if rand.Position.Newer(&aID) { - con.logger.Info("Syncing BA", "position", &rand.Position) - nodes, err := con.nodeSetCache.GetNodeSet(rand.Position.Round) - if err != nil { - return err - } - con.logger.Debug("Calling Network.PullBlocks for syncing BA", - "hash", rand.BlockHash) - con.network.PullBlocks(common.Hashes{rand.BlockHash}) - con.logger.Debug("Calling Governance.CRS", "round", rand.Position.Round) - crs := con.gov.CRS(rand.Position.Round) - nIDs := nodes.GetSubSet( - int(con.gov.Configuration(rand.Position.Round).NotarySetSize), - types.NewNotarySetTarget(crs, rand.Position.ChainID)) - for _, vote := range rand.Votes { - agreement.processVote(&vote) - } - agreement.restart(nIDs, rand.Position, crs) + if err := con.baMgr.processAgreementResult(rand); err != nil { + return err } // Calculating randomness. if rand.Position.Round == 0 { @@ -882,7 +814,8 @@ func (con *Consensus) ProcessAgreementResult( if !con.cfgModule.touchTSigHash(rand.BlockHash) { return nil } - con.logger.Debug("Calling Network.BroadcastAgreementResult", "result", rand) + con.logger.Debug("Rebroadcast AgreementResult", + "result", rand) con.network.BroadcastAgreementResult(rand) dkgSet, err := con.nodeSetCache.GetDKGSet(rand.Position.Round) if err != nil { @@ -950,9 +883,7 @@ func (con *Consensus) ProcessBlockRandomnessResult( // preProcessBlock performs Byzantine Agreement on the block. func (con *Consensus) preProcessBlock(b *types.Block) (err error) { - if err = con.baModules[b.Position.ChainID].processBlock(b); err != nil { - return err - } + err = con.baMgr.processBlock(b) return } @@ -1010,6 +941,8 @@ func (con *Consensus) processBlock(block *types.Block) (err error) { go con.event.NotifyTime(b.Finalization.Timestamp) } deliveredBlocks = con.ccModule.extractBlocks() + con.logger.Debug("Last block in compaction chain", + "block", con.ccModule.lastBlock()) for _, b := range deliveredBlocks { if err = con.db.Update(*b); err != nil { panic(err) @@ -1064,9 +997,15 @@ func (con *Consensus) prepareBlock(b *types.Block, if err = con.lattice.PrepareBlock(b, proposeTime); err != nil { return } - con.logger.Debug("Calling Governance.CRS", "round", 0) - if err = - con.authModule.SignCRS(b, con.gov.CRS(b.Position.Round)); err != nil { + con.logger.Debug("Calling Governance.CRS", "round", b.Position.Round) + crs := con.gov.CRS(b.Position.Round) + if crs.Equal(common.Hash{}) { + con.logger.Error("CRS for round is not ready, unable to prepare block", + "position", &b.Position) + err = ErrCRSNotReady + return + } + if err = con.authModule.SignCRS(b, crs); err != nil { return } return diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go index 6fe810ac0..f1ab2de6a 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice-data.go @@ -362,6 +362,35 @@ func (data *latticeData) addFinalizedBlock(block *types.Block) (err error) { return } +// isBindTip checks if a block's fields should follow up its parent block. +func (data *latticeData) isBindTip( + pos types.Position, tip *types.Block) (bindTip bool, err error) { + if tip == nil { + return + } + if pos.Round < tip.Position.Round { + err = ErrInvalidRoundID + return + } + tipConfig := data.getConfig(tip.Position.Round) + if tip.Timestamp.After(tipConfig.roundEndTime) { + if pos.Round == tip.Position.Round { + err = ErrRoundNotSwitch + return + } + if pos.Round == tip.Position.Round+1 { + bindTip = true + } + } else { + if pos.Round != tip.Position.Round { + err = ErrInvalidRoundID + return + } + bindTip = true + } + return +} + // prepareBlock setups fields of a block based on its ChainID and Round, // including: // - Acks @@ -375,7 +404,6 @@ func (data *latticeData) prepareBlock(b *types.Block) error { config *latticeDataConfig acks common.Hashes bindTip bool - chainTip *types.Block ) if config = data.getConfig(b.Position.Round); config == nil { return ErrUnknownRoundID @@ -388,30 +416,16 @@ func (data *latticeData) prepareBlock(b *types.Block) error { b.Position.Height = 0 b.ParentHash = common.Hash{} // Decide valid timestamp range. - homeChain := data.chains[b.Position.ChainID] - if homeChain.tip != nil { - chainTip = homeChain.tip - if b.Position.Round < chainTip.Position.Round { - return ErrInvalidRoundID - } - chainTipConfig := data.getConfig(chainTip.Position.Round) - if chainTip.Timestamp.After(chainTipConfig.roundEndTime) { - if b.Position.Round == chainTip.Position.Round { - return ErrRoundNotSwitch - } - if b.Position.Round == chainTip.Position.Round+1 { - bindTip = true - } - } else { - if b.Position.Round != chainTip.Position.Round { - return ErrInvalidRoundID - } - bindTip = true - } + chainTip := data.chains[b.Position.ChainID].tip + if chainTip != nil { // TODO(mission): find a way to prevent us to assign a witness height // from Jurassic period. b.Witness.Height = chainTip.Witness.Height } + bindTip, err := data.isBindTip(b.Position, chainTip) + if err != nil { + return err + } // For blocks with continuous round ID, assign timestamp range based on // parent block and bound config. if bindTip { @@ -461,40 +475,48 @@ func (data *latticeData) prepareBlock(b *types.Block) error { // - ParentHash and Height from parent block. If there is no valid parent block // (ex. Newly added chain or bootstrap), these fields would be setup as // genesis block. -func (data *latticeData) prepareEmptyBlock(b *types.Block) { +func (data *latticeData) prepareEmptyBlock(b *types.Block) (err error) { // emptyBlock has no proposer. b.ProposerID = types.NodeID{} - var acks common.Hashes // Reset fields to make sure we got these information from parent block. b.Position.Height = 0 - b.Position.Round = 0 b.ParentHash = common.Hash{} b.Timestamp = time.Time{} // Decide valid timestamp range. - homeChain := data.chains[b.Position.ChainID] - if homeChain.tip != nil { - chainTip := homeChain.tip + config := data.getConfig(b.Position.Round) + chainTip := data.chains[b.Position.ChainID].tip + bindTip, err := data.isBindTip(b.Position, chainTip) + if err != nil { + return + } + if bindTip { b.ParentHash = chainTip.Hash - chainTipConfig := data.getConfig(chainTip.Position.Round) - if chainTip.Timestamp.After(chainTipConfig.roundEndTime) { - b.Position.Round = chainTip.Position.Round + 1 - } else { - b.Position.Round = chainTip.Position.Round - } b.Position.Height = chainTip.Position.Height + 1 - b.Timestamp = chainTip.Timestamp.Add(chainTipConfig.minBlockTimeInterval) + b.Timestamp = chainTip.Timestamp.Add(config.minBlockTimeInterval) b.Witness.Height = chainTip.Witness.Height b.Witness.Data = make([]byte, len(chainTip.Witness.Data)) copy(b.Witness.Data, chainTip.Witness.Data) - acks = append(acks, chainTip.Hash) + b.Acks = common.NewSortedHashes(common.Hashes{chainTip.Hash}) + } else { + b.Timestamp = config.roundBeginTime } - b.Acks = common.NewSortedHashes(acks) + return } // TODO(mission): make more abstraction for this method. // nextHeight returns the next height of a chain. -func (data *latticeData) nextPosition(chainID uint32) types.Position { - return data.chains[chainID].nextPosition() +func (data *latticeData) nextHeight( + round uint64, chainID uint32) (uint64, error) { + chainTip := data.chains[chainID].tip + bindTip, err := data.isBindTip( + types.Position{Round: round, ChainID: chainID}, chainTip) + if err != nil { + return 0, err + } + if bindTip { + return chainTip.Position.Height + 1, nil + } + return 0, nil } // findBlock seeks blocks in memory or db. @@ -609,21 +631,6 @@ func (s *chainStatus) addBlock(b *types.Block) { s.tip = b } -// TODO(mission): change back to nextHeight. -// nextPosition returns a valid position for new block in this chain. -func (s *chainStatus) nextPosition() types.Position { - if s.tip == nil { - return types.Position{ - ChainID: s.ID, - Height: 0, - } - } - return types.Position{ - ChainID: s.ID, - Height: s.tip.Position.Height + 1, - } -} - // purgeBlock purges a block from cache, make sure this block is already saved // in blockdb. func (s *chainStatus) purgeBlock(b *types.Block) error { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go index 7b66bd557..f76813d82 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/lattice.go @@ -41,7 +41,6 @@ type Lattice struct { pool blockPool retryAdd bool data *latticeData - toSyncer *totalOrderingSyncer toModule *totalOrdering ctModule *consensusTimestamp logger common.Logger @@ -65,7 +64,6 @@ func NewLattice( debug: debug, pool: newBlockPool(cfg.NumChains), data: newLatticeData(db, dMoment, round, cfg), - toSyncer: newTotalOrderingSyncer(cfg.NumChains), toModule: newTotalOrdering(dMoment, cfg), ctModule: newConsensusTimestamp(dMoment, round, cfg.NumChains), logger: logger, @@ -102,7 +100,9 @@ func (l *Lattice) PrepareBlock( func (l *Lattice) PrepareEmptyBlock(b *types.Block) (err error) { l.lock.RLock() defer l.lock.RUnlock() - l.data.prepareEmptyBlock(b) + if err = l.data.prepareEmptyBlock(b); err != nil { + return + } if b.Hash, err = hashBlock(b); err != nil { return } @@ -155,6 +155,16 @@ func (l *Lattice) SanityCheck(b *types.Block) (err error) { return } +// Exist checks if the block is known to lattice. +func (l *Lattice) Exist(hash common.Hash) bool { + l.lock.RLock() + defer l.lock.RUnlock() + if _, err := l.data.findBlock(hash); err != nil { + return false + } + return true +} + // addBlockToLattice adds a block into lattice, and delivers blocks with the // acks already delivered. // @@ -164,6 +174,8 @@ func (l *Lattice) addBlockToLattice( if tip := l.data.chains[input.Position.ChainID].tip; tip != nil { if !input.Position.Newer(&tip.Position) { + l.logger.Warn("Dropping block: older than tip", + "block", input, "tip", tip) return } } @@ -203,7 +215,7 @@ func (l *Lattice) addBlockToLattice( if l.debug != nil { l.debug.StronglyAcked(b.Hash) } - l.logger.Debug("Calling Application.BlockConfirmed", "block", input) + l.logger.Debug("Calling Application.BlockConfirmed", "block", b) l.app.BlockConfirmed(*b.Clone()) // Purge blocks in pool with the same chainID and lower height. l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) @@ -237,40 +249,37 @@ func (l *Lattice) ProcessBlock( return } - for _, blockToSyncer := range inLattice { - toTotalOrdering := l.toSyncer.processBlock(blockToSyncer) - // Perform total ordering for each block added to lattice. - for _, b = range toTotalOrdering { - toDelivered, deliveredMode, err = l.toModule.processBlock(b) - if err != nil { - // All errors from total ordering is serious, should panic. - panic(err) - } - if len(toDelivered) == 0 { - continue - } - hashes := make(common.Hashes, len(toDelivered)) - for idx := range toDelivered { - hashes[idx] = toDelivered[idx].Hash - } - if l.debug != nil { - l.debug.TotalOrderingDelivered(hashes, deliveredMode) - } - // Perform consensus timestamp module. - if err = l.ctModule.processBlocks(toDelivered); err != nil { - return - } - delivered = append(delivered, toDelivered...) + for _, b = range inLattice { + toDelivered, deliveredMode, err = l.toModule.processBlock(b) + if err != nil { + // All errors from total ordering is serious, should panic. + panic(err) + } + if len(toDelivered) == 0 { + continue + } + hashes := make(common.Hashes, len(toDelivered)) + for idx := range toDelivered { + hashes[idx] = toDelivered[idx].Hash + } + if l.debug != nil { + l.debug.TotalOrderingDelivered(hashes, deliveredMode) } + // Perform consensus timestamp module. + if err = l.ctModule.processBlocks(toDelivered); err != nil { + return + } + delivered = append(delivered, toDelivered...) } return } -// NextPosition returns expected position of incoming block for specified chain. -func (l *Lattice) NextPosition(chainID uint32) types.Position { +// NextHeight returns expected height of incoming block for specified chain and +// given round. +func (l *Lattice) NextHeight(round uint64, chainID uint32) (uint64, error) { l.lock.RLock() defer l.lock.RUnlock() - return l.data.nextPosition(chainID) + return l.data.nextHeight(round, chainID) } // PurgeBlocks purges blocks' cache in memory, this is called when the caller @@ -301,12 +310,4 @@ func (l *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { // ProcessFinalizedBlock is used for syncing lattice data. func (l *Lattice) ProcessFinalizedBlock(b *types.Block) { - defer func() { l.retryAdd = true }() - l.lock.Lock() - defer l.lock.Unlock() - if err := l.data.addFinalizedBlock(b); err != nil { - panic(err) - } - l.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) - l.toSyncer.processFinalizedBlock(b) } diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go index 6df245b08..1c64d4ad9 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/types/block-randomness.go @@ -25,9 +25,10 @@ import ( // AgreementResult describes an agremeent result. type AgreementResult struct { - BlockHash common.Hash `json:"block_hash"` - Position Position `json:"position"` - Votes []Vote `json:"votes"` + BlockHash common.Hash `json:"block_hash"` + Position Position `json:"position"` + Votes []Vote `json:"votes"` + IsEmptyBlock bool `json:"is_empty_block"` } func (r *AgreementResult) String() string { diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go index f6be46130..441aac174 100644 --- a/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go +++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/utils.go @@ -20,6 +20,7 @@ package core import ( "errors" "fmt" + "math/rand" "os" "sort" "time" @@ -60,7 +61,7 @@ func Debugf(format string, args ...interface{}) { // Debugln is like fmt.Println, but only output when we are in debug mode. func Debugln(args ...interface{}) { if debug { - fmt.Println(args) + fmt.Println(args...) } } @@ -114,6 +115,11 @@ func removeFromSortedUint32Slice(xs []uint32, x uint32) []uint32 { return append(xs[:indexToRemove], xs[indexToRemove+1:]...) } +// pickBiasedTime returns a biased time based on a given range. +func pickBiasedTime(base time.Time, biasedRange time.Duration) time.Time { + return base.Add(time.Duration(rand.Intn(int(biasedRange)))) +} + // HashConfigurationBlock returns the hash value of configuration block. func HashConfigurationBlock( notarySet map[types.NodeID]struct{}, |