diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2019-03-29 15:17:13 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-29 15:17:13 +0800 |
commit | 92d1b675e7acff789a819426521efc99bdbd9aff (patch) | |
tree | e3c886a6678e7338d114ca9baec0ca2632b748ac | |
parent | 04a4df9479e31f1418760a389060706a72259381 (diff) | |
download | dexon-consensus-92d1b675e7acff789a819426521efc99bdbd9aff.tar.gz dexon-consensus-92d1b675e7acff789a819426521efc99bdbd9aff.tar.zst dexon-consensus-92d1b675e7acff789a819426521efc99bdbd9aff.zip |
core: run dkg by height (#527)
* core: run dkg by height
* core: check cc.dkgCtx before running each dkg phases
* fix
-rw-r--r-- | core/configuration-chain.go | 115 | ||||
-rw-r--r-- | core/configuration-chain_test.go | 100 | ||||
-rw-r--r-- | core/consensus.go | 16 | ||||
-rw-r--r-- | core/consensus_test.go | 22 |
4 files changed, 180 insertions, 73 deletions
diff --git a/core/configuration-chain.go b/core/configuration-chain.go index 92b2830..c8aac38 100644 --- a/core/configuration-chain.go +++ b/core/configuration-chain.go @@ -406,17 +406,6 @@ func (cc *configurationChain) runDKGPhaseNine(round uint64, reset uint64) error return nil } -func (cc *configurationChain) runTick(ticker Ticker) (aborted bool) { - cc.dkgLock.Unlock() - defer cc.dkgLock.Lock() - select { - case <-cc.dkgCtx.Done(): - aborted = true - case <-ticker.Tick(): - } - return -} - func (cc *configurationChain) initDKGPhasesFunc() { cc.dkgRunPhases = []dkgStepFn{ func(round uint64, reset uint64) error { @@ -448,7 +437,18 @@ func (cc *configurationChain) initDKGPhasesFunc() { } } -func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { +func (cc *configurationChain) runDKG( + round uint64, reset uint64, event *common.Event, + dkgBeginHeight, dkgHeight uint64) (err error) { + // Check if corresponding DKG signer is ready. + if _, _, err = cc.getDKGInfo(round, false); err == nil { + return ErrSkipButNoError + } + cfg := utils.GetConfigWithPanic(cc.gov, round, cc.logger) + phaseHeight := uint64( + cfg.LambdaDKG.Nanoseconds() / cfg.MinBlockInterval.Nanoseconds()) + skipPhase := int(dkgHeight / phaseHeight) + cc.logger.Info("Skipping DKG phase", "phase", skipPhase) cc.dkgLock.Lock() defer cc.dkgLock.Unlock() if cc.dkg == nil { @@ -467,51 +467,70 @@ func (cc *configurationChain) runDKG(round uint64, reset uint64) (err error) { panic(fmt.Errorf("duplicated call to runDKG: %d %d", round, reset)) } cc.dkgRunning = true - var ticker Ticker defer func() { - if ticker != nil { - ticker.Stop() - } // Here we should hold the cc.dkgLock, reset cc.dkg to nil when done. if cc.dkg != nil { cc.dkg = nil } cc.dkgRunning = false }() - // Check if corresponding DKG signer is ready. - if _, _, err = cc.getDKGInfo(round, true); err == nil { - return ErrSkipButNoError + wg := sync.WaitGroup{} + var dkgError error + // Make a copy of cc.dkgCtx so each phase function can refer to the correct + // context. + ctx := cc.dkgCtx + cc.dkg.step = skipPhase + for i := skipPhase; i < len(cc.dkgRunPhases); i++ { + wg.Add(1) + event.RegisterHeight(dkgBeginHeight+phaseHeight*uint64(i), func(uint64) { + go func() { + defer wg.Done() + cc.dkgLock.Lock() + defer cc.dkgLock.Unlock() + if dkgError != nil { + return + } + select { + case <-ctx.Done(): + dkgError = ErrDKGAborted + return + default: + } + + err := cc.dkgRunPhases[cc.dkg.step](round, reset) + if err == nil || err == ErrSkipButNoError { + err = nil + cc.dkg.step++ + err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo()) + if err != nil { + cc.logger.Error("Failed to save DKG Protocol", + "step", cc.dkg.step, + "error", err) + } + } + if err != nil && dkgError == nil { + dkgError = err + } + }() + }) } - tickStartAt := 1 - - for i := cc.dkg.step; i < len(cc.dkgRunPhases); i++ { - if i >= tickStartAt && ticker == nil { - ticker = newTicker(cc.gov, round, TickerDKG) - } - - if ticker != nil && cc.runTick(ticker) { - return - } - - switch err = cc.dkgRunPhases[i](round, reset); err { - case ErrSkipButNoError, nil: - cc.dkg.step = i + 1 - err = cc.db.PutOrUpdateDKGProtocol(cc.dkg.toDKGProtocolInfo()) - if err != nil { - return fmt.Errorf("put or update DKG protocol error: %v", err) - } - - if err == nil { - continue - } else { - return - } - default: - return - } + cc.dkgLock.Unlock() + wgChan := make(chan struct{}, 1) + go func() { + wg.Wait() + wgChan <- struct{}{} + }() + select { + case <-cc.dkgCtx.Done(): + case <-wgChan: } - - return nil + cc.dkgLock.Lock() + select { + case <-cc.dkgCtx.Done(): + return ErrDKGAborted + default: + } + return dkgError } func (cc *configurationChain) isDKGFinal(round uint64) bool { diff --git a/core/configuration-chain_test.go b/core/configuration-chain_test.go index 0adfdcf..c3a8023 100644 --- a/core/configuration-chain_test.go +++ b/core/configuration-chain_test.go @@ -195,14 +195,50 @@ func (s *ConfigurationChainTestSuite) setupNodes(n int) { } } +type testEvent struct { + event *common.Event + ctx context.Context + cancel context.CancelFunc +} + +func newTestEvent() *testEvent { + e := &testEvent{ + event: common.NewEvent(), + } + return e +} + +func (evt *testEvent) run(interval time.Duration) { + evt.ctx, evt.cancel = context.WithCancel(context.Background()) + go func() { + height := uint64(0) + Loop: + for { + select { + case <-evt.ctx.Done(): + break Loop + case <-time.After(interval): + } + evt.event.NotifyHeight(height) + height++ + } + }() +} + +func (evt *testEvent) stop() { + evt.cancel() +} + func (s *ConfigurationChainTestSuite) runDKG( k, n int, round, reset uint64) map[types.NodeID]*configurationChain { s.setupNodes(n) + evts := make(map[types.NodeID]*testEvent) cfgChains := make(map[types.NodeID]*configurationChain) recv := newTestCCGlobalReceiver(s) for _, nID := range s.nIDs { + evts[nID] = newTestEvent() gov, err := test.NewGovernance(test.NewState(DKGDelayRound, s.pubKeys, 100*time.Millisecond, &common.NullLogger{}, true, ), ConfigRoundShift) @@ -228,11 +264,13 @@ func (s *ConfigurationChainTestSuite) runDKG( errs := make(chan error, n) wg := sync.WaitGroup{} wg.Add(n) - for _, cc := range cfgChains { - go func(cc *configurationChain) { + for nID, cc := range cfgChains { + go func(cc *configurationChain, nID types.NodeID) { defer wg.Done() - errs <- cc.runDKG(round, reset) - }(cc) + errs <- cc.runDKG(round, reset, evts[nID].event, 10, 0) + }(cc, nID) + evts[nID].run(100 * time.Millisecond) + defer evts[nID].stop() } wg.Wait() for range cfgChains { @@ -324,6 +362,7 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() { round := DKGDelayRound reset := uint64(0) lambdaDKG := 1000 * time.Millisecond + minBlockInterval := 100 * time.Millisecond s.setupNodes(n) cfgChains := make(map[types.NodeID]*configurationChain) @@ -337,6 +376,8 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() { s.Require().NoError(err) s.Require().NoError(state.RequestChange( test.StateChangeLambdaDKG, lambdaDKG)) + s.Require().NoError(state.RequestChange( + test.StateChangeMinBlockInterval, minBlockInterval)) cache := utils.NewNodeSetCache(gov) dbInst, err := db.NewMemBackedDB() s.Require().NoError(err) @@ -364,10 +405,13 @@ func (s *ConfigurationChainTestSuite) TestDKGMasterPublicKeyDelayAdd() { wg := sync.WaitGroup{} wg.Add(n) for _, cc := range cfgChains { + evt := newTestEvent() go func(cc *configurationChain) { defer wg.Done() - errs <- cc.runDKG(round, reset) + errs <- cc.runDKG(round, reset, evt.event, 0, 0) }(cc) + evt.run(100 * time.Millisecond) + defer evt.stop() } wg.Wait() for range cfgChains { @@ -391,6 +435,7 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() { round := DKGDelayRound reset := uint64(0) lambdaDKG := 1000 * time.Millisecond + minBlockInterval := 100 * time.Millisecond s.setupNodes(n) cfgChains := make(map[types.NodeID]*configurationChain) @@ -403,6 +448,8 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() { s.Require().NoError(err) s.Require().NoError(state.RequestChange( test.StateChangeLambdaDKG, lambdaDKG)) + s.Require().NoError(state.RequestChange( + test.StateChangeMinBlockInterval, minBlockInterval)) cache := utils.NewNodeSetCache(gov) dbInst, err := db.NewMemBackedDB() s.Require().NoError(err) @@ -425,10 +472,13 @@ func (s *ConfigurationChainTestSuite) TestDKGComplaintDelayAdd() { wg := sync.WaitGroup{} wg.Add(n) for _, cc := range cfgChains { + evt := newTestEvent() go func(cc *configurationChain) { defer wg.Done() - errs <- cc.runDKG(round, reset) + errs <- cc.runDKG(round, reset, evt.event, 0, 0) }(cc) + evt.run(minBlockInterval) + defer evt.stop() } complaints := -1 go func() { @@ -636,36 +686,54 @@ func (s *ConfigurationChainTestSuite) TestDKGAbort() { cc.registerDKG(context.Background(), round, reset, k) // We should be blocked because DKGReady is not enough. errs := make(chan error, 1) - called := make(chan struct{}, 1) + evt := newTestEvent() go func() { - called <- struct{}{} - errs <- cc.runDKG(round, reset) + errs <- cc.runDKG(round, reset, evt.event, 0, 0) }() + evt.run(100 * time.Millisecond) + defer evt.stop() + // The second register shouldn't be blocked, too. randHash := common.NewRandomHash() gov.ResetDKG(randHash[:]) - <-called + for func() bool { + cc.dkgLock.RLock() + defer cc.dkgLock.RUnlock() + return !cc.dkgRunning + }() { + time.Sleep(100 * time.Millisecond) + } cc.registerDKG(context.Background(), round, reset+1, k) err = <-errs s.Require().EqualError(ErrDKGAborted, err.Error()) go func() { - called <- struct{}{} - errs <- cc.runDKG(round, reset+1) + errs <- cc.runDKG(round, reset+1, evt.event, 0, 0) }() // The third register shouldn't be blocked, too randHash = common.NewRandomHash() gov.ProposeCRS(round+1, randHash[:]) randHash = common.NewRandomHash() gov.ResetDKG(randHash[:]) - <-called + for func() bool { + cc.dkgLock.RLock() + defer cc.dkgLock.RUnlock() + return !cc.dkgRunning + }() { + time.Sleep(100 * time.Millisecond) + } cc.registerDKG(context.Background(), round+1, reset+1, k) err = <-errs s.Require().EqualError(ErrDKGAborted, err.Error()) go func() { - called <- struct{}{} - errs <- cc.runDKG(round+1, reset+1) + errs <- cc.runDKG(round+1, reset+1, evt.event, 0, 0) }() - <-called + for func() bool { + cc.dkgLock.RLock() + defer cc.dkgLock.RUnlock() + return !cc.dkgRunning + }() { + time.Sleep(100 * time.Millisecond) + } // Abort with older round, shouldn't be aborted. aborted := cc.abortDKG(context.Background(), round, reset+1) s.Require().False(aborted) diff --git a/core/consensus.go b/core/consensus.go index 2246bf1..6e30723 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -980,13 +980,17 @@ func (con *Consensus) prepare(initBlock *types.Block) (err error) { con.cfgModule.registerDKG(con.ctx, nextRound, e.Reset, utils.GetDKGThreshold(nextConfig)) con.event.RegisterHeight(e.NextDKGPreparationHeight(), - func(uint64) { + func(h uint64) { func() { con.dkgReady.L.Lock() defer con.dkgReady.L.Unlock() con.dkgRunning = 0 }() - con.runDKG(nextRound, e.Reset, nextConfig) + // We want to skip some of the DKG phases when started. + dkgCurrentHeight := h - e.NextDKGPreparationHeight() + con.runDKG( + nextRound, e.Reset, + e.NextDKGPreparationHeight(), dkgCurrentHeight) }) }() }) @@ -1118,7 +1122,8 @@ func (con *Consensus) generateBlockRandomness(blocks []*types.Block) { } // runDKG starts running DKG protocol. -func (con *Consensus) runDKG(round, reset uint64, config *types.Config) { +func (con *Consensus) runDKG( + round, reset, dkgBeginHeight, dkgHeight uint64) { con.dkgReady.L.Lock() defer con.dkgReady.L.Unlock() if con.dkgRunning != 0 { @@ -1132,7 +1137,10 @@ func (con *Consensus) runDKG(round, reset uint64, config *types.Config) { con.dkgReady.Broadcast() con.dkgRunning = 2 }() - if err := con.cfgModule.runDKG(round, reset); err != nil { + if err := + con.cfgModule.runDKG( + round, reset, + con.event, dkgBeginHeight, dkgHeight); err != nil { con.logger.Error("Failed to runDKG", "error", err) } }() diff --git a/core/consensus_test.go b/core/consensus_test.go index 9565e95..19c43ad 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -267,7 +267,23 @@ func (s *ConsensusTestSuite) TestDKGCRS() { } time.Sleep(gov.Configuration(0).MinBlockInterval * 4) for _, con := range cons { - go con.runDKG(0, 0, gov.Configuration(0)) + go con.runDKG(0, 0, 0, 0) + } + crsFinish := make(chan struct{}, len(cons)) + for _, con := range cons { + go func(con *Consensus) { + height := uint64(0) + Loop: + for { + select { + case <-crsFinish: + break Loop + case <-time.After(lambda): + } + con.event.NotifyHeight(height) + height++ + } + }(con) } for _, con := range cons { func() { @@ -278,16 +294,12 @@ func (s *ConsensusTestSuite) TestDKGCRS() { } }() } - crsFinish := make(chan struct{}) for _, con := range cons { go func(con *Consensus) { con.runCRS(0, gov.CRS(0), false) crsFinish <- struct{}{} }(con) } - for range cons { - <-crsFinish - } s.NotNil(gov.CRS(1)) } |