diff options
Diffstat (limited to 'core/syncer/consensus.go')
-rw-r--r-- | core/syncer/consensus.go | 137 |
1 files changed, 76 insertions, 61 deletions
diff --git a/core/syncer/consensus.go b/core/syncer/consensus.go index fd48793..f2f8f9e 100644 --- a/core/syncer/consensus.go +++ b/core/syncer/consensus.go @@ -69,12 +69,14 @@ type Consensus struct { configs []*types.Config roundBeginHeights []uint64 agreementRoundCut uint64 + heightEvt *common.Event + roundEvt *utils.RoundEvent // lock for accessing all fields. lock sync.RWMutex duringBuffering bool latestCRSRound uint64 - moduleWaitGroup sync.WaitGroup + waitGroup sync.WaitGroup agreementWaitGroup sync.WaitGroup pullChan chan common.Hash receiveChan chan *types.Block @@ -116,6 +118,7 @@ func NewConsensus( receiveChan: make(chan *types.Block, 1000), pullChan: make(chan common.Hash, 1000), randomnessResults: make(map[common.Hash]*types.BlockRandomnessResult), + heightEvt: common.NewEvent(), } con.ctx, con.ctxCancel = context.WithCancel(context.Background()) _, con.initChainTipHeight = db.GetCompactionChainTipInfo() @@ -143,9 +146,73 @@ func (con *Consensus) assureBuffering() { return } con.duringBuffering = true + // Get latest block to prepare utils.RoundEvent. + var ( + err error + blockHash, height = con.db.GetCompactionChainTipInfo() + ) + if height == 0 { + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger, + uint64(0), uint64(0), uint64(0), core.ConfigRoundShift) + } else { + var b types.Block + if b, err = con.db.GetBlock(blockHash); err == nil { + beginHeight := con.roundBeginHeights[b.Position.Round] + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, + con.logger, b.Position.Round, beginHeight, beginHeight, + core.ConfigRoundShift) + } + } + if err != nil { + panic(err) + } + // Make sure con.roundEvt stopped before stopping con.agreementModule. + con.waitGroup.Add(1) + // Register a round event handler to notify CRS to agreementModule. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + con.waitGroup.Add(1) + go func() { + defer con.waitGroup.Done() + for _, e := range evts { + select { + case <-con.ctx.Done(): + return + default: + } + for func() bool { + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- e.Round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Warn( + "agreement input channel is full when putting CRS", + "round", e.Round, + ) + return true + } + }() { + } + } + }() + }) + // Register a round event handler to validate next round. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + e := evts[len(evts)-1] + con.heightEvt.RegisterHeight(e.NextRoundValidationHeight(), func( + blockHeight uint64) { + select { + case <-con.ctx.Done(): + return + default: + } + con.roundEvt.ValidateNextRound(blockHeight) + }) + }) + con.roundEvt.TriggerInitEvent() con.startAgreement() con.startNetwork() - con.startCRSMonitor() } func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { @@ -265,6 +332,7 @@ func (con *Consensus) SyncBlocks( b.Hash, b.Finalization.Height); err != nil { return } + go con.heightEvt.NotifyHeight(b.Finalization.Height) } if latest { con.assureBuffering() @@ -346,7 +414,10 @@ func (con *Consensus) stopBuffering() { return } con.logger.Trace("stop syncer modules") - con.moduleWaitGroup.Wait() + con.roundEvt.Stop() + con.waitGroup.Done() + // Wait for all routines depends on con.agreementModule stopped. + con.waitGroup.Wait() // Since there is no one waiting for the receive channel of fullnode, we // need to launch a dummy receiver right away. con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( @@ -492,9 +563,9 @@ func (con *Consensus) cacheRandomnessResult(r *types.BlockRandomnessResult) { // startNetwork starts network for receiving blocks and agreement results. func (con *Consensus) startNetwork() { - con.moduleWaitGroup.Add(1) + con.waitGroup.Add(1) go func() { - defer con.moduleWaitGroup.Done() + defer con.waitGroup.Done() loop: for { select { @@ -522,62 +593,6 @@ func (con *Consensus) startNetwork() { }() } -// startCRSMonitor is the dummiest way to verify if the CRS for one round -// is ready or not. -func (con *Consensus) startCRSMonitor() { - var lastNotifiedRound uint64 - // Notify all agreements for new CRS. - notifyNewCRS := func(round uint64) { - con.setupConfigsUntilRound(round) - if round == lastNotifiedRound { - return - } - con.logger.Debug("CRS is ready", "round", round) - lastNotifiedRound = round - func() { - con.lock.Lock() - defer con.lock.Unlock() - con.latestCRSRound = round - }() - for func() bool { - select { - case <-con.ctx.Done(): - return false - case con.agreementModule.inputChan <- round: - return false - case <-time.After(500 * time.Millisecond): - con.logger.Debug( - "agreement input channel is full when putting CRS", - "round", round, - ) - return true - } - }() { - } - } - con.moduleWaitGroup.Add(1) - go func() { - defer con.moduleWaitGroup.Done() - for { - select { - case <-con.ctx.Done(): - return - case <-time.After(500 * time.Millisecond): - } - // Notify agreement modules for the latest round that CRS is - // available if the round is not notified yet. - checked := lastNotifiedRound + 1 - for (con.gov.CRS(checked) != common.Hash{}) { - checked++ - } - checked-- - if checked > lastNotifiedRound { - notifyNewCRS(checked) - } - } - }() -} - func (con *Consensus) stopAgreement() { if con.agreementModule.inputChan != nil { close(con.agreementModule.inputChan) |