From 2b2396b6bce0f21b515ac2d38556f6dca08b1770 Mon Sep 17 00:00:00 2001
From: Jimmy Hu
Date: Wed, 27 Feb 2019 10:41:01 +0800
Subject: core: sync to latest core (#214)

* vendor: sync to latest core

* fix for single chain
---
 .../dexon-consensus/core/consensus.go | 267 ++++++++++-----------
 1 file changed, 127 insertions(+), 140 deletions(-)

(limited to 'vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go')

diff --git a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
index f4c0a372d..370df72cf 100644
--- a/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
+++ b/vendor/github.com/dexon-foundation/dexon-consensus/core/consensus.go
@@ -58,12 +58,12 @@ var (
 // consensusBAReceiver implements agreementReceiver.
 type consensusBAReceiver struct {
 	// TODO(mission): consensus would be replaced by blockChain and network.
-	consensus        *Consensus
-	agreementModule  *agreement
-	changeNotaryTime time.Time
-	roundValue       *atomic.Value
-	isNotary         bool
-	restartNotary    chan types.Position
+	consensus          *Consensus
+	agreementModule    *agreement
+	changeNotaryHeight uint64
+	roundValue         *atomic.Value
+	isNotary           bool
+	restartNotary      chan types.Position
 }
 
 func (recv *consensusBAReceiver) round() uint64 {
@@ -241,10 +241,17 @@ CleanChannelLoop:
 		}
 	}
 	newPos := block.Position
-	if block.Timestamp.After(recv.changeNotaryTime) {
+	if block.Position.Height+1 == recv.changeNotaryHeight {
 		newPos.Round++
 		recv.roundValue.Store(newPos.Round)
 	}
+	currentRound := recv.round()
+	if block.Position.Height > recv.changeNotaryHeight &&
+		block.Position.Round <= currentRound {
+		panic(fmt.Errorf(
+			"round not switched when confirming: %s, %d, should switch at %d",
+			block, currentRound, recv.changeNotaryHeight))
+	}
 	recv.restartNotary <- newPos
 }
 
@@ -383,7 +390,6 @@ type Consensus struct {
 	bcModule             *blockChain
 	dMoment              time.Time
 	nodeSetCache         *utils.NodeSetCache
-	round                uint64
 	roundForNewConfig    uint64
 	lock                 sync.RWMutex
 	ctx                  context.Context
@@ -412,7 +418,7 @@ func NewConsensus(
 	prv crypto.PrivateKey,
 	logger common.Logger) *Consensus {
 	return newConsensusForRound(
-		nil, dMoment, app, gov, db, network, prv, logger, true)
+		nil, 0, dMoment, app, gov, db, network, prv, logger, true)
 }
 
 // NewConsensusForSimulation creates an instance of Consensus for simulation,
@@ -426,7 +432,7 @@ func NewConsensusForSimulation(
 	prv crypto.PrivateKey,
 	logger common.Logger) *Consensus {
 	return newConsensusForRound(
-		nil, dMoment, app, gov, db, network, prv, logger, false)
+		nil, 0, dMoment, app, gov, db, network, prv, logger, false)
 }
 
 // NewConsensusFromSyncer constructs a Consensus instance from information
@@ -440,7 +446,8 @@ func NewConsensusForSimulation(
 // their positions, in ascending order.
 func NewConsensusFromSyncer(
 	initBlock *types.Block,
-	initRoundBeginTime time.Time,
+	initRoundBeginHeight uint64,
+	dMoment time.Time,
 	app Application,
 	gov Governance,
 	db db.Database,
@@ -451,8 +458,8 @@ func NewConsensusForSimulation(
 	cachedMessages []interface{},
 	logger common.Logger) (*Consensus, error) {
 	// Setup Consensus instance.
-	con := newConsensusForRound(initBlock, initRoundBeginTime, app, gov, db,
-		networkModule, prv, logger, true)
+	con := newConsensusForRound(initBlock, initRoundBeginHeight, dMoment, app,
+		gov, db, networkModule, prv, logger, true)
 	// Launch a dummy receiver before we start receiving from network module.
 	con.dummyMsgBuffer = cachedMessages
 	con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver(
@@ -486,9 +493,11 @@ func NewConsensusFromSyncer(
 }
 
 // newConsensusForRound creates a Consensus instance.
+// TODO(mission): remove dMoment, it's no longer part of consensus.
 func newConsensusForRound(
 	initBlock *types.Block,
-	initRoundBeginTime time.Time,
+	initRoundBeginHeight uint64,
+	dMoment time.Time,
 	app Application,
 	gov Governance,
 	db db.Database,
@@ -511,6 +520,7 @@ func newConsensusForRound(
 		initRound = initBlock.Position.Round
 	}
 	initConfig := utils.GetConfigWithPanic(gov, initRound, logger)
+	initCRS := utils.GetCRSWithPanic(gov, initRound, logger)
 	// Init configuration chain.
 	ID := types.NewNodeID(prv.PublicKey())
 	recv := &consensusDKGReceiver{
@@ -529,8 +539,8 @@ func newConsensusForRound(
 	}
 	bcConfig := blockChainConfig{}
 	bcConfig.fromConfig(initRound, initConfig)
-	bcConfig.setRoundBeginTime(initRoundBeginTime)
-	bcModule := newBlockChain(ID, initBlock, bcConfig, appModule,
+	bcConfig.setRoundBeginHeight(initRoundBeginHeight)
+	bcModule := newBlockChain(ID, dMoment, initBlock, bcConfig, appModule,
 		NewTSigVerifierCache(gov, 7), signer, logger)
 	// Construct Consensus instance.
 	con := &Consensus{
@@ -544,7 +554,7 @@ func newConsensusForRound(
 		dkgReady:         sync.NewCond(&sync.Mutex{}),
 		cfgModule:        cfgModule,
 		bcModule:         bcModule,
-		dMoment:          initRoundBeginTime,
+		dMoment:          dMoment,
 		nodeSetCache:     nodeSetCache,
 		signer:           signer,
 		event:            common.NewEvent(),
@@ -555,8 +565,15 @@ func newConsensusForRound(
 		processBlockChan: make(chan *types.Block, 1024),
 	}
 	con.ctx, con.ctxCancel = context.WithCancel(context.Background())
-	con.baMgr = newAgreementMgr(con, initRound, initRoundBeginTime)
-	if err := con.prepare(initBlock); err != nil {
+	baConfig := agreementMgrConfig{}
+	baConfig.from(initRound, initConfig, initCRS)
+	baConfig.setRoundBeginHeight(initRoundBeginHeight)
+	var err error
+	con.baMgr, err = newAgreementMgr(con, initRound, baConfig)
+	if err != nil {
+		panic(err)
+	}
+	if err = con.prepare(initRoundBeginHeight, initBlock); err != nil {
 		panic(err)
 	}
 	return con
@@ -566,53 +583,28 @@ func newConsensusForRound(
 // 'initBlock' could be either:
 //  - nil
 //  - the last finalized block
-func (con *Consensus) prepare(initBlock *types.Block) (err error) {
+func (con *Consensus) prepare(
+	initRoundBeginHeight uint64, initBlock *types.Block) (err error) {
 	// The block passed from full node should be delivered already or known by
 	// full node. We don't have to notify it.
 	initRound := uint64(0)
 	if initBlock != nil {
 		initRound = initBlock.Position.Round
 	}
+	// Setup blockChain module.
 	con.roundForNewConfig = initRound + 1
 	initConfig := utils.GetConfigWithPanic(con.gov, initRound, con.logger)
-	// Setup context.
-	con.logger.Debug("Calling Governance.CRS", "round", initRound)
-	initCRS := con.gov.CRS(initRound)
-	if (initCRS == common.Hash{}) {
-		err = ErrCRSNotReady
-		return
-	}
-	if err = con.baMgr.appendConfig(initRound, initConfig, initCRS); err != nil {
-		return
-	}
-	// Setup blockChain module.
 	initPlusOneCfg := utils.GetConfigWithPanic(con.gov, initRound+1, con.logger)
 	if err = con.bcModule.appendConfig(initRound+1, initPlusOneCfg); err != nil {
 		return
 	}
	if initRound == 0 {
-		dkgSet, err := con.nodeSetCache.GetDKGSet(initRound)
-		if err != nil {
-			return err
-		}
-		if _, exist := dkgSet[con.ID]; exist {
-			con.logger.Info("Selected as DKG set", "round", initRound)
-			go func() {
-				// Sleep until dMoment come.
-				time.Sleep(con.dMoment.Sub(time.Now().UTC()))
-				// Network is not stable upon starting. Wait some time to ensure first
-				// DKG would success. Three is a magic number.
-				time.Sleep(initConfig.MinBlockInterval * 3)
-				con.cfgModule.registerDKG(initRound, getDKGThreshold(initConfig))
-				con.event.RegisterTime(con.dMoment.Add(initConfig.RoundInterval/4),
-					func(time.Time) {
-						con.runDKG(initRound, initConfig)
-					})
-			}()
-		}
+		if DKGDelayRound == 0 {
+			panic("not implemented yet")
+		}
 	}
 	// Register events.
-	con.initialRound(con.dMoment, initRound, initConfig)
+	con.initialRound(initRoundBeginHeight, initRound, initConfig)
 	return
 }
 
@@ -672,18 +664,11 @@ func (con *Consensus) runDKG(round uint64, config *types.Config) {
 	}
 	con.dkgRunning = 1
 	go func() {
-		startTime := time.Now().UTC()
 		defer func() {
 			con.dkgReady.L.Lock()
 			defer con.dkgReady.L.Unlock()
 			con.dkgReady.Broadcast()
 			con.dkgRunning = 2
-			DKGTime := time.Now().Sub(startTime)
-			if DKGTime.Nanoseconds() >=
-				config.RoundInterval.Nanoseconds()/2 {
-				con.logger.Warn("Your computer cannot finish DKG on time!",
-					"nodeID", con.ID.String())
-			}
 		}()
 		if err := con.cfgModule.runDKG(round); err != nil {
 			con.logger.Error("Failed to runDKG", "error", err)
@@ -739,25 +724,28 @@ func (con *Consensus) runCRS(round uint64) {
 }
 
 func (con *Consensus) initialRound(
-	startTime time.Time, round uint64, config *types.Config) {
+	startHeight uint64, round uint64, config *types.Config) {
 	select {
 	case <-con.ctx.Done():
 		return
 	default:
 	}
-	curDkgSet, err := con.nodeSetCache.GetDKGSet(round)
-	if err != nil {
-		con.logger.Error("Error getting DKG set", "round", round, "error", err)
-		curDkgSet = make(map[types.NodeID]struct{})
-	}
-	// Initiate CRS routine.
-	if _, exist := curDkgSet[con.ID]; exist {
-		con.event.RegisterTime(startTime.Add(config.RoundInterval/2),
-			func(time.Time) {
-				go func() {
-					con.runCRS(round)
-				}()
-			})
+	if round >= DKGDelayRound {
+		curDkgSet, err := con.nodeSetCache.GetDKGSet(round)
+		if err != nil {
+			con.logger.Error("Error getting DKG set", "round", round, "error", err)
+			curDkgSet = make(map[types.NodeID]struct{})
+		}
+		// Initiate CRS routine.
+		if _, exist := curDkgSet[con.ID]; exist {
+			con.event.RegisterHeight(
+				startHeight+config.RoundLength/2,
+				func(uint64) {
+					go func() {
+						con.runCRS(round)
+					}()
+				})
+		}
 	}
 	// checkCRS is a generator of checker to check if CRS for that round is
 	// ready or not.
@@ -774,76 +762,75 @@ func (con *Consensus) initialRound(
 		}
 	}
 	// Initiate BA modules.
-	con.event.RegisterTime(
-		startTime.Add(config.RoundInterval/2+config.LambdaDKG),
-		func(time.Time) {
-			go func(nextRound uint64) {
-				if !checkWithCancel(
-					con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
-					con.logger.Debug("unable to prepare CRS for baMgr",
-						"round", nextRound)
-					return
-				}
-				// Notify BA for new round.
-				nextConfig := utils.GetConfigWithPanic(
-					con.gov, nextRound, con.logger)
-				nextCRS := utils.GetCRSWithPanic(
-					con.gov, nextRound, con.logger)
-				con.logger.Info("appendConfig for baMgr", "round", nextRound)
-				if err := con.baMgr.appendConfig(
-					nextRound, nextConfig, nextCRS); err != nil {
-					panic(err)
-				}
-			}(round + 1)
-		})
+	con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
+		go func(nextRound uint64) {
+			if !checkWithCancel(
+				con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+				con.logger.Debug("unable to prepare CRS for baMgr",
+					"round", nextRound)
+				return
+			}
+			// Notify BA for new round.
+			nextConfig := utils.GetConfigWithPanic(
+				con.gov, nextRound, con.logger)
+			nextCRS := utils.GetCRSWithPanic(
+				con.gov, nextRound, con.logger)
+			con.logger.Info("appendConfig for baMgr", "round", nextRound)
+			if err := con.baMgr.appendConfig(
+				nextRound, nextConfig, nextCRS); err != nil {
+				panic(err)
+			}
+		}(round + 1)
+	})
 	// Initiate DKG for this round.
-	con.event.RegisterTime(startTime.Add(config.RoundInterval/2+config.LambdaDKG),
-		func(time.Time) {
-			go func(nextRound uint64) {
-				// Normally, gov.CRS would return non-nil. Use this for in case of
-				// unexpected network fluctuation and ensure the robustness.
-				if !checkWithCancel(
-					con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
-					con.logger.Debug("unable to prepare CRS for DKG set",
-						"round", nextRound)
-					return
-				}
-				nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
-				if err != nil {
-					con.logger.Error("Error getting DKG set",
-						"round", nextRound,
-						"error", err)
-					return
-				}
-				if _, exist := nextDkgSet[con.ID]; !exist {
-					return
-				}
-				con.logger.Info("Selected as DKG set", "round", nextRound)
-				con.cfgModule.registerDKG(nextRound, getDKGThreshold(config))
-				con.event.RegisterTime(
-					startTime.Add(config.RoundInterval*2/3),
-					func(time.Time) {
-						func() {
-							con.dkgReady.L.Lock()
-							defer con.dkgReady.L.Unlock()
-							con.dkgRunning = 0
-						}()
-						nextConfig := utils.GetConfigWithPanic(
-							con.gov, nextRound, con.logger)
-						con.runDKG(nextRound, nextConfig)
-					})
-			}(round + 1)
-		})
+	con.event.RegisterHeight(startHeight+config.RoundLength/2, func(uint64) {
+		go func(nextRound uint64) {
+			if nextRound < DKGDelayRound {
+				con.logger.Info("Skip runDKG for round", "round", nextRound)
+				return
+			}
+			// Normally, gov.CRS would return non-nil. Use this in case of
+			// unexpected network fluctuation and to ensure robustness.
+			if !checkWithCancel(
+				con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
+				con.logger.Debug("unable to prepare CRS for DKG set",
+					"round", nextRound)
+				return
+			}
+			nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
+			if err != nil {
+				con.logger.Error("Error getting DKG set",
+					"round", nextRound,
+					"error", err)
+				return
+			}
+			if _, exist := nextDkgSet[con.ID]; !exist {
+				return
+			}
+			con.logger.Info("Selected as DKG set", "round", nextRound)
+			con.cfgModule.registerDKG(nextRound, getDKGThreshold(config))
+			con.event.RegisterHeight(startHeight+config.RoundLength*2/3,
+				func(uint64) {
+					func() {
+						con.dkgReady.L.Lock()
+						defer con.dkgReady.L.Unlock()
+						con.dkgRunning = 0
+					}()
+					nextConfig := utils.GetConfigWithPanic(
+						con.gov, nextRound, con.logger)
+					con.runDKG(nextRound, nextConfig)
+				})
+		}(round + 1)
+	})
 	// Prepare blockChain module for next round and next "initialRound" routine.
-	con.event.RegisterTime(startTime.Add(config.RoundInterval),
-		func(time.Time) {
-			// Change round.
-			// Get configuration for next round.
-			nextRound := round + 1
-			nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
-			con.initialRound(
-				startTime.Add(config.RoundInterval), nextRound, nextConfig)
-		})
+	con.event.RegisterHeight(startHeight+config.RoundLength, func(uint64) {
+		// Change round.
+		// Get configuration for next round.
+		nextRound := round + 1
+		nextConfig := utils.GetConfigWithPanic(con.gov, nextRound, con.logger)
+		con.initialRound(
+			startHeight+config.RoundLength, nextRound, nextConfig)
+	})
 }
 
 // Stop the Consensus core.
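
Note: the hunks above replace every RegisterTime/NotifyTime pairing with RegisterHeight/NotifyHeight, so CRS, DKG and round-change events now fire at finalized block heights (fractions of config.RoundLength past startHeight) instead of wall-clock offsets into config.RoundInterval. The internals of common.Event are not shown in this diff, so the following is only a minimal sketch of how such a height-keyed event queue could look; the method names are taken from the calls above, everything else is assumed for illustration:

package event

import (
	"container/heap"
	"sync"
)

// heightFn is a callback scheduled to fire at a target block height.
type heightFn struct {
	height uint64
	fn     func(uint64)
}

// fnHeap is a min-heap of callbacks ordered by trigger height.
type fnHeap []heightFn

func (h fnHeap) Len() int            { return len(h) }
func (h fnHeap) Less(i, j int) bool  { return h[i].height < h[j].height }
func (h fnHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *fnHeap) Push(x interface{}) { *h = append(*h, x.(heightFn)) }
func (h *fnHeap) Pop() interface{} {
	old := *h
	item := old[len(old)-1]
	*h = old[:len(old)-1]
	return item
}

// Event queues callbacks keyed by block height, in the spirit of the
// common.Event usage in this patch.
type Event struct {
	mu  sync.Mutex
	fns fnHeap
}

// RegisterHeight queues fn to run once the chain reaches height h.
func (e *Event) RegisterHeight(h uint64, fn func(uint64)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	heap.Push(&e.fns, heightFn{height: h, fn: fn})
}

// NotifyHeight fires every queued callback whose target height is <= h.
// Callbacks run outside the lock so they may safely register follow-up
// heights, the way initialRound re-registers itself for the next round.
func (e *Event) NotifyHeight(h uint64) {
	var ready []heightFn
	e.mu.Lock()
	for len(e.fns) > 0 && e.fns[0].height <= h {
		ready = append(ready, heap.Pop(&e.fns).(heightFn))
	}
	e.mu.Unlock()
	for _, r := range ready {
		r.fn(h)
	}
}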
@@ -1194,7 +1181,7 @@ func (con *Consensus) deliverFinalizedBlocksWithoutLock() (err error) {
 		"pending", con.bcModule.lastPendingBlock())
 	for _, b := range deliveredBlocks {
 		con.deliverBlock(b)
-		go con.event.NotifyTime(b.Finalization.Timestamp)
+		go con.event.NotifyHeight(b.Finalization.Height)
 	}
 	return
 }
-- 
cgit
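
The first hunk applies the same height-driven idea to BA round switching: the notary round bumps when the confirmed block sits one height below changeNotaryHeight, and the new panic guards against blocks passing that point while the round value is stale. Below is a simplified, self-contained illustration of that rule; position, receiver and roundLength are stand-ins invented for the example, not the real consensusBAReceiver:

package main

import "fmt"

const roundLength = 100 // stand-in for the configured round length

// position mimics the Round/Height pair of types.Position.
type position struct {
	Round  uint64
	Height uint64
}

// receiver carries just the two fields the round-switch rule needs.
type receiver struct {
	changeNotaryHeight uint64
	round              uint64
}

// confirm applies the rule from the diff: bump the round exactly when the
// confirmed block is one height short of changeNotaryHeight.
func (r *receiver) confirm(pos position) position {
	newPos := pos
	if pos.Height+1 == r.changeNotaryHeight {
		newPos.Round++
		r.round = newPos.Round
		// In the real code the restartNotary handler moves
		// changeNotaryHeight forward for the next round; done inline
		// here to keep the example short.
		r.changeNotaryHeight += roundLength
	}
	if pos.Height > r.changeNotaryHeight && pos.Round <= r.round {
		// Mirrors the panic added in the first hunk.
		panic(fmt.Sprintf("round not switched when confirming height %d",
			pos.Height))
	}
	return newPos
}

func main() {
	r := &receiver{changeNotaryHeight: roundLength}
	for h := uint64(98); h <= 102; h++ {
		p := r.confirm(position{Round: r.round, Height: h})
		fmt.Printf("height %d confirmed in round %d\n", h, p.Round)
	}
}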