From c852eda00f781abafaab2b41d2c1a85fe9d3177f Mon Sep 17 00:00:00 2001
From: Mission Liao <mission.liao@dexon.org>
Date: Wed, 20 Mar 2019 17:29:22 +0800
Subject: core: reset DKG (#502)

* Allow utils.NodeSetCache to be purged by round.

* Purge utils.NodeSetCache when DKG is reset.

* Add a utils.RoundEvent handler to abort any
  previously running DKG.

* Fix test.App hanging in BlockDelivered when
  utils.RoundEvent is attached.

  ValidateNextRound is a blocking call and would
  block test.App.BlockDelivered; see the sketch
  below.
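
A minimal sketch of the hang and of the fix applied in core/test/app.go, with
heightEvent standing in for the height-event module whose handlers (such as
utils.RoundEvent's ValidateNextRound) run synchronously:

    package main

    import (
    	"fmt"
    	"time"
    )

    // heightEvent is a stand-in: NotifyHeight runs every registered handler
    // synchronously, so a slow handler blocks the caller.
    type heightEvent struct{ handlers []func(uint64) }

    func (e *heightEvent) NotifyHeight(h uint64) {
    	for _, fn := range e.handlers {
    		fn(h)
    	}
    }

    func main() {
    	evt := &heightEvent{handlers: []func(uint64){
    		func(h uint64) { time.Sleep(time.Second); fmt.Println("validated height", h) },
    	}}
    	// BlockDelivered must not block on slow handlers, so the patch
    	// dispatches the notification on its own goroutine instead of
    	// calling evt.NotifyHeight(42) inline.
    	go evt.NotifyHeight(42)
    	fmt.Println("BlockDelivered returns immediately")
    	time.Sleep(2 * time.Second) // only so the demo prints before exiting
    }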
---
 GNUmakefile                        |   4 +-
 core/consensus.go                  | 120 ++++++++++++++++++++++--
 core/consensus_test.go             |   2 +-
 core/syncer/consensus.go           |  10 ++
 core/test/app.go                   |   2 +-
 core/test/app_test.go              |  16 ++--
 core/test/governance.go            |  29 +++++-
 core/test/governance_test.go       |   7 ++
 core/test/network.go               |   9 +-
 core/test/network_test.go          |   2 +-
 core/test/state.go                 |  11 +--
 core/test/state_test.go            |   8 +-
 core/utils/crypto.go               |   9 ++
 core/utils/nodeset-cache.go        |  20 ++++
 core/utils/nodeset-cache_test.go   |  18 ++++
 core/utils/round-based-config.go   |   2 +-
 core/utils/round-event.go          |   9 ++
 integration_test/byzantine_test.go |  15 ++-
 integration_test/consensus_test.go | 181 ++++++++++++++++++++++++++++++-------
 19 files changed, 395 insertions(+), 79 deletions(-)

diff --git a/GNUmakefile b/GNUmakefile
index 425a569..bbf14bd 100644
--- a/GNUmakefile
+++ b/GNUmakefile
@@ -44,11 +44,11 @@ else
 endif
 endef
 
-GO_TEST_TIMEOUT := 20m
+GO_TEST_TIMEOUT := 33m
 
 TEST_TARGET := go list ./... | grep -v 'vendor'
 ifeq ($(NO_INTEGRATION_TEST), true)
-	GO_TEST_TIMEOUT := 15m
+	GO_TEST_TIMEOUT := 25m
 	TEST_TARGET := $(TEST_TARGET) | grep -v 'integration_test'
 else ifeq ($(ONLY_INTEGRATION_TEST), true)
 	TEST_TARGET := $(TEST_TARGET) | grep 'integration_test'
diff --git a/core/consensus.go b/core/consensus.go
index c75f542..8f8002b 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -630,8 +630,40 @@ func (con *Consensus) prepare(
 			panic("not implemented yet")
 		}
 	}
+	// Measure the time elapsed in each round event handler.
+	elapse := func(what string, lastE utils.RoundEventParam) func() {
+		start := time.Now()
+		con.logger.Info("handle round event",
+			"what", what,
+			"event", lastE)
+		return func() {
+			con.logger.Info("finish round event",
+				"what", what,
+				"event", lastE,
+				"elapse", time.Since(start))
+		}
+	}
+	// Register a round event handler to purge the cached node set. To make
+	// sure every module sees the up-to-date node set, this handler must be
+	// registered first.
+	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+		defer elapse("purge node set", evts[len(evts)-1])()
+		for _, e := range evts {
+			if e.Reset == 0 {
+				continue
+			}
+			con.nodeSetCache.Purge(e.Round + 1)
+		}
+	})
+	// Register a round event handler to abort any previously running DKG.
+	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+		e := evts[len(evts)-1]
+		defer elapse("abort DKG", e)()
+		con.cfgModule.abortDKG(e.Round+1, e.Reset)
+	})
 	// Register round event handler to update BA and BC modules.
 	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+		defer elapse("append config", evts[len(evts)-1])()
 		// Always updates newer configs to the later modules first in the flow.
 		if err := con.bcModule.notifyRoundEvents(evts); err != nil {
 			panic(err)
@@ -643,11 +675,62 @@ func (con *Consensus) prepare(
 			}
 		}
 	})
+	// Register a round event handler to reset DKG if the DKG set for the next
+	// round failed to set up.
+	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
+		e := evts[len(evts)-1]
+		defer elapse("reset DKG", e)()
+		nextRound := e.Round + 1
+		if nextRound < DKGDelayRound {
+			return
+		}
+		curDKGSet, err := con.nodeSetCache.GetDKGSet(e.Round)
+		if err != nil {
+			con.logger.Error("Error getting DKG set when resetting DKG",
+				"round", e.Round,
+				"error", err)
+			return
+		}
+		if _, exist := curDKGSet[con.ID]; !exist {
+			return
+		}
+		isDKGValid := func() bool {
+			nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
+				con.logger)
+			if !con.gov.IsDKGFinal(nextRound) {
+				con.logger.Error("Next DKG is not final, reset it",
+					"round", e.Round,
+					"reset", e.Reset)
+				return false
+			}
+			if _, err := typesDKG.NewGroupPublicKey(
+				nextRound,
+				con.gov.DKGMasterPublicKeys(nextRound),
+				con.gov.DKGComplaints(nextRound),
+				utils.GetDKGThreshold(nextConfig)); err != nil {
+				con.logger.Error("Next DKG failed to prepare, reset it",
+					"round", e.Round,
+					"reset", e.Reset,
+					"error", err)
+				return false
+			}
+			return true
+		}
+		con.event.RegisterHeight(e.NextDKGResetHeight(), func(uint64) {
+			if isDKGValid() {
+				return
+			}
+			// Abort any previously running DKG protocol instances.
+			con.cfgModule.abortDKG(nextRound, e.Reset)
+			con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true)
+		})
+	})
 	// Register round event handler to propose new CRS.
 	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
 		// We don't have to propose new CRS during DKG reset, the reset of DKG
 		// would be done by the DKG set in previous round.
 		e := evts[len(evts)-1]
+		defer elapse("propose CRS", e)()
 		if e.Reset != 0 || e.Round < DKGDelayRound {
 			return
 		}
@@ -667,13 +750,14 @@ func (con *Consensus) prepare(
 					con.logger.Debug("CRS already proposed", "round", e.Round+1)
 					return
 				}
-				con.runCRS(e.Round, e.CRS)
+				con.runCRS(e.Round, e.CRS, false)
 			})
 		}
 	})
 	// Touch nodeSetCache for next round.
 	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
 		e := evts[len(evts)-1]
+		defer elapse("touch node set cache", e)()
 		if e.Reset != 0 {
 			return
 		}
@@ -702,6 +786,7 @@ func (con *Consensus) prepare(
 	// Trigger round validation method for next period.
 	con.roundEvent.Register(func(evts []utils.RoundEventParam) {
 		e := evts[len(evts)-1]
+		defer elapse("next round", e)()
 		// Register a routine to trigger round events.
 		con.event.RegisterHeight(e.NextRoundValidationHeight(), func(
 			blockHeight uint64) {
@@ -711,7 +796,9 @@ func (con *Consensus) prepare(
 		con.event.RegisterHeight(e.NextDKGRegisterHeight(), func(uint64) {
 			nextRound := e.Round + 1
 			if nextRound < DKGDelayRound {
-				con.logger.Info("Skip runDKG for round", "round", nextRound)
+				con.logger.Info("Skip runDKG for round",
+					"round", nextRound,
+					"reset", e.Reset)
 				return
 			}
 			// Normally, gov.CRS would return non-nil. Use this for in case of
@@ -719,21 +806,27 @@ func (con *Consensus) prepare(
 			if !checkWithCancel(
 				con.ctx, 500*time.Millisecond, checkCRS(nextRound)) {
 				con.logger.Debug("unable to prepare CRS for DKG set",
-					"round", nextRound)
+					"round", nextRound,
+					"reset", e.Reset)
 				return
 			}
 			nextDkgSet, err := con.nodeSetCache.GetDKGSet(nextRound)
 			if err != nil {
 				con.logger.Error("Error getting DKG set for next round",
 					"round", nextRound,
+					"reset", e.Reset,
 					"error", err)
 				return
 			}
 			if _, exist := nextDkgSet[con.ID]; !exist {
-				con.logger.Info("Not selected as DKG set", "round", nextRound)
+				con.logger.Info("Not selected as DKG set",
+					"round", nextRound,
+					"reset", e.Reset)
 				return
 			}
-			con.logger.Info("Selected as DKG set", "round", nextRound)
+			con.logger.Info("Selected as DKG set",
+				"round", nextRound,
+				"reset", e.Reset)
 			nextConfig := utils.GetConfigWithPanic(con.gov, nextRound,
 				con.logger)
 			con.cfgModule.registerDKG(nextRound, e.Reset, utils.GetDKGThreshold(
@@ -821,7 +914,7 @@ func (con *Consensus) runDKG(round, reset uint64, config *types.Config) {
 	}()
 }
 
-func (con *Consensus) runCRS(round uint64, hash common.Hash) {
+func (con *Consensus) runCRS(round uint64, hash common.Hash, reset bool) {
 	// Start running next round CRS.
 	psig, err := con.cfgModule.preparePartialSignature(round, hash)
 	if err != nil {
@@ -841,10 +934,17 @@ func (con *Consensus) runCRS(round uint64, hash common.Hash) {
 		if err != nil {
 			con.logger.Error("Failed to run CRS Tsig", "error", err)
 		} else {
-			con.logger.Debug("Calling Governance.ProposeCRS",
-				"round", round+1,
-				"crs", hex.EncodeToString(crs))
-			con.gov.ProposeCRS(round+1, crs)
+			if reset {
+				con.logger.Debug("Calling Governance.ResetDKG",
+					"round", round+1,
+					"crs", hex.EncodeToString(crs))
+				con.gov.ResetDKG(crs)
+			} else {
+				con.logger.Debug("Calling Governance.ProposeCRS",
+					"round", round+1,
+					"crs", hex.EncodeToString(crs))
+				con.gov.ProposeCRS(round+1, crs)
+			}
 		}
 	}
 }
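
The `defer elapse("...", ...)()` calls added above rely on a closure-returning
timing helper; a simplified standalone sketch of the same idiom (plain log
output instead of the structured logger, and no event parameter):

    package main

    import (
    	"log"
    	"time"
    )

    // elapse mirrors the timing helper in Consensus.prepare: it logs the start
    // of a round event handler and returns a function that, when deferred,
    // logs how long the handler took.
    func elapse(what string) func() {
    	start := time.Now()
    	log.Printf("handle round event: %s", what)
    	return func() {
    		log.Printf("finish round event: %s (elapse=%s)", what, time.Since(start))
    	}
    }

    func main() {
    	handler := func() {
    		defer elapse("purge node set")() // note the trailing (): defer the returned closure
    		time.Sleep(50 * time.Millisecond) // stand-in for real work
    	}
    	handler()
    }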
diff --git a/core/consensus_test.go b/core/consensus_test.go
index 541f157..e60a173 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -285,7 +285,7 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
 	crsFinish := make(chan struct{})
 	for _, con := range cons {
 		go func(con *Consensus) {
-			con.runCRS(0, gov.CRS(0))
+			con.runCRS(0, gov.CRS(0), false)
 			crsFinish <- struct{}{}
 		}(con)
 	}
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go
index 8d89d07..2eeee9d 100644
--- a/core/syncer/consensus.go
+++ b/core/syncer/consensus.go
@@ -168,6 +168,16 @@ func (con *Consensus) assureBuffering() {
 	}
 	// Make sure con.roundEvt stopped before stopping con.agreementModule.
 	con.waitGroup.Add(1)
+	// Register a round event handler to purge the node set cache; this handler
+	// must have the highest priority.
+	con.roundEvt.Register(func(evts []utils.RoundEventParam) {
+		for _, e := range evts {
+			if e.Reset == 0 {
+				continue
+			}
+			con.nodeSetCache.Purge(e.Round + 1)
+		}
+	})
 	// Register a round event handler to notify CRS to agreementModule.
 	con.roundEvt.Register(func(evts []utils.RoundEventParam) {
 		con.waitGroup.Add(1)
diff --git a/core/test/app.go b/core/test/app.go
index df58135..12b2047 100644
--- a/core/test/app.go
+++ b/core/test/app.go
@@ -265,7 +265,7 @@ func (app *App) BlockDelivered(blockHash common.Hash, pos types.Position,
 			}
 		}
 	}()
-	app.hEvt.NotifyHeight(result.Height)
+	go app.hEvt.NotifyHeight(result.Height)
 }
 
 // GetLatestDeliveredPosition would return the latest position of delivered
diff --git a/core/test/app_test.go b/core/test/app_test.go
index c83aaf6..138f803 100644
--- a/core/test/app_test.go
+++ b/core/test/app_test.go
@@ -309,15 +309,15 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
 		1900, 2019, core.ConfigRoundShift)
 	s.Require().NoError(err)
 	// Register a handler to collect triggered events.
-	var evts []evtParamToCheck
+	evts := make(chan evtParamToCheck, 2)
 	rEvt.Register(func(params []utils.RoundEventParam) {
 		for _, p := range params {
-			evts = append(evts, evtParamToCheck{
+			evts <- evtParamToCheck{
 				round:  p.Round,
 				reset:  p.Reset,
 				height: p.BeginHeight,
 				crs:    p.CRS,
-			})
+			}
 		}
 	})
 	// Setup App instance.
@@ -336,18 +336,16 @@ func (s *AppTestSuite) TestAttachedWithRoundEvent() {
 	// Deliver blocks from height=2020 to height=2081.
 	deliver(0, 0, 2019)
 	deliver(19, 2020, 2091)
-	s.Require().Len(evts, 2)
-	s.Require().Equal(evts[0], evtParamToCheck{19, 2, 2100, gov.CRS(19)})
-	s.Require().Equal(evts[1], evtParamToCheck{20, 0, 2200, gov.CRS(20)})
+	s.Require().Equal(<-evts, evtParamToCheck{19, 2, 2100, gov.CRS(19)})
+	s.Require().Equal(<-evts, evtParamToCheck{20, 0, 2200, gov.CRS(20)})
 	// Deliver blocks from height=2082 to height=2281.
 	deliver(19, 2092, 2199)
 	deliver(20, 2200, 2291)
-	s.Require().Len(evts, 3)
-	s.Require().Equal(evts[2], evtParamToCheck{21, 0, 2300, gov.CRS(21)})
+	s.Require().Equal(<-evts, evtParamToCheck{21, 0, 2300, gov.CRS(21)})
 	// Deliver blocks from height=2282 to height=2381.
 	deliver(20, 2292, 2299)
 	deliver(21, 2300, 2391)
-	s.Require().Equal(evts[3], evtParamToCheck{22, 0, 2400, gov.CRS(22)})
+	s.Require().Equal(<-evts, evtParamToCheck{22, 0, 2400, gov.CRS(22)})
 }
 
 func TestApp(t *testing.T) {
diff --git a/core/test/governance.go b/core/test/governance.go
index 62bee70..9fe525b 100644
--- a/core/test/governance.go
+++ b/core/test/governance.go
@@ -154,7 +154,9 @@ func (g *Governance) AddDKGComplaint(complaint *typesDKG.Complaint) {
 	}
 	if err := g.stateModule.RequestChange(
 		StateAddDKGComplaint, complaint); err != nil {
-		panic(err)
+		if err != ErrChangeWontApply {
+			panic(err)
+		}
 	}
 	g.broadcastPendingStateChanges()
 }
@@ -174,7 +176,9 @@ func (g *Governance) AddDKGMasterPublicKey(masterPublicKey *typesDKG.MasterPubli
 	}
 	if err := g.stateModule.RequestChange(
 		StateAddDKGMasterPublicKey, masterPublicKey); err != nil {
-		panic(err)
+		if err != ErrChangeWontApply {
+			panic(err)
+		}
 	}
 	g.broadcastPendingStateChanges()
 }
@@ -187,8 +191,11 @@ func (g *Governance) DKGMasterPublicKeys(
 
 // AddDKGMPKReady adds a DKG ready message.
 func (g *Governance) AddDKGMPKReady(ready *typesDKG.MPKReady) {
-	if err := g.stateModule.RequestChange(StateAddDKGMPKReady, ready); err != nil {
-		panic(err)
+	if err := g.stateModule.RequestChange(
+		StateAddDKGMPKReady, ready); err != nil {
+		if err != ErrChangeWontApply {
+			panic(err)
+		}
 	}
 	g.broadcastPendingStateChanges()
 }
@@ -214,7 +221,9 @@ func (g *Governance) AddDKGFinalize(final *typesDKG.Finalize) {
 		return
 	}
 	if err := g.stateModule.RequestChange(StateAddDKGFinal, final); err != nil {
-		panic(err)
+		if err != ErrChangeWontApply {
+			panic(err)
+		}
 	}
 	g.broadcastPendingStateChanges()
 }
@@ -341,6 +350,11 @@ func (g *Governance) Clone() *Governance {
 		}
 		copiedNodeSets = append(copiedNodeSets, copiedNodeSet)
 	}
+	// Clone prohibited types.
+	copiedProhibitedTypes := make(map[StateChangeType]struct{})
+	for t := range g.prohibitedTypes {
+		copiedProhibitedTypes[t] = struct{}{}
+	}
 	// Clone pending changes.
 	return &Governance{
 		roundShift:           g.roundShift,
@@ -348,6 +362,7 @@ func (g *Governance) Clone() *Governance {
 		stateModule:          copiedState,
 		nodeSets:             copiedNodeSets,
 		pendingConfigChanges: copiedPendingChanges,
+		prohibitedTypes:      copiedProhibitedTypes,
 	}
 }
 
@@ -369,6 +384,10 @@ func (g *Governance) Equal(other *Governance, checkState bool) bool {
 	if !reflect.DeepEqual(g.pendingConfigChanges, other.pendingConfigChanges) {
 		return false
 	}
+	// Check prohibited types.
+	if !reflect.DeepEqual(g.prohibitedTypes, other.prohibitedTypes) {
+		return false
+	}
 	getSortedKeys := func(keys []crypto.PublicKey) (encoded []string) {
 		for _, key := range keys {
 			encoded = append(encoded, hex.EncodeToString(key.Bytes()))
diff --git a/core/test/governance_test.go b/core/test/governance_test.go
index 36819a0..a2d3a47 100644
--- a/core/test/governance_test.go
+++ b/core/test/governance_test.go
@@ -70,6 +70,13 @@ func (s *GovernanceTestSuite) TestEqual() {
 	// Change its roundShift
 	g5.roundShift = 3
 	req.False(g1.Equal(g5, true))
+	// Prohibit some change.
+	g1.Prohibit(StateAddDKGFinal)
+	// Make a clone; it should be equal.
+	g6 := g1.Clone()
+	req.True(g1.Equal(g6, true))
+	g6.Unprohibit(StateAddDKGFinal)
+	req.False(g1.Equal(g6, true))
 }
 
 func (s *GovernanceTestSuite) TestRegisterChange() {
diff --git a/core/test/network.go b/core/test/network.go
index 0bbb12e..443a26c 100644
--- a/core/test/network.go
+++ b/core/test/network.go
@@ -498,14 +498,19 @@ func (n *Network) addStateModule(s *State) {
 	n.stateModule = s
 }
 
-// AddNodeSetCache attaches an utils.NodeSetCache to this module. Once attached
+// AttachNodeSetCache attaches a utils.NodeSetCache to this module. Once attached
 // The behavior of Broadcast-X methods would be switched to broadcast to correct
 // set of peers, instead of all peers.
-func (n *Network) AddNodeSetCache(cache *utils.NodeSetCache) {
+func (n *Network) AttachNodeSetCache(cache *utils.NodeSetCache) {
 	// This variable should be attached before run, no lock to protect it.
 	n.cache = cache
 }
 
+// PurgeNodeSetCache purges the cached node set of the given round from the
+// attached utils.NodeSetCache.
+func (n *Network) PurgeNodeSetCache(round uint64) {
+	n.cache.Purge(round)
+}
+
 func (n *Network) pullBlocksAsync(hashes common.Hashes) {
 	// Setup notification channels for each block hash.
 	notYetReceived := make(map[common.Hash]struct{})
diff --git a/core/test/network_test.go b/core/test/network_test.go
index 863fee2..f9a6db9 100644
--- a/core/test/network_test.go
+++ b/core/test/network_test.go
@@ -289,7 +289,7 @@ func (s *NetworkTestSuite) TestBroadcastToSet() {
 	req.NotNil(nerd)
 	req.NotNil(dkgNode)
 	req.NotNil(notaryNode)
-	nerd.AddNodeSetCache(cache)
+	nerd.AttachNodeSetCache(cache)
 	// Try broadcasting with datum from round 0, and make sure only node belongs
 	// to that set receiving the message.
 	nerd.BroadcastVote(&types.Vote{VoteHeader: types.VoteHeader{
diff --git a/core/test/state.go b/core/test/state.go
index 89d2e90..ce906ae 100644
--- a/core/test/state.go
+++ b/core/test/state.go
@@ -77,9 +77,6 @@ var (
 	// ErrChangeWontApply means the state change won't be applied for some
 	// reason.
 	ErrChangeWontApply = errors.New("change won't apply")
-	// ErrUnmatchedResetCount means an DKG message attempt to apply is not
-	// the latest reset count in State module.
-	ErrUnmatchedResetCount = errors.New("unmatched reset count of DKG message")
 	// ErrNotInRemoteMode means callers attempts to call functions for remote
 	// mode when the State instance is still in local mode.
 	ErrNotInRemoteMode = errors.New(
@@ -641,17 +638,17 @@ func (s *State) isValidRequest(req *StateChangeRequest) error {
 	case StateAddDKGMPKReady:
 		ready := req.Payload.(*typesDKG.MPKReady)
 		if ready.Reset != s.dkgResetCount[ready.Round] {
-			return ErrUnmatchedResetCount
+			return ErrChangeWontApply
 		}
 	case StateAddDKGFinal:
 		final := req.Payload.(*typesDKG.Finalize)
 		if final.Reset != s.dkgResetCount[final.Round] {
-			return ErrUnmatchedResetCount
+			return ErrChangeWontApply
 		}
 	case StateAddDKGMasterPublicKey:
 		mpk := req.Payload.(*typesDKG.MasterPublicKey)
 		if mpk.Reset != s.dkgResetCount[mpk.Round] {
-			return ErrUnmatchedResetCount
+			return ErrChangeWontApply
 		}
 		// If we've received identical MPK, ignore it.
 		mpkForRound, exists := s.dkgMasterPublicKeys[mpk.Round]
@@ -671,7 +668,7 @@ func (s *State) isValidRequest(req *StateChangeRequest) error {
 	case StateAddDKGComplaint:
 		comp := req.Payload.(*typesDKG.Complaint)
 		if comp.Reset != s.dkgResetCount[comp.Round] {
-			return ErrUnmatchedResetCount
+			return ErrChangeWontApply
 		}
 		// If we've received DKG final from that proposer, we would ignore
 		// its complaint.
diff --git a/core/test/state_test.go b/core/test/state_test.go
index 2adfc95..0ec90a4 100644
--- a/core/test/state_test.go
+++ b/core/test/state_test.go
@@ -472,13 +472,13 @@ func (s *StateTestSuite) TestUnmatchedResetCount() {
 	s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash()))
 	s.Require().NoError(st.RequestChange(StateResetDKG, common.NewRandomHash()))
 	s.Require().Equal(st.dkgResetCount[1], uint64(2))
-	s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+	s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
 		StateAddDKGMasterPublicKey, mpk).Error())
-	s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+	s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
 		StateAddDKGMPKReady, ready).Error())
-	s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+	s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
 		StateAddDKGComplaint, comp).Error())
-	s.Require().EqualError(ErrUnmatchedResetCount, st.RequestChange(
+	s.Require().EqualError(ErrChangeWontApply, st.RequestChange(
 		StateAddDKGFinal, final).Error())
 	mpk = s.newDKGMasterPublicKey(1, 2)
 	ready = s.newDKGMPKReady(1, 2)
diff --git a/core/utils/crypto.go b/core/utils/crypto.go
index 7532d29..8be503f 100644
--- a/core/utils/crypto.go
+++ b/core/utils/crypto.go
@@ -323,3 +323,12 @@ func VerifyDKGFinalizeSignature(
 	}
 	return true, nil
 }
+
+// Rehash hashes the given hash repeatedly, count times.
+func Rehash(hash common.Hash, count uint) common.Hash {
+	result := hash
+	for i := uint(0); i < count; i++ {
+		result = crypto.Keccak256Hash(result[:])
+	}
+	return result
+}
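
A self-contained illustration of how Rehash is used when a DKG reset
re-proposes the CRS; SHA-256 stands in here for the Keccak256 used by the real
implementation, and the derivation mirrors the
con.runCRS(e.Round, utils.Rehash(e.CRS, uint(e.Reset+1)), true) call in the
core/consensus.go hunk above:

    package main

    import (
    	"crypto/sha256"
    	"fmt"
    )

    // rehash mirrors utils.Rehash with SHA-256 as a stand-in: hash the
    // previous digest count times; count == 0 returns the input unchanged.
    func rehash(h [32]byte, count uint) [32]byte {
    	result := h
    	for i := uint(0); i < count; i++ {
    		result = sha256.Sum256(result[:])
    	}
    	return result
    }

    func main() {
    	var crs [32]byte // CRS of the round being reset (zeroed for the demo)
    	// Each reset attempt derives a fresh CRS from the original one.
    	for reset := uint(0); reset < 3; reset++ {
    		fmt.Printf("reset #%d CRS: %x\n", reset, rehash(crs, reset+1))
    	}
    }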
diff --git a/core/utils/nodeset-cache.go b/core/utils/nodeset-cache.go
index e09120d..0090123 100644
--- a/core/utils/nodeset-cache.go
+++ b/core/utils/nodeset-cache.go
@@ -59,6 +59,9 @@ type NodeSetCacheInterface interface {
 }
 
 // NodeSetCache caches node set information.
+//
+// NOTE: this module doesn't handle DKG resetting and can only be used along
+//       with utils.RoundEvent.
 type NodeSetCache struct {
 	lock    sync.RWMutex
 	nsIntf  NodeSetCacheInterface
@@ -165,6 +168,23 @@ func (cache *NodeSetCache) GetLeaderNode(pos types.Position) (
 	return IDs.leaderNode[pos.Height], nil
 }
 
+// Purge the cached node set of a specific round.
+func (cache *NodeSetCache) Purge(rID uint64) {
+	cache.lock.Lock()
+	defer cache.lock.Unlock()
+	nIDs, exist := cache.rounds[rID]
+	if !exist {
+		return
+	}
+	for nID := range nIDs.nodeSet.IDs {
+		rec := cache.keyPool[nID]
+		if rec.refCnt--; rec.refCnt == 0 {
+			delete(cache.keyPool, nID)
+		}
+	}
+	delete(cache.rounds, rID)
+}
+
 // Touch updates the internal cache of round.
 func (cache *NodeSetCache) Touch(round uint64) (err error) {
 	_, err = cache.update(round)
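
Since the cache itself does not track resets (see the NOTE above), the caller
decides what to drop. The handlers added in core/consensus.go and
core/syncer/consensus.go purge round e.Round+1 whenever an event carries a
non-zero reset count, because that round's cached set was computed from the
CRS the reset replaces. A stripped-down sketch of that rule, with
roundEventParam standing in for utils.RoundEventParam:

    package main

    import "fmt"

    // roundEventParam is a stand-in holding the two fields the purge rule uses.
    type roundEventParam struct {
    	Round uint64
    	Reset uint64
    }

    // purgeStaleRounds mirrors the first-registered round event handler: a
    // non-zero reset count invalidates whatever was cached for the next round.
    func purgeStaleRounds(evts []roundEventParam, purge func(uint64)) {
    	for _, e := range evts {
    		if e.Reset == 0 {
    			continue
    		}
    		purge(e.Round + 1)
    	}
    }

    func main() {
    	evts := []roundEventParam{{Round: 1, Reset: 0}, {Round: 1, Reset: 1}, {Round: 1, Reset: 2}}
    	// Purge is a no-op for rounds that were never cached, so purging the
    	// same round more than once is safe.
    	purgeStaleRounds(evts, func(r uint64) { fmt.Println("purge round", r) })
    }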
diff --git a/core/utils/nodeset-cache_test.go b/core/utils/nodeset-cache_test.go
index fe905cf..45d30a7 100644
--- a/core/utils/nodeset-cache_test.go
+++ b/core/utils/nodeset-cache_test.go
@@ -138,6 +138,24 @@ func (s *NodeSetCacheTestSuite) TestTouch() {
 	req.True(exists)
 }
 
+func (s *NodeSetCacheTestSuite) TestPurge() {
+	var (
+		nsIntf = &nsIntf{
+			s:   s,
+			crs: common.NewRandomHash(),
+		}
+		cache = NewNodeSetCache(nsIntf)
+		req   = s.Require()
+	)
+	err := cache.Touch(1)
+	req.NoError(err)
+	_, exist := cache.get(1)
+	req.True(exist)
+	cache.Purge(1)
+	_, exist = cache.get(1)
+	req.False(exist)
+}
+
 func TestNodeSetCache(t *testing.T) {
 	suite.Run(t, new(NodeSetCacheTestSuite))
 }
diff --git a/core/utils/round-based-config.go b/core/utils/round-based-config.go
index 3219a13..4c83d04 100644
--- a/core/utils/round-based-config.go
+++ b/core/utils/round-based-config.go
@@ -90,7 +90,7 @@ func (c *RoundBasedConfig) RoundEndHeight() uint64 {
 	return c.roundEndHeight
 }
 
-// AppendTo a config in previous round.
+// AppendTo a config from previous round.
 func (c *RoundBasedConfig) AppendTo(other RoundBasedConfig) {
 	if c.roundID != other.roundID+1 {
 		panic(fmt.Errorf("round IDs of configs not continuous: %d %d",
diff --git a/core/utils/round-event.go b/core/utils/round-event.go
index fe735e5..3536a27 100644
--- a/core/utils/round-event.go
+++ b/core/utils/round-event.go
@@ -93,6 +93,13 @@ func (e RoundEventParam) NextDKGRegisterHeight() uint64 {
 	return e.BeginHeight + e.Config.RoundLength/2
 }
 
+func (e RoundEventParam) String() string {
+	return fmt.Sprintf("roundEvtParam{Round:%d Reset:%d Height:%d}",
+		e.Round,
+		e.Reset,
+		e.BeginHeight)
+}
+
 // roundEventFn defines the fingerprint of handlers of round events.
 type roundEventFn func([]RoundEventParam)
 
@@ -177,6 +184,8 @@ func NewRoundEvent(parentCtx context.Context, gov governanceAccessor,
 
 // Register a handler to be called when new round is confirmed or new DKG reset
 // is detected.
+//
+// Handlers registered earlier have higher priority.
 func (e *RoundEvent) Register(h roundEventFn) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
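
A trimmed-down stand-in for the registration and trigger flow, illustrating
the priority rule documented above: handlers fire in registration order, which
is why the node-set purge handler in core/consensus.go is registered before
the handlers that consume the node set:

    package main

    import "fmt"

    // roundEvent is a stand-in for utils.RoundEvent's handler bookkeeping.
    type roundEvent struct {
    	handlers []func(string)
    }

    func (e *roundEvent) Register(h func(string)) {
    	e.handlers = append(e.handlers, h)
    }

    func (e *roundEvent) trigger(evt string) {
    	for _, h := range e.handlers {
    		h(evt) // earlier registration means earlier (higher-priority) execution
    	}
    }

    func main() {
    	var e roundEvent
    	e.Register(func(evt string) { fmt.Println("1. purge node set for", evt) })
    	e.Register(func(evt string) { fmt.Println("2. append config for", evt) })
    	e.trigger("round 2 reset 1")
    }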
diff --git a/integration_test/byzantine_test.go b/integration_test/byzantine_test.go
index a709870..e95e58d 100644
--- a/integration_test/byzantine_test.go
+++ b/integration_test/byzantine_test.go
@@ -80,9 +80,16 @@ func (s *ByzantineTestSuite) setupNodes(
 		gov := seedGov.Clone()
 		gov.SwitchToRemoteMode(networkModule)
 		gov.NotifyRound(0)
-		networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
+		networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
 		app := test.NewApp(1, gov, nil)
-		nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule}
+		nodes[nID] = &node{
+			ID:      nID,
+			app:     app,
+			gov:     gov,
+			db:      dbInst,
+			network: networkModule,
+			logger:  &common.NullLogger{},
+		}
 		go func() {
 			defer wg.Done()
 			s.Require().NoError(networkModule.Setup(serverChannel))
@@ -102,7 +109,7 @@ func (s *ByzantineTestSuite) setupNodes(
 			node.db,
 			node.network,
 			k,
-			&common.NullLogger{},
+			node.logger,
 		)
 	}
 	return nodes
@@ -144,7 +151,7 @@ func (s *ByzantineTestSuite) TestOneSlowNodeOneDeadNode() {
 		core.ConfigRoundShift)
 	req.NoError(err)
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(60)))
+		test.StateChangeRoundLength, uint64(100)))
 	slowNodeID := types.NewNodeID(pubKeys[0])
 	deadNodeID := types.NewNodeID(pubKeys[1])
 	s.directLatencyModel[slowNodeID] = &test.FixedLatencyModel{
diff --git a/integration_test/consensus_test.go b/integration_test/consensus_test.go
index dd64113..afea6d8 100644
--- a/integration_test/consensus_test.go
+++ b/integration_test/consensus_test.go
@@ -46,8 +46,22 @@ type node struct {
 	con     *core.Consensus
 	app     *test.App
 	gov     *test.Governance
+	rEvt    *utils.RoundEvent
 	db      db.Database
 	network *test.Network
+	logger  common.Logger
+}
+
+func prohibitDKG(gov *test.Governance) {
+	gov.Prohibit(test.StateAddDKGMasterPublicKey)
+	gov.Prohibit(test.StateAddDKGFinal)
+	gov.Prohibit(test.StateAddDKGComplaint)
+}
+
+func unprohibitDKG(gov *test.Governance) {
+	gov.Unprohibit(test.StateAddDKGMasterPublicKey)
+	gov.Unprohibit(test.StateAddDKGFinal)
+	gov.Unprohibit(test.StateAddDKGComplaint)
 }
 
 func (s *ConsensusTestSuite) setupNodes(
@@ -55,7 +69,8 @@ func (s *ConsensusTestSuite) setupNodes(
 	prvKeys []crypto.PrivateKey,
 	seedGov *test.Governance) map[types.NodeID]*node {
 	var (
-		wg sync.WaitGroup
+		wg        sync.WaitGroup
+		initRound uint64
 	)
 	// Setup peer server at transport layer.
 	server := test.NewFakeTransportServer()
@@ -76,11 +91,22 @@ func (s *ConsensusTestSuite) setupNodes(
 		)
 		gov := seedGov.Clone()
 		gov.SwitchToRemoteMode(networkModule)
-		gov.NotifyRound(0)
-		networkModule.AddNodeSetCache(utils.NewNodeSetCache(gov))
-		app := test.NewApp(1, gov, nil)
+		gov.NotifyRound(initRound)
+		networkModule.AttachNodeSetCache(utils.NewNodeSetCache(gov))
+		logger := &common.NullLogger{}
+		rEvt, err := utils.NewRoundEvent(context.Background(), gov, logger, 0,
+			0, 0, core.ConfigRoundShift)
+		s.Require().NoError(err)
 		nID := types.NewNodeID(k.PublicKey())
-		nodes[nID] = &node{nID, nil, app, gov, dbInst, networkModule}
+		nodes[nID] = &node{
+			ID:      nID,
+			app:     test.NewApp(initRound+1, gov, rEvt),
+			gov:     gov,
+			db:      dbInst,
+			logger:  logger,
+			rEvt:    rEvt,
+			network: networkModule,
+		}
 		go func() {
 			defer wg.Done()
 			s.Require().NoError(networkModule.Setup(serverChannel))
@@ -100,7 +126,7 @@ func (s *ConsensusTestSuite) setupNodes(
 			node.db,
 			node.network,
 			k,
-			&common.NullLogger{},
+			node.logger,
 		)
 	}
 	return nodes
@@ -215,7 +241,7 @@ func (s *ConsensusTestSuite) TestSimple() {
 		core.ConfigRoundShift)
 	req.NoError(err)
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(60)))
+		test.StateChangeRoundLength, uint64(100)))
 	// A short round interval.
 	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
 	for _, n := range nodes {
@@ -244,7 +270,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 		req        = s.Require()
 		peerCount  = 7
 		dMoment    = time.Now().UTC()
-		untilRound = uint64(6)
+		untilRound = uint64(5)
 	)
 	if testing.Short() {
 		// Short test won't test configuration change packed as payload of
@@ -259,7 +285,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 		core.ConfigRoundShift)
 	req.NoError(err)
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(60)))
+		test.StateChangeRoundLength, uint64(100)))
 	req.NoError(seedGov.State().RequestChange(
 		test.StateChangeNotarySetSize, uint32(4)))
 	req.NoError(seedGov.State().RequestChange(
@@ -267,7 +293,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 	seedGov.CatchUpWithRound(0)
 	// Setup configuration for round 0 and round 1.
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(85)))
+		test.StateChangeRoundLength, uint64(100)))
 	req.NoError(seedGov.State().RequestChange(
 		test.StateChangeNotarySetSize, uint32(5)))
 	req.NoError(seedGov.State().RequestChange(
@@ -275,7 +301,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 	seedGov.CatchUpWithRound(1)
 	// Setup configuration for round 2.
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(85)))
+		test.StateChangeRoundLength, uint64(100)))
 	req.NoError(seedGov.State().RequestChange(
 		test.StateChangeNotarySetSize, uint32(6)))
 	req.NoError(seedGov.State().RequestChange(
@@ -283,7 +309,7 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 	seedGov.CatchUpWithRound(2)
 	// Setup configuration for round 3.
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(60)))
+		test.StateChangeRoundLength, uint64(100)))
 	req.NoError(seedGov.State().RequestChange(
 		test.StateChangeNotarySetSize, uint32(4)))
 	req.NoError(seedGov.State().RequestChange(
@@ -298,18 +324,11 @@ func (s *ConsensusTestSuite) TestSetSizeChange() {
 	}
 	// Register configuration changes for round 4.
 	req.NoError(pickedNode.gov.RegisterConfigChange(
-		4, test.StateChangeRoundLength, uint64(80)))
+		4, test.StateChangeRoundLength, uint64(100)))
 	req.NoError(pickedNode.gov.RegisterConfigChange(
 		4, test.StateChangeNotarySetSize, uint32(5)))
 	req.NoError(pickedNode.gov.RegisterConfigChange(
 		4, test.StateChangeDKGSetSize, uint32(5)))
-	// Register configuration changes for round 5.
-	req.NoError(pickedNode.gov.RegisterConfigChange(
-		5, test.StateChangeRoundLength, uint64(60)))
-	req.NoError(pickedNode.gov.RegisterConfigChange(
-		5, test.StateChangeNotarySetSize, uint32(4)))
-	req.NoError(pickedNode.gov.RegisterConfigChange(
-		5, test.StateChangeDKGSetSize, uint32(4)))
 	// Run test.
 	for _, n := range nodes {
 		go n.con.Run()
@@ -340,9 +359,11 @@ func (s *ConsensusTestSuite) TestSync() {
 		req        = s.Require()
 		peerCount  = 4
 		dMoment    = time.Now().UTC()
-		untilRound = uint64(7)
-		stopRound  = uint64(5)
-		aliveRound = uint64(4)
+		untilRound = uint64(6)
+		stopRound  = uint64(4)
+		// aliveRound should be large enough to test round event handling in
+		// syncer.
+		aliveRound = uint64(3)
 		errChan    = make(chan error, 100)
 	)
 	prvKeys, pubKeys, err := test.NewKeys(peerCount)
@@ -355,7 +376,7 @@ func (s *ConsensusTestSuite) TestSync() {
 		core.ConfigRoundShift)
 	req.NoError(err)
 	req.NoError(seedGov.State().RequestChange(
-		test.StateChangeRoundLength, uint64(60)))
+		test.StateChangeRoundLength, uint64(100)))
 	seedGov.CatchUpWithRound(0)
 	seedGov.CatchUpWithRound(1)
 	// A short round interval.
@@ -447,29 +468,30 @@ ReachAlive:
 		}
 	}()
 	// Wait until all nodes reach 'untilRound'.
+	var stoppedRound uint64
 	go func() {
 		n, pos := stoppedNode, stoppedNode.app.GetLatestDeliveredPosition()
 	ReachFinished:
 		for {
 			fmt.Println("latestPos", n.ID, &pos)
 			time.Sleep(5 * time.Second)
-			for _, n = range nodes {
+			if stoppedNode.con != nil {
 				pos = n.app.GetLatestDeliveredPosition()
-				if n.ID == stoppedNode.ID {
-					if n.con == nil {
-						continue
-					}
-					if pos.Round < stopRound {
-						continue ReachFinished
-					}
+				if pos.Round >= stopRound {
 					// Stop a node, we should still be able to proceed.
 					stoppedNode.con.Stop()
 					stoppedNode.con = nil
+					stoppedRound = pos.Round
 					fmt.Println("one node stopped", stoppedNode.ID)
 					utils.LaunchDummyReceiver(
 						runnerCtx, stoppedNode.network.ReceiveChan(), nil)
+				}
+			}
+			for _, n = range nodes {
+				if n.ID == stoppedNode.ID {
 					continue
 				}
+				pos = n.app.GetLatestDeliveredPosition()
 				if pos.Round < untilRound {
 					continue ReachFinished
 				}
@@ -485,6 +507,7 @@ ReachAlive:
 	case <-runnerCtx.Done():
 		// This test passed.
 	}
+	s.Require().Equal(stoppedRound, stopRound)
 }
 
 func (s *ConsensusTestSuite) TestForceSync() {
@@ -632,6 +655,100 @@ Loop:
 	s.verifyNodes(nodes)
 }
 
+func (s *ConsensusTestSuite) TestResetDKG() {
+	var (
+		req        = s.Require()
+		peerCount  = 5
+		dMoment    = time.Now().UTC()
+		untilRound = uint64(3)
+	)
+	prvKeys, pubKeys, err := test.NewKeys(peerCount)
+	req.NoError(err)
+	// Setup seed governance instance. Give a short latency to make this test
+	// run faster.
+	seedGov, err := test.NewGovernance(
+		test.NewState(core.DKGDelayRound,
+			pubKeys, 100*time.Millisecond, &common.NullLogger{}, true),
+		core.ConfigRoundShift)
+	req.NoError(err)
+	req.NoError(seedGov.State().RequestChange(
+		test.StateChangeRoundLength, uint64(100)))
+	req.NoError(seedGov.State().RequestChange(
+		test.StateChangeNotarySetSize, uint32(4)))
+	req.NoError(seedGov.State().RequestChange(
+		test.StateChangeDKGSetSize, uint32(4)))
+	nodes := s.setupNodes(dMoment, prvKeys, seedGov)
+	// A round event handler to purge utils.NodeSetCache in test.Network.
+	purgeHandlerGen := func(n *test.Network) func([]utils.RoundEventParam) {
+		return func(evts []utils.RoundEventParam) {
+			for _, e := range evts {
+				if e.Reset == 0 {
+					continue
+				}
+				n.PurgeNodeSetCache(e.Round + 1)
+			}
+		}
+	}
+	// Round Height reference table:
+	// - Round:1 Reset:0 -- 100
+	// - Round:1 Reset:1 -- 200
+	// - Round:1 Reset:2 -- 300
+	// - Round:2 Reset:0 -- 400
+	// - Round:2 Reset:1 -- 500
+	// - Round:3 Reset:0 -- 600
+	// Register a round event handler to prohibit/unprohibit DKG operations on
+	// governance.
+	roundHandlerGen := func(g *test.Governance) func([]utils.RoundEventParam) {
+		return func(evts []utils.RoundEventParam) {
+			trigger := func(e utils.RoundEventParam) {
+				// Keep resetting round 2 until resetCount == 2.
+				if e.Round == 1 && e.Reset == 0 {
+					prohibitDKG(g)
+				}
+				if e.Round == 1 && e.Reset == 2 {
+					unprohibitDKG(g)
+				}
+				// Keep resetting round 3 until resetCount == 1.
+				if e.Round == 2 && e.Reset == 0 {
+					// Allow DKG final this time.
+					g.Prohibit(test.StateAddDKGMasterPublicKey)
+					g.Prohibit(test.StateAddDKGComplaint)
+				}
+				if e.Round == 2 && e.Reset == 1 {
+					unprohibitDKG(g)
+				}
+			}
+			for _, e := range evts {
+				trigger(e)
+			}
+		}
+	}
+	for _, n := range nodes {
+		n.rEvt.Register(purgeHandlerGen(n.network))
+		n.rEvt.Register(roundHandlerGen(n.gov))
+		go n.con.Run()
+	}
+Loop:
+	for {
+		<-time.After(5 * time.Second)
+		for _, n := range nodes {
+			latestPos := n.app.GetLatestDeliveredPosition()
+			fmt.Println("latestPos", n.ID, &latestPos)
+			if latestPos.Round < untilRound {
+				continue Loop
+			}
+		}
+		// Oh ya.
+		break
+	}
+	s.verifyNodes(nodes)
+	for _, n := range nodes {
+		n.con.Stop()
+		req.Equal(n.gov.DKGResetCount(2), uint64(2))
+		req.Equal(n.gov.DKGResetCount(3), uint64(1))
+	}
+}
+
 func TestConsensus(t *testing.T) {
 	suite.Run(t, new(ConsensusTestSuite))
 }
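
As a rough cross-check of the height reference table in TestResetDKG, under
the assumptions that the round length is 100 (as configured above) and that
every DKG reset defers the following round's start by exactly one round
length:

    package main

    import "fmt"

    func main() {
    	const roundLength = 100
    	// Events in the order the test expects them; round 2's DKG is reset
    	// twice and round 3's once, matching the prohibit/unprohibit schedule.
    	events := []struct{ round, reset uint64 }{
    		{1, 0}, {1, 1}, {1, 2},
    		{2, 0}, {2, 1},
    		{3, 0},
    	}
    	begin := uint64(roundLength) // round 0 occupies heights [0, 100)
    	for _, e := range events {
    		fmt.Printf("Round:%d Reset:%d -- %d\n", e.round, e.reset, begin)
    		begin += roundLength
    	}
    }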
-- 
cgit