aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/agreement.go10
-rw-r--r--core/consensus.go15
-rw-r--r--core/consensus_test.go56
3 files changed, 53 insertions, 28 deletions
diff --git a/core/agreement.go b/core/agreement.go
index 2337b43..8618b5f 100644
--- a/core/agreement.go
+++ b/core/agreement.go
@@ -130,9 +130,7 @@ func newAgreement(
fastForward: make(chan uint64, 1),
authModule: authModule,
}
- agreement.restart(notarySet, types.Position{
- ChainID: math.MaxUint32,
- })
+ agreement.stop()
return agreement
}
@@ -203,6 +201,12 @@ func (a *agreement) restart(
}
}
+func (a *agreement) stop() {
+ a.restart(make(map[types.NodeID]struct{}), types.Position{
+ ChainID: math.MaxUint32,
+ })
+}
+
// clocks returns how many time this state is required.
func (a *agreement) clocks() int {
return a.state.clocks()
diff --git a/core/consensus.go b/core/consensus.go
index 0781505..4929d0b 100644
--- a/core/consensus.go
+++ b/core/consensus.go
@@ -246,8 +246,6 @@ func NewConsensus(
if err != nil {
panic(err)
}
- // Setup context.
- ctx, ctxCancel := context.WithCancel(context.Background())
// Setup auth module.
authModule := NewAuthenticator(prv)
// Check if the application implement Debug interface.
@@ -283,8 +281,6 @@ func NewConsensus(
cfgModule: cfgModule,
dMoment: dMoment,
nodeSetCache: nodeSetCache,
- ctx: ctx,
- ctxCancel: ctxCancel,
authModule: authModule,
event: common.NewEvent(),
}
@@ -316,6 +312,8 @@ func NewConsensus(
// Run starts running DEXON Consensus.
func (con *Consensus) Run() {
+ // Setup context.
+ con.ctx, con.ctxCancel = context.WithCancel(context.Background())
go con.processMsg(con.network.ReceiveChan())
con.cfgModule.registerDKG(con.round, int(con.currentConfig.DKGSetSize)/3+1)
con.event.RegisterTime(con.dMoment.Add(con.currentConfig.RoundInterval/4),
@@ -471,6 +469,11 @@ func (con *Consensus) runCRS() {
}
func (con *Consensus) initialRound(startTime time.Time) {
+ select {
+ case <-con.ctx.Done():
+ return
+ default:
+ }
con.currentConfig = con.gov.Configuration(con.round)
con.event.RegisterTime(startTime.Add(con.currentConfig.RoundInterval/2),
@@ -501,6 +504,10 @@ func (con *Consensus) initialRound(startTime time.Time) {
// Stop the Consensus core.
func (con *Consensus) Stop() {
+ for _, a := range con.baModules {
+ a.stop()
+ }
+ con.event.Reset()
con.ctxCancel()
}
diff --git a/core/consensus_test.go b/core/consensus_test.go
index e089fdf..555e7dd 100644
--- a/core/consensus_test.go
+++ b/core/consensus_test.go
@@ -32,7 +32,6 @@ import (
// network implements core.Network.
type network struct {
- ch <-chan interface{}
nID types.NodeID
conn *networkConnection
}
@@ -80,45 +79,57 @@ func (n *network) BroadcastDKGPartialSignature(
// ReceiveChan returns a channel to receive messages from DEXON network.
func (n *network) ReceiveChan() <-chan interface{} {
- return n.ch
-}
-
-type networkConnection struct {
- channels map[types.NodeID]chan interface{}
+ return make(chan interface{})
}
func (nc *networkConnection) broadcast(from types.NodeID, msg interface{}) {
- for pk, ch := range nc.channels {
- if pk == from {
+ for nID := range nc.cons {
+ if nID == from {
continue
}
- go func(ch chan interface{}) {
- ch <- msg
- }(ch)
+ nc.send(nID, msg)
}
}
func (nc *networkConnection) send(to types.NodeID, msg interface{}) {
- ch, exist := nc.channels[to]
+ con, exist := nc.cons[to]
if !exist {
return
}
go func() {
- ch <- msg
+ switch val := msg.(type) {
+ case *types.Block:
+ nc.s.Require().NoError(con.preProcessBlock(val))
+ case *types.Vote:
+ nc.s.Require().NoError(con.ProcessVote(val))
+ case *types.AgreementResult:
+ nc.s.Require().NoError(con.ProcessAgreementResult(val))
+ case *types.BlockRandomnessResult:
+ nc.s.Require().NoError(con.ProcessBlockRandomnessResult(val))
+ case *types.DKGPrivateShare:
+ nc.s.Require().NoError(con.cfgModule.processPrivateShare(val))
+ case *types.DKGPartialSignature:
+ nc.s.Require().NoError(con.cfgModule.processPartialSignature(val))
+ }
}()
}
-func (nc *networkConnection) join(pk crypto.PublicKey) *network {
- ch := make(chan interface{}, 300)
- nID := types.NewNodeID(pk)
- nc.channels[nID] = ch
+type networkConnection struct {
+ s *ConsensusTestSuite
+ cons map[types.NodeID]*Consensus
+}
+
+func (nc *networkConnection) newNetwork(nID types.NodeID) *network {
return &network{
- ch: ch,
nID: nID,
conn: nc,
}
}
+func (nc *networkConnection) setCon(nID types.NodeID, con *Consensus) {
+ nc.cons[nID] = con
+}
+
type ConsensusTestSuite struct {
suite.Suite
conn *networkConnection
@@ -126,7 +137,8 @@ type ConsensusTestSuite struct {
func (s *ConsensusTestSuite) SetupTest() {
s.conn = &networkConnection{
- channels: make(map[types.NodeID]chan interface{}),
+ s: s,
+ cons: make(map[types.NodeID]*Consensus),
}
}
@@ -151,8 +163,11 @@ func (s *ConsensusTestSuite) prepareConsensus(
app := test.NewApp()
db, err := blockdb.NewMemBackedBlockDB()
s.Require().Nil(err)
+ nID := types.NewNodeID(prvKey.PublicKey())
+ network := s.conn.newNetwork(nID)
con := NewConsensus(dMoment, app, gov, db,
- s.conn.join(prvKey.PublicKey()), prvKey)
+ network, prvKey)
+ s.conn.setCon(nID, con)
return app, con
}
@@ -462,7 +477,6 @@ func (s *ConsensusTestSuite) TestDKGCRS() {
_, con := s.prepareConsensus(dMoment, gov, key)
nID := types.NewNodeID(key.PublicKey())
cons[nID] = con
- go con.processMsg(con.network.ReceiveChan())
con.cfgModule.registerDKG(uint64(0), n/3+1)
}
for _, con := range cons {