diff options
author | Wei-Ning Huang <w@byzantine-lab.io> | 2019-06-23 15:39:23 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@byzantine-lab.io> | 2019-09-17 16:57:30 +0800 |
commit | e6f5201b178f40b516ffe7b98757df25f8aee028 (patch) | |
tree | 982d6281ac9670d0ad451ca6bbd0677488b52344 /vendor/github.com/tangerine-network/tangerine-consensus/core/syncer | |
parent | f6e06ac35033f9e52b6b2e3ebfe623c23a39c338 (diff) | |
download | go-tangerine-e6f5201b178f40b516ffe7b98757df25f8aee028.tar.gz go-tangerine-e6f5201b178f40b516ffe7b98757df25f8aee028.tar.zst go-tangerine-e6f5201b178f40b516ffe7b98757df25f8aee028.zip |
import: switch consensus core to gitlab.com/tangerine-network/tangerine-consensus
Diffstat (limited to 'vendor/github.com/tangerine-network/tangerine-consensus/core/syncer')
3 files changed, 999 insertions, 0 deletions
diff --git a/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go new file mode 100644 index 000000000..c20bd6018 --- /dev/null +++ b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go @@ -0,0 +1,301 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/tangerine-network/tangerine-consensus/common" + "github.com/tangerine-network/tangerine-consensus/core" + "github.com/tangerine-network/tangerine-consensus/core/crypto" + "github.com/tangerine-network/tangerine-consensus/core/types" + "github.com/tangerine-network/tangerine-consensus/core/utils" +) + +// Struct agreement implements struct of BA (Byzantine Agreement) protocol +// needed in syncer, which only receives agreement results. +type agreement struct { + chainTip uint64 + cache *utils.NodeSetCache + tsigVerifierCache *core.TSigVerifierCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash][]byte + latestCRSRound uint64 + pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult + pendingBlocks map[uint64]map[common.Hash]*types.Block + logger common.Logger + confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc +} + +// newAgreement creates a new agreement instance. +func newAgreement(chainTip uint64, + ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, verifier *core.TSigVerifierCache, + logger common.Logger) *agreement { + a := &agreement{ + chainTip: chainTip, + cache: cache, + tsigVerifierCache: verifier, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash][]byte), + logger: logger, + pendingAgrs: make( + map[uint64]map[common.Hash]*types.AgreementResult), + pendingBlocks: make( + map[uint64]map[common.Hash]*types.Block), + confirmedBlocks: make(map[common.Hash]struct{}), + } + a.ctx, a.ctxCancel = context.WithCancel(context.Background()) + return a +} + +// run starts the agreement, this does not start a new routine, go a new +// routine explicitly in the caller. +func (a *agreement) run() { + defer a.ctxCancel() + for { + select { + case val, ok := <-a.inputChan: + if !ok { + // InputChan is closed by network when network ends. + return + } + switch v := val.(type) { + case *types.Block: + if v.Position.Round >= core.DKGDelayRound && v.IsFinalized() { + a.processFinalizedBlock(v) + } else { + a.processBlock(v) + } + case *types.AgreementResult: + a.processAgreementResult(v) + case uint64: + a.processNewCRS(v) + } + } + } +} + +func (a *agreement) processBlock(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; exist { + return + } + if rand, exist := a.agreementResults[b.Hash]; exist { + if len(b.Randomness) == 0 { + b.Randomness = rand + } + a.confirm(b) + } else { + if _, exist := a.blocks[b.Position]; !exist { + a.blocks[b.Position] = make(map[common.Hash]*types.Block) + } + a.blocks[b.Position][b.Hash] = b + } +} + +func (a *agreement) processFinalizedBlock(block *types.Block) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[block.Hash]; exists { + a.logger.Trace("finalized block already confirmed", "block", block) + return + } + if block.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendingBlocks[block.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.Block) + a.pendingBlocks[block.Position.Round] = pendingsForRound + } + pendingsForRound[block.Hash] = block + a.logger.Trace("finalized block cached", "block", block) + return + } + if err := utils.VerifyBlockSignature(block); err != nil { + return + } + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet( + block.Position.Round) + if err != nil { + a.logger.Error("error verifying block randomness", + "block", block, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify block randomness", "block", block) + return + } + if !verifier.VerifySignature(block.Hash, crypto.Signature{ + Type: "bls", + Signature: block.Randomness, + }) { + a.logger.Error("incorrect block randomness", "block", block) + return + } + a.confirm(block) +} + +func (a *agreement) processAgreementResult(r *types.AgreementResult) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[r.BlockHash]; exists { + a.logger.Trace("Agreement result already confirmed", "result", r) + return + } + if r.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendingAgrs[r.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.AgreementResult) + a.pendingAgrs[r.Position.Round] = pendingsForRound + } + pendingsForRound[r.BlockHash] = r + a.logger.Trace("Agreement result cached", "result", r) + return + } + if err := core.VerifyAgreementResult(r, a.cache); err != nil { + a.logger.Error("Agreement result verification failed", + "result", r, + "error", err) + return + } + if r.Position.Round >= core.DKGDelayRound { + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round) + if err != nil { + a.logger.Error("error verifying agreement result randomness", + "result", r, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify agreement result randomness", "result", r) + return + } + if !verifier.VerifySignature(r.BlockHash, crypto.Signature{ + Type: "bls", + Signature: r.Randomness, + }) { + a.logger.Error("incorrect agreement result randomness", "result", r) + return + } + } else { + // Special case for rounds before DKGDelayRound. + if bytes.Compare(r.Randomness, core.NoRand) != 0 { + a.logger.Error("incorrect agreement result randomness", "result", r) + return + } + } + if r.IsEmptyBlock { + b := &types.Block{ + Position: r.Position, + Randomness: r.Randomness, + } + // Empty blocks should be confirmed directly, they won't be sent over + // the wire. + a.confirm(b) + return + } + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + b.Randomness = r.Randomness + a.confirm(b) + return + } + } + a.agreementResults[r.BlockHash] = r.Randomness +loop: + for { + select { + case a.pullChan <- r.BlockHash: + break loop + case <-a.ctx.Done(): + a.logger.Error("Pull request is not sent", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("Pull request is unable to send", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + } + } +} + +func (a *agreement) processNewCRS(round uint64) { + if round <= a.latestCRSRound { + return + } + prevRound := a.latestCRSRound + 1 + a.latestCRSRound = round + // Verify all pending results. + for r := prevRound; r <= a.latestCRSRound; r++ { + pendingsForRound := a.pendingAgrs[r] + if pendingsForRound == nil { + continue + } + delete(a.pendingAgrs, r) + for _, res := range pendingsForRound { + if err := core.VerifyAgreementResult(res, a.cache); err != nil { + a.logger.Error("Invalid agreement result", + "result", res, + "error", err) + continue + } + a.logger.Error("Flush agreement result", "result", res) + a.processAgreementResult(res) + break + } + } +} + +// confirm notifies consensus the confirmation of a block in BA. +func (a *agreement) confirm(b *types.Block) { + if !b.IsFinalized() { + panic(fmt.Errorf("confirm a block %s without randomness", b)) + } + if _, exist := a.confirmedBlocks[b.Hash]; !exist { + delete(a.blocks, b.Position) + delete(a.agreementResults, b.Hash) + loop: + for { + select { + case a.outputChan <- b: + break loop + case <-a.ctx.Done(): + a.logger.Error("Confirmed block is not sent", "block", b) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("Agreement output channel is full", "block", b) + } + } + a.confirmedBlocks[b.Hash] = struct{}{} + } + if b.Position.Height > a.chainTip+1 { + if _, exist := a.confirmedBlocks[b.ParentHash]; !exist { + a.pullChan <- b.ParentHash + } + } +} diff --git a/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/consensus.go b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/consensus.go new file mode 100644 index 000000000..ecadae4ad --- /dev/null +++ b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/consensus.go @@ -0,0 +1,543 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus library. +// +// The dexon-consensus library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/tangerine-network/tangerine-consensus/common" + "github.com/tangerine-network/tangerine-consensus/core" + "github.com/tangerine-network/tangerine-consensus/core/crypto" + "github.com/tangerine-network/tangerine-consensus/core/db" + "github.com/tangerine-network/tangerine-consensus/core/types" + "github.com/tangerine-network/tangerine-consensus/core/utils" +) + +var ( + // ErrAlreadySynced is reported when syncer is synced. + ErrAlreadySynced = fmt.Errorf("already synced") + // ErrNotSynced is reported when syncer is not synced yet. + ErrNotSynced = fmt.Errorf("not synced yet") + // ErrGenesisBlockReached is reported when genesis block reached. + ErrGenesisBlockReached = fmt.Errorf("genesis block reached") + // ErrInvalidBlockOrder is reported when SyncBlocks receives unordered + // blocks. + ErrInvalidBlockOrder = fmt.Errorf("invalid block order") + // ErrInvalidSyncingHeight raised when the blocks to sync is not following + // the compaction chain tip in database. + ErrInvalidSyncingHeight = fmt.Errorf("invalid syncing height") +) + +// Consensus is for syncing consensus module. +type Consensus struct { + db db.Database + gov core.Governance + dMoment time.Time + logger common.Logger + app core.Application + prv crypto.PrivateKey + network core.Network + nodeSetCache *utils.NodeSetCache + tsigVerifier *core.TSigVerifierCache + + blocks types.BlocksByPosition + agreementModule *agreement + agreementRoundCut uint64 + heightEvt *common.Event + roundEvt *utils.RoundEvent + + // lock for accessing all fields. + lock sync.RWMutex + duringBuffering bool + latestCRSRound uint64 + waitGroup sync.WaitGroup + agreementWaitGroup sync.WaitGroup + pullChan chan common.Hash + receiveChan chan *types.Block + ctx context.Context + ctxCancel context.CancelFunc + syncedLastBlock *types.Block + syncedConsensus *core.Consensus + syncedSkipNext bool + dummyCancel context.CancelFunc + dummyFinished <-chan struct{} + dummyMsgBuffer []types.Msg + initChainTipHeight uint64 +} + +// NewConsensus creates an instance for Consensus (syncer consensus). +func NewConsensus( + initHeight uint64, + dMoment time.Time, + app core.Application, + gov core.Governance, + db db.Database, + network core.Network, + prv crypto.PrivateKey, + logger common.Logger) *Consensus { + + con := &Consensus{ + dMoment: dMoment, + app: app, + gov: gov, + db: db, + network: network, + nodeSetCache: utils.NewNodeSetCache(gov), + tsigVerifier: core.NewTSigVerifierCache(gov, 7), + prv: prv, + logger: logger, + receiveChan: make(chan *types.Block, 1000), + pullChan: make(chan common.Hash, 1000), + heightEvt: common.NewEvent(), + } + con.ctx, con.ctxCancel = context.WithCancel(context.Background()) + _, con.initChainTipHeight = db.GetCompactionChainTipInfo() + con.agreementModule = newAgreement( + con.initChainTipHeight, + con.receiveChan, + con.pullChan, + con.nodeSetCache, + con.tsigVerifier, + con.logger) + con.agreementWaitGroup.Add(1) + go func() { + defer con.agreementWaitGroup.Done() + con.agreementModule.run() + }() + if err := con.deliverPendingBlocks(initHeight); err != nil { + panic(err) + } + return con +} + +func (con *Consensus) deliverPendingBlocks(height uint64) error { + if height >= con.initChainTipHeight { + return nil + } + blocks := make([]*types.Block, 0, con.initChainTipHeight-height) + hash, _ := con.db.GetCompactionChainTipInfo() + for { + block, err := con.db.GetBlock(hash) + if err != nil { + return err + } + if block.Position.Height == height { + break + } + blocks = append(blocks, &block) + hash = block.ParentHash + } + sort.Sort(types.BlocksByPosition(blocks)) + for _, b := range blocks { + con.logger.Debug("Syncer BlockConfirmed", "block", b) + con.app.BlockConfirmed(*b) + con.logger.Debug("Syncer BlockDelivered", "block", b) + con.app.BlockDelivered(b.Hash, b.Position, b.Randomness) + } + return nil +} + +func (con *Consensus) assureBuffering() { + if func() bool { + con.lock.RLock() + defer con.lock.RUnlock() + return con.duringBuffering + }() { + return + } + con.lock.Lock() + defer con.lock.Unlock() + if con.duringBuffering { + return + } + con.duringBuffering = true + // Get latest block to prepare utils.RoundEvent. + var ( + err error + blockHash, height = con.db.GetCompactionChainTipInfo() + ) + if height == 0 { + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, con.logger, + types.Position{}, core.ConfigRoundShift) + } else { + var b types.Block + if b, err = con.db.GetBlock(blockHash); err == nil { + con.roundEvt, err = utils.NewRoundEvent(con.ctx, con.gov, + con.logger, b.Position, core.ConfigRoundShift) + } + } + if err != nil { + panic(err) + } + // Make sure con.roundEvt stopped before stopping con.agreementModule. + con.waitGroup.Add(1) + // Register a round event handler to reset node set cache, this handler + // should be the highest priority. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + for _, e := range evts { + if e.Reset == 0 { + continue + } + con.nodeSetCache.Purge(e.Round + 1) + con.tsigVerifier.Purge(e.Round + 1) + } + }) + // Register a round event handler to notify CRS to agreementModule. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + con.waitGroup.Add(1) + go func() { + defer con.waitGroup.Done() + for _, e := range evts { + select { + case <-con.ctx.Done(): + return + default: + } + for func() bool { + select { + case <-con.ctx.Done(): + return false + case con.agreementModule.inputChan <- e.Round: + return false + case <-time.After(500 * time.Millisecond): + con.logger.Warn( + "Agreement input channel is full when notifying new round", + "round", e.Round, + ) + return true + } + }() { + } + } + }() + }) + // Register a round event handler to validate next round. + con.roundEvt.Register(func(evts []utils.RoundEventParam) { + con.heightEvt.RegisterHeight( + evts[len(evts)-1].NextRoundValidationHeight(), + utils.RoundEventRetryHandlerGenerator(con.roundEvt, con.heightEvt), + ) + }) + con.roundEvt.TriggerInitEvent() + con.startAgreement() + con.startNetwork() +} + +func (con *Consensus) checkIfSynced(blocks []*types.Block) (synced bool) { + con.lock.RLock() + defer con.lock.RUnlock() + defer func() { + con.logger.Debug("Syncer synced status", + "last-block", blocks[len(blocks)-1], + "synced", synced, + ) + }() + if len(con.blocks) == 0 || len(blocks) == 0 { + return + } + synced = !blocks[len(blocks)-1].Position.Older(con.blocks[0].Position) + return +} + +func (con *Consensus) buildAllEmptyBlocks() { + con.lock.Lock() + defer con.lock.Unlock() + // Clean empty blocks on tips of chains. + for len(con.blocks) > 0 && con.isEmptyBlock(con.blocks[0]) { + con.blocks = con.blocks[1:] + } + // Build empty blocks. + for i, b := range con.blocks { + if con.isEmptyBlock(b) { + if con.blocks[i-1].Position.Height+1 == b.Position.Height { + con.buildEmptyBlock(b, con.blocks[i-1]) + } + } + } +} + +// ForceSync forces syncer to become synced. +func (con *Consensus) ForceSync(lastPos types.Position, skip bool) { + if con.syncedLastBlock != nil { + return + } + hash, height := con.db.GetCompactionChainTipInfo() + if height < lastPos.Height { + panic(fmt.Errorf("compaction chain not synced height %d, tip %d", + lastPos.Height, height)) + } else if height > lastPos.Height { + skip = false + } + block, err := con.db.GetBlock(hash) + if err != nil { + panic(err) + } + con.syncedLastBlock = &block + con.stopBuffering() + // We might call stopBuffering without calling assureBuffering. + if con.dummyCancel == nil { + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg types.Msg) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) + } + con.syncedSkipNext = skip + con.logger.Info("Force Sync", "block", &block, "skip", skip) +} + +// SyncBlocks syncs blocks from compaction chain, latest is true if the caller +// regards the blocks are the latest ones. Notice that latest can be true for +// many times. +// NOTICE: parameter "blocks" should be consecutive in compaction height. +// NOTICE: this method is not expected to be called concurrently. +func (con *Consensus) SyncBlocks( + blocks []*types.Block, latest bool) (synced bool, err error) { + defer func() { + con.logger.Debug("SyncBlocks returned", + "synced", synced, + "error", err, + "last-block", con.syncedLastBlock, + ) + }() + if con.syncedLastBlock != nil { + synced, err = true, ErrAlreadySynced + return + } + if len(blocks) == 0 { + return + } + // Check if blocks are consecutive. + for i := 1; i < len(blocks); i++ { + if blocks[i].Position.Height != blocks[i-1].Position.Height+1 { + err = ErrInvalidBlockOrder + return + } + } + // Make sure the first block is the next block of current compaction chain + // tip in DB. + _, tipHeight := con.db.GetCompactionChainTipInfo() + if blocks[0].Position.Height != tipHeight+1 { + con.logger.Error("Mismatched block height", + "now", blocks[0].Position.Height, + "expected", tipHeight+1, + ) + err = ErrInvalidSyncingHeight + return + } + con.logger.Trace("SyncBlocks", + "position", &blocks[0].Position, + "len", len(blocks), + "latest", latest, + ) + for _, b := range blocks { + if err = con.db.PutBlock(*b); err != nil { + // A block might be put into db when confirmed by BA, but not + // finalized yet. + if err == db.ErrBlockExists { + err = con.db.UpdateBlock(*b) + } + if err != nil { + return + } + } + if err = con.db.PutCompactionChainTipInfo( + b.Hash, b.Position.Height); err != nil { + return + } + con.heightEvt.NotifyHeight(b.Position.Height) + } + if latest { + con.assureBuffering() + con.buildAllEmptyBlocks() + // Check if compaction and agreements' blocks are overlapped. The + // overlapping of compaction chain and BA's oldest blocks means the + // syncing is done. + if con.checkIfSynced(blocks) { + con.stopBuffering() + con.syncedLastBlock = blocks[len(blocks)-1] + synced = true + } + } + return +} + +// GetSyncedConsensus returns the core.Consensus instance after synced. +func (con *Consensus) GetSyncedConsensus() (*core.Consensus, error) { + con.lock.Lock() + defer con.lock.Unlock() + if con.syncedConsensus != nil { + return con.syncedConsensus, nil + } + if con.syncedLastBlock == nil { + return nil, ErrNotSynced + } + // flush all blocks in con.blocks into core.Consensus, and build + // core.Consensus from syncer. + con.dummyCancel() + <-con.dummyFinished + var err error + con.syncedConsensus, err = core.NewConsensusFromSyncer( + con.syncedLastBlock, + con.syncedSkipNext, + con.dMoment, + con.app, + con.gov, + con.db, + con.network, + con.prv, + con.blocks, + con.dummyMsgBuffer, + con.logger) + return con.syncedConsensus, err +} + +// stopBuffering stops the syncer buffering routines. +// +// This method is mainly for caller to stop the syncer before synced, the syncer +// would call this method automatically after being synced. +func (con *Consensus) stopBuffering() { + if func() (notBuffering bool) { + con.lock.RLock() + defer con.lock.RUnlock() + notBuffering = !con.duringBuffering + return + }() { + return + } + if func() (alreadyCanceled bool) { + con.lock.Lock() + defer con.lock.Unlock() + if !con.duringBuffering { + alreadyCanceled = true + return + } + con.duringBuffering = false + con.logger.Trace("Syncer is about to stop") + // Stop network and CRS routines, wait until they are all stoped. + con.ctxCancel() + return + }() { + return + } + con.logger.Trace("Stop syncer modules") + con.roundEvt.Stop() + con.waitGroup.Done() + // Wait for all routines depends on con.agreementModule stopped. + con.waitGroup.Wait() + // Since there is no one waiting for the receive channel of fullnode, we + // need to launch a dummy receiver right away. + con.dummyCancel, con.dummyFinished = utils.LaunchDummyReceiver( + context.Background(), con.network.ReceiveChan(), + func(msg types.Msg) { + con.dummyMsgBuffer = append(con.dummyMsgBuffer, msg) + }) + // Stop agreements. + con.logger.Trace("Stop syncer agreement modules") + con.stopAgreement() + con.logger.Trace("Syncer stopped") + return +} + +// isEmptyBlock checks if a block is an empty block by both its hash and parent +// hash are empty. +func (con *Consensus) isEmptyBlock(b *types.Block) bool { + return b.Hash == common.Hash{} && b.ParentHash == common.Hash{} +} + +// buildEmptyBlock builds an empty block in agreement. +func (con *Consensus) buildEmptyBlock(b *types.Block, parent *types.Block) { + cfg := utils.GetConfigWithPanic(con.gov, b.Position.Round, con.logger) + b.Timestamp = parent.Timestamp.Add(cfg.MinBlockInterval) + b.Witness.Height = parent.Witness.Height + b.Witness.Data = make([]byte, len(parent.Witness.Data)) + copy(b.Witness.Data, parent.Witness.Data) +} + +// startAgreement starts agreements for receiving votes and agreements. +func (con *Consensus) startAgreement() { + // Start a routine for listening receive channel and pull block channel. + go func() { + for { + select { + case b, ok := <-con.receiveChan: + if !ok { + return + } + func() { + con.lock.Lock() + defer con.lock.Unlock() + if len(con.blocks) > 0 && + !b.Position.Newer(con.blocks[0].Position) { + return + } + con.blocks = append(con.blocks, b) + sort.Sort(con.blocks) + }() + case h, ok := <-con.pullChan: + if !ok { + return + } + con.network.PullBlocks(common.Hashes{h}) + } + } + }() +} + +// startNetwork starts network for receiving blocks and agreement results. +func (con *Consensus) startNetwork() { + con.waitGroup.Add(1) + go func() { + defer con.waitGroup.Done() + loop: + for { + select { + case val := <-con.network.ReceiveChan(): + switch v := val.Payload.(type) { + case *types.Block: + case *types.AgreementResult: + // Avoid byzantine nodes attack by broadcasting older + // agreement results. Normal nodes might report 'synced' + // while still fall behind other nodes. + if v.Position.Height <= con.initChainTipHeight { + continue loop + } + default: + continue loop + } + con.agreementModule.inputChan <- val.Payload + case <-con.ctx.Done(): + break loop + } + } + }() +} + +func (con *Consensus) stopAgreement() { + if con.agreementModule.inputChan != nil { + close(con.agreementModule.inputChan) + } + con.agreementWaitGroup.Wait() + con.agreementModule.inputChan = nil + close(con.receiveChan) + close(con.pullChan) +} diff --git a/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/watch-cat.go b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/watch-cat.go new file mode 100644 index 000000000..ce2b05c1c --- /dev/null +++ b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/watch-cat.go @@ -0,0 +1,155 @@ +// Copyright 2019 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// <http://www.gnu.org/licenses/>. + +package syncer + +import ( + "context" + "time" + + "github.com/tangerine-network/tangerine-consensus/common" + "github.com/tangerine-network/tangerine-consensus/core" + "github.com/tangerine-network/tangerine-consensus/core/types" + "github.com/tangerine-network/tangerine-consensus/core/utils" +) + +type configReader interface { + Configuration(round uint64) *types.Config +} + +// WatchCat is reponsible for signaling if syncer object should be terminated. +type WatchCat struct { + recovery core.Recovery + timeout time.Duration + configReader configReader + feed chan types.Position + lastPosition types.Position + polling time.Duration + ctx context.Context + cancel context.CancelFunc + logger common.Logger +} + +// NewWatchCat creats a new WatchCat 🐱 object. +func NewWatchCat( + recovery core.Recovery, + configReader configReader, + polling time.Duration, + timeout time.Duration, + logger common.Logger) *WatchCat { + wc := &WatchCat{ + recovery: recovery, + timeout: timeout, + configReader: configReader, + feed: make(chan types.Position), + polling: polling, + logger: logger, + } + return wc +} + +// Feed the WatchCat so it won't produce the termination signal. +func (wc *WatchCat) Feed(position types.Position) { + wc.feed <- position +} + +// Start the WatchCat. +func (wc *WatchCat) Start() { + wc.Stop() + wc.lastPosition = types.Position{} + wc.ctx, wc.cancel = context.WithCancel(context.Background()) + go func() { + var lastPos types.Position + MonitorLoop: + for { + select { + case <-wc.ctx.Done(): + return + default: + } + select { + case <-wc.ctx.Done(): + return + case pos := <-wc.feed: + if !pos.Newer(lastPos) { + wc.logger.Warn("Feed with older height", + "pos", pos, "lastPos", lastPos) + continue + } + lastPos = pos + case <-time.After(wc.timeout): + break MonitorLoop + } + } + go func() { + for { + select { + case <-wc.ctx.Done(): + return + case <-wc.feed: + } + } + }() + defer wc.cancel() + proposed := false + threshold := uint64(utils.GetDKGThreshold( + utils.GetConfigWithPanic(wc.configReader, lastPos.Round, wc.logger))) + wc.logger.Info("Threshold for recovery", "votes", threshold) + ResetLoop: + for { + if !proposed { + wc.logger.Info("Calling Recovery.ProposeSkipBlock", + "height", lastPos.Height) + if err := wc.recovery.ProposeSkipBlock(lastPos.Height); err != nil { + wc.logger.Warn("Failed to proposeSkipBlock", "height", lastPos.Height, "error", err) + } else { + proposed = true + } + } + votes, err := wc.recovery.Votes(lastPos.Height) + if err != nil { + wc.logger.Error("Failed to get recovery votes", "height", lastPos.Height, "error", err) + } else if votes >= threshold { + wc.logger.Info("Threshold for recovery reached!") + wc.lastPosition = lastPos + break ResetLoop + } + select { + case <-wc.ctx.Done(): + return + case <-time.After(wc.polling): + } + } + }() +} + +// Stop the WatchCat. +func (wc *WatchCat) Stop() { + if wc.cancel != nil { + wc.cancel() + } +} + +// Meow return a closed channel if syncer should be terminated. +func (wc *WatchCat) Meow() <-chan struct{} { + return wc.ctx.Done() +} + +// LastPosition returns the last position for recovery. +func (wc *WatchCat) LastPosition() types.Position { + return wc.lastPosition +} |