diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/agreement.go | 10 | ||||
-rw-r--r-- | core/consensus.go | 15 | ||||
-rw-r--r-- | core/consensus_test.go | 56 |
3 files changed, 53 insertions, 28 deletions
diff --git a/core/agreement.go b/core/agreement.go index 2337b43..8618b5f 100644 --- a/core/agreement.go +++ b/core/agreement.go @@ -130,9 +130,7 @@ func newAgreement( fastForward: make(chan uint64, 1), authModule: authModule, } - agreement.restart(notarySet, types.Position{ - ChainID: math.MaxUint32, - }) + agreement.stop() return agreement } @@ -203,6 +201,12 @@ func (a *agreement) restart( } } +func (a *agreement) stop() { + a.restart(make(map[types.NodeID]struct{}), types.Position{ + ChainID: math.MaxUint32, + }) +} + // clocks returns how many time this state is required. func (a *agreement) clocks() int { return a.state.clocks() diff --git a/core/consensus.go b/core/consensus.go index 0781505..4929d0b 100644 --- a/core/consensus.go +++ b/core/consensus.go @@ -246,8 +246,6 @@ func NewConsensus( if err != nil { panic(err) } - // Setup context. - ctx, ctxCancel := context.WithCancel(context.Background()) // Setup auth module. authModule := NewAuthenticator(prv) // Check if the application implement Debug interface. @@ -283,8 +281,6 @@ func NewConsensus( cfgModule: cfgModule, dMoment: dMoment, nodeSetCache: nodeSetCache, - ctx: ctx, - ctxCancel: ctxCancel, authModule: authModule, event: common.NewEvent(), } @@ -316,6 +312,8 @@ func NewConsensus( // Run starts running DEXON Consensus. func (con *Consensus) Run() { + // Setup context. + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) go con.processMsg(con.network.ReceiveChan()) con.cfgModule.registerDKG(con.round, int(con.currentConfig.DKGSetSize)/3+1) con.event.RegisterTime(con.dMoment.Add(con.currentConfig.RoundInterval/4), @@ -471,6 +469,11 @@ func (con *Consensus) runCRS() { } func (con *Consensus) initialRound(startTime time.Time) { + select { + case <-con.ctx.Done(): + return + default: + } con.currentConfig = con.gov.Configuration(con.round) con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2), @@ -501,6 +504,10 @@ func (con *Consensus) initialRound(startTime time.Time) { // Stop the Consensus core. func (con *Consensus) Stop() { + for _, a := range con.baModules { + a.stop() + } + con.event.Reset() con.ctxCancel() } diff --git a/core/consensus_test.go b/core/consensus_test.go index e089fdf..555e7dd 100644 --- a/core/consensus_test.go +++ b/core/consensus_test.go @@ -32,7 +32,6 @@ import ( // network implements core.Network. type network struct { - ch <-chan interface{} nID types.NodeID conn *networkConnection } @@ -80,45 +79,57 @@ func (n *network) BroadcastDKGPartialSignature( // ReceiveChan returns a channel to receive messages from DEXON network. func (n *network) ReceiveChan() <-chan interface{} { - return n.ch -} - -type networkConnection struct { - channels map[types.NodeID]chan interface{} + return make(chan interface{}) } func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) { - for pk, ch := range nc.channels { - if pk == from { + for nID := range nc.cons { + if nID == from { continue } - go func(ch chan interface{}) { - ch <- msg - }(ch) + nc.send(nID, msg) } } func (nc *networkConnection) send(to types.NodeID, msg interface{}) { - ch, exist := nc.channels[to] + con, exist := nc.cons[to] if !exist { return } go func() { - ch <- msg + switch val := msg.(type) { + case *types.Block: + nc.s.Require().NoError(con.preProcessBlock(val)) + case *types.Vote: + nc.s.Require().NoError(con.ProcessVote(val)) + case *types.AgreementResult: + nc.s.Require().NoError(con.ProcessAgreementResult(val)) + case *types.BlockRandomnessResult: + nc.s.Require().NoError(con.ProcessBlockRandomnessResult(val)) + case *types.DKGPrivateShare: + nc.s.Require().NoError(con.cfgModule.processPrivateShare(val)) + case *types.DKGPartialSignature: + nc.s.Require().NoError(con.cfgModule.processPartialSignature(val)) + } }() } -func (nc *networkConnection) join(pk crypto.PublicKey) *network { - ch := make(chan interface{}, 300) - nID := types.NewNodeID(pk) - nc.channels[nID] = ch +type networkConnection struct { + s *ConsensusTestSuite + cons map[types.NodeID]*Consensus +} + +func (nc *networkConnection) newNetwork(nID types.NodeID) *network { return &network{ - ch: ch, nID: nID, conn: nc, } } +func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) { + nc.cons[nID] = con +} + type ConsensusTestSuite struct { suite.Suite conn *networkConnection @@ -126,7 +137,8 @@ type ConsensusTestSuite struct { func (s *ConsensusTestSuite) SetupTest() { s.conn = &networkConnection{ - channels: make(map[types.NodeID]chan interface{}), + s: s, + cons: make(map[types.NodeID]*Consensus), } } @@ -151,8 +163,11 @@ func (s *ConsensusTestSuite) prepareConsensus( app := test.NewApp() db, err := blockdb.NewMemBackedBlockDB() s.Require().Nil(err) + nID := types.NewNodeID(prvKey.PublicKey()) + network := s.conn.newNetwork(nID) con := NewConsensus(dMoment, app, gov, db, - s.conn.join(prvKey.PublicKey()), prvKey) + network, prvKey) + s.conn.setCon(nID, con) return app, con } @@ -462,7 +477,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() { _, con := s.prepareConsensus(dMoment, gov, key) nID := types.NewNodeID(key.PublicKey()) cons[nID] = con - go con.processMsg(con.network.ReceiveChan()) con.cfgModule.registerDKG(uint64(0), n/3+1) } for _, con := range cons { |