From e6f5201b178f40b516ffe7b98757df25f8aee028 Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Sun, 23 Jun 2019 15:39:23 +0800 Subject: import: switch consensus core to gitlab.com/tangerine-network/tangerine-consensus --- .../tangerine-consensus/core/syncer/agreement.go | 301 +++++++++++++++++++++ 1 file changed, 301 insertions(+) create mode 100644 vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go (limited to 'vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go') diff --git a/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go new file mode 100644 index 000000000..c20bd6018 --- /dev/null +++ b/vendor/github.com/tangerine-network/tangerine-consensus/core/syncer/agreement.go @@ -0,0 +1,301 @@ +// Copyright 2018 The dexon-consensus Authors +// This file is part of the dexon-consensus-core library. +// +// The dexon-consensus-core library is free software: you can redistribute it +// and/or modify it under the terms of the GNU Lesser General Public License as +// published by the Free Software Foundation, either version 3 of the License, +// or (at your option) any later version. +// +// The dexon-consensus-core library is distributed in the hope that it will be +// useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +// General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the dexon-consensus-core library. If not, see +// . + +package syncer + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/tangerine-network/tangerine-consensus/common" + "github.com/tangerine-network/tangerine-consensus/core" + "github.com/tangerine-network/tangerine-consensus/core/crypto" + "github.com/tangerine-network/tangerine-consensus/core/types" + "github.com/tangerine-network/tangerine-consensus/core/utils" +) + +// Struct agreement implements struct of BA (Byzantine Agreement) protocol +// needed in syncer, which only receives agreement results. +type agreement struct { + chainTip uint64 + cache *utils.NodeSetCache + tsigVerifierCache *core.TSigVerifierCache + inputChan chan interface{} + outputChan chan<- *types.Block + pullChan chan<- common.Hash + blocks map[types.Position]map[common.Hash]*types.Block + agreementResults map[common.Hash][]byte + latestCRSRound uint64 + pendingAgrs map[uint64]map[common.Hash]*types.AgreementResult + pendingBlocks map[uint64]map[common.Hash]*types.Block + logger common.Logger + confirmedBlocks map[common.Hash]struct{} + ctx context.Context + ctxCancel context.CancelFunc +} + +// newAgreement creates a new agreement instance. +func newAgreement(chainTip uint64, + ch chan<- *types.Block, pullChan chan<- common.Hash, + cache *utils.NodeSetCache, verifier *core.TSigVerifierCache, + logger common.Logger) *agreement { + a := &agreement{ + chainTip: chainTip, + cache: cache, + tsigVerifierCache: verifier, + inputChan: make(chan interface{}, 1000), + outputChan: ch, + pullChan: pullChan, + blocks: make(map[types.Position]map[common.Hash]*types.Block), + agreementResults: make(map[common.Hash][]byte), + logger: logger, + pendingAgrs: make( + map[uint64]map[common.Hash]*types.AgreementResult), + pendingBlocks: make( + map[uint64]map[common.Hash]*types.Block), + confirmedBlocks: make(map[common.Hash]struct{}), + } + a.ctx, a.ctxCancel = context.WithCancel(context.Background()) + return a +} + +// run starts the agreement, this does not start a new routine, go a new +// routine explicitly in the caller. +func (a *agreement) run() { + defer a.ctxCancel() + for { + select { + case val, ok := <-a.inputChan: + if !ok { + // InputChan is closed by network when network ends. + return + } + switch v := val.(type) { + case *types.Block: + if v.Position.Round >= core.DKGDelayRound && v.IsFinalized() { + a.processFinalizedBlock(v) + } else { + a.processBlock(v) + } + case *types.AgreementResult: + a.processAgreementResult(v) + case uint64: + a.processNewCRS(v) + } + } + } +} + +func (a *agreement) processBlock(b *types.Block) { + if _, exist := a.confirmedBlocks[b.Hash]; exist { + return + } + if rand, exist := a.agreementResults[b.Hash]; exist { + if len(b.Randomness) == 0 { + b.Randomness = rand + } + a.confirm(b) + } else { + if _, exist := a.blocks[b.Position]; !exist { + a.blocks[b.Position] = make(map[common.Hash]*types.Block) + } + a.blocks[b.Position][b.Hash] = b + } +} + +func (a *agreement) processFinalizedBlock(block *types.Block) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[block.Hash]; exists { + a.logger.Trace("finalized block already confirmed", "block", block) + return + } + if block.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendingBlocks[block.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.Block) + a.pendingBlocks[block.Position.Round] = pendingsForRound + } + pendingsForRound[block.Hash] = block + a.logger.Trace("finalized block cached", "block", block) + return + } + if err := utils.VerifyBlockSignature(block); err != nil { + return + } + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet( + block.Position.Round) + if err != nil { + a.logger.Error("error verifying block randomness", + "block", block, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify block randomness", "block", block) + return + } + if !verifier.VerifySignature(block.Hash, crypto.Signature{ + Type: "bls", + Signature: block.Randomness, + }) { + a.logger.Error("incorrect block randomness", "block", block) + return + } + a.confirm(block) +} + +func (a *agreement) processAgreementResult(r *types.AgreementResult) { + // Cache those results that CRS is not ready yet. + if _, exists := a.confirmedBlocks[r.BlockHash]; exists { + a.logger.Trace("Agreement result already confirmed", "result", r) + return + } + if r.Position.Round > a.latestCRSRound { + pendingsForRound, exists := a.pendingAgrs[r.Position.Round] + if !exists { + pendingsForRound = make(map[common.Hash]*types.AgreementResult) + a.pendingAgrs[r.Position.Round] = pendingsForRound + } + pendingsForRound[r.BlockHash] = r + a.logger.Trace("Agreement result cached", "result", r) + return + } + if err := core.VerifyAgreementResult(r, a.cache); err != nil { + a.logger.Error("Agreement result verification failed", + "result", r, + "error", err) + return + } + if r.Position.Round >= core.DKGDelayRound { + verifier, ok, err := a.tsigVerifierCache.UpdateAndGet(r.Position.Round) + if err != nil { + a.logger.Error("error verifying agreement result randomness", + "result", r, + "error", err) + return + } + if !ok { + a.logger.Error("cannot verify agreement result randomness", "result", r) + return + } + if !verifier.VerifySignature(r.BlockHash, crypto.Signature{ + Type: "bls", + Signature: r.Randomness, + }) { + a.logger.Error("incorrect agreement result randomness", "result", r) + return + } + } else { + // Special case for rounds before DKGDelayRound. + if bytes.Compare(r.Randomness, core.NoRand) != 0 { + a.logger.Error("incorrect agreement result randomness", "result", r) + return + } + } + if r.IsEmptyBlock { + b := &types.Block{ + Position: r.Position, + Randomness: r.Randomness, + } + // Empty blocks should be confirmed directly, they won't be sent over + // the wire. + a.confirm(b) + return + } + if bs, exist := a.blocks[r.Position]; exist { + if b, exist := bs[r.BlockHash]; exist { + b.Randomness = r.Randomness + a.confirm(b) + return + } + } + a.agreementResults[r.BlockHash] = r.Randomness +loop: + for { + select { + case a.pullChan <- r.BlockHash: + break loop + case <-a.ctx.Done(): + a.logger.Error("Pull request is not sent", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("Pull request is unable to send", + "position", &r.Position, + "hash", r.BlockHash.String()[:6]) + } + } +} + +func (a *agreement) processNewCRS(round uint64) { + if round <= a.latestCRSRound { + return + } + prevRound := a.latestCRSRound + 1 + a.latestCRSRound = round + // Verify all pending results. + for r := prevRound; r <= a.latestCRSRound; r++ { + pendingsForRound := a.pendingAgrs[r] + if pendingsForRound == nil { + continue + } + delete(a.pendingAgrs, r) + for _, res := range pendingsForRound { + if err := core.VerifyAgreementResult(res, a.cache); err != nil { + a.logger.Error("Invalid agreement result", + "result", res, + "error", err) + continue + } + a.logger.Error("Flush agreement result", "result", res) + a.processAgreementResult(res) + break + } + } +} + +// confirm notifies consensus the confirmation of a block in BA. +func (a *agreement) confirm(b *types.Block) { + if !b.IsFinalized() { + panic(fmt.Errorf("confirm a block %s without randomness", b)) + } + if _, exist := a.confirmedBlocks[b.Hash]; !exist { + delete(a.blocks, b.Position) + delete(a.agreementResults, b.Hash) + loop: + for { + select { + case a.outputChan <- b: + break loop + case <-a.ctx.Done(): + a.logger.Error("Confirmed block is not sent", "block", b) + return + case <-time.After(500 * time.Millisecond): + a.logger.Debug("Agreement output channel is full", "block", b) + } + } + a.confirmedBlocks[b.Hash] = struct{}{} + } + if b.Position.Height > a.chainTip+1 { + if _, exist := a.confirmedBlocks[b.ParentHash]; !exist { + a.pullChan <- b.ParentHash + } + } +} -- cgit