diff options
author | Jimmy Hu <jimmy.hu@dexon.org> | 2018-10-25 16:59:30 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-25 16:59:30 +0800 |
commit | 04eeac10e6c690e62ae57ef0e2bdf4618b8782d1 (patch) | |
tree | e0b95167d1f42a9304fb9e924378464edbb517e9 /core/lattice.go | |
parent | 233f1e8de99bf2a0023f05d1c67e48cc770621df (diff) | |
download | dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.gz dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.tar.zst dexon-consensus-04eeac10e6c690e62ae57ef0e2bdf4618b8782d1.zip |
core: lattice sync (#257)
Diffstat (limited to 'core/lattice.go')
-rw-r--r-- | core/lattice.go | 115 |
1 files changed, 78 insertions, 37 deletions
diff --git a/core/lattice.go b/core/lattice.go index 3259f35..69ad51c 100644 --- a/core/lattice.go +++ b/core/lattice.go @@ -34,6 +34,7 @@ type Lattice struct { app Application debug Debug pool blockPool + retryAdd bool data *latticeData toModule *totalOrdering ctModule *consensusTimestamp @@ -137,11 +138,6 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { s.lock.RLock() defer s.lock.RUnlock() if err = s.data.sanityCheck(b); err != nil { - // Add to block pool, once the lattice updated, - // would be checked again. - if err == ErrAckingBlockNotExists { - s.pool.addBlock(b) - } s.logger.Error("Sanity Check failed", "error", err) return } @@ -159,51 +155,85 @@ func (s *Lattice) SanityCheck(b *types.Block) (err error) { return } +// addBlockToLattice adds a block into lattice, and deliver blocks with the acks +// already delivered. +// +// NOTE: assume the block passed sanity check. +func (s *Lattice) addBlockToLattice( + input *types.Block) (outputBlocks []*types.Block, err error) { + s.lock.Lock() + defer s.lock.Unlock() + if tip := s.data.chains[input.Position.ChainID].tip; tip != nil { + if !input.Position.Newer(&tip.Position) { + return + } + } + s.pool.addBlock(input) + // Replay tips in pool to check their validity. + for { + hasOutput := false + for i := uint32(0); i < s.chainNum; i++ { + var tip *types.Block + if tip = s.pool.tip(i); tip == nil { + continue + } + err = s.data.sanityCheck(tip) + if err == nil { + var output []*types.Block + if output, err = s.data.addBlock(tip); err != nil { + s.logger.Error("Sanity Check failed", "error", err) + continue + } + hasOutput = true + outputBlocks = append(outputBlocks, output...) + } + if _, ok := err.(*ErrAckingBlockNotExists); ok { + err = nil + continue + } + s.pool.removeTip(i) + } + if !hasOutput { + break + } + } + + for _, b := range outputBlocks { + // TODO(jimmy-dexon): change this name of classic DEXON algorithm. + if s.debug != nil { + s.debug.StronglyAcked(b.Hash) + } + s.logger.Debug("Calling Application.BlockConfirmed", "block", input) + s.app.BlockConfirmed(*b.Clone()) + // Purge blocks in pool with the same chainID and lower height. + s.pool.purgeBlocks(b.Position.ChainID, b.Position.Height) + } + + return +} + // ProcessBlock adds a block into lattice, and deliver ordered blocks. // If any block pass sanity check after this block add into lattice, they // would be returned, too. // // NOTE: assume the block passed sanity check. func (s *Lattice) ProcessBlock( - input *types.Block) (verified, delivered []*types.Block, err error) { - + input *types.Block) (delivered []*types.Block, err error) { var ( - tip, b *types.Block - toDelivered []*types.Block + b *types.Block inLattice []*types.Block + toDelivered []*types.Block deliveredMode uint32 ) - s.lock.Lock() - defer s.lock.Unlock() - if inLattice, err = s.data.addBlock(input); err != nil { - // TODO(mission): if sanity check failed with "acking block doesn't - // exists", we should keep it in a pool. - s.logger.Error("Sanity Check failed when adding blocks", "error", err) + + if inLattice, err = s.addBlockToLattice(input); err != nil { return } - // TODO(mission): remove this hack, BA related stuffs should not - // be done here. - if s.debug != nil { - s.debug.StronglyAcked(input.Hash) - } - s.logger.Debug("Calling Application.BlockConfirmed", "block", input) - s.app.BlockConfirmed(*input.Clone()) - // Purge blocks in pool with the same chainID and lower height. - s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) - // Replay tips in pool to check their validity. - for i := uint32(0); i < s.chainNum; i++ { - if tip = s.pool.tip(i); tip == nil { - continue - } - err = s.data.sanityCheck(tip) - if err == nil { - verified = append(verified, tip) - } - if err == ErrAckingBlockNotExists { - continue - } - s.pool.removeTip(i) + + if len(inLattice) == 0 { + return } + // Perform total ordering for each block added to lattice. for _, b = range inLattice { toDelivered, deliveredMode, err = s.toModule.processBlock(b) @@ -265,3 +295,14 @@ func (s *Lattice) AppendConfig(round uint64, config *types.Config) (err error) { } return } + +// ProcessFinalizedBlock is used for syncing lattice data. +func (s *Lattice) ProcessFinalizedBlock(input *types.Block) { + defer func() { s.retryAdd = true }() + s.lock.Lock() + defer s.lock.Unlock() + if err := s.data.addFinalizedBlock(input); err != nil { + panic(err) + } + s.pool.purgeBlocks(input.Position.ChainID, input.Position.Height) +} |