diff options
Diffstat (limited to 'dex')
-rw-r--r-- | dex/api.go | 16 | ||||
-rw-r--r-- | dex/app.go | 5 | ||||
-rw-r--r-- | dex/backend.go | 41 | ||||
-rw-r--r-- | dex/blockproposer.go | 201 | ||||
-rw-r--r-- | dex/handler.go | 8 |
5 files changed, 247 insertions, 24 deletions
diff --git a/dex/api.go b/dex/api.go index 40928a5c1..4d39f9f6e 100644 --- a/dex/api.go +++ b/dex/api.go @@ -131,6 +131,22 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +func (api *PrivateAdminAPI) StartProposing() error { + return api.dex.StartProposing() +} + +func (api *PrivateAdminAPI) StopProposing() { + api.dex.StopProposing() +} + +func (api *PrivateAdminAPI) IsLatticeSyncing() bool { + return api.dex.IsLatticeSyncing() +} + +func (api *PrivateAdminAPI) IsProposing() bool { + return api.dex.IsProposing() +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { diff --git a/dex/app.go b/dex/app.go index 9dcfd87e9..d04b2afd6 100644 --- a/dex/app.go +++ b/dex/app.go @@ -451,6 +451,9 @@ func (d *DexconApp) BlockDelivered( blockPosition coreTypes.Position, result coreTypes.FinalizationResult) { + log.Debug("DexconApp block deliver", "height", result.Height, "hash", blockHash, "position", blockPosition.String()) + defer log.Debug("DexconApp block delivered", "height", result.Height, "hash", blockHash, "position", blockPosition.String()) + chainID := blockPosition.ChainID d.chainLock(chainID) defer d.chainUnlock(chainID) @@ -461,6 +464,7 @@ func (d *DexconApp) BlockDelivered( } block.Payload = nil + block.Finalization = result dexconMeta, err := rlp.EncodeToBytes(block) if err != nil { panic(err) @@ -501,6 +505,7 @@ func (d *DexconApp) BlockConfirmed(block coreTypes.Block) { d.chainLock(block.Position.ChainID) defer d.chainUnlock(block.Position.ChainID) + log.Debug("DexconApp block confirmed", "block", block.String()) if err := d.blockchain.AddConfirmedBlock(&block); err != nil { panic(err) } diff --git a/dex/backend.go b/dex/backend.go index 5eb9a85fc..8fe38cd45 100644 --- a/dex/backend.go +++ b/dex/backend.go @@ -21,9 +21,6 @@ import ( "fmt" "time" - dexCore "github.com/dexon-foundation/dexon-consensus/core" - coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" - "github.com/dexon-foundation/dexon/accounts" "github.com/dexon-foundation/dexon/consensus" "github.com/dexon-foundation/dexon/consensus/dexcon" @@ -31,7 +28,6 @@ import ( "github.com/dexon-foundation/dexon/core/bloombits" "github.com/dexon-foundation/dexon/core/rawdb" "github.com/dexon-foundation/dexon/core/vm" - dexDB "github.com/dexon-foundation/dexon/dex/db" "github.com/dexon-foundation/dexon/dex/downloader" "github.com/dexon-foundation/dexon/eth/filters" "github.com/dexon-foundation/dexon/eth/gasprice" @@ -74,7 +70,8 @@ type Dexon struct { app *DexconApp governance *DexconGovernance network *DexconNetwork - consensus *dexCore.Consensus + + bp *blockProposer networkID uint64 netRPCService *ethapi.PublicNetAPI @@ -154,6 +151,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { // Set config fetcher so engine can fetch current system configuration from state. engine.SetConfigFetcher(dex.governance) + dMoment := time.Unix(config.DMoment, int64(0)) + log.Info("DEXON Consensus DMoment", "time", dMoment) + + // Force starting with full sync mode if this node is a bootstrap proposer. + if config.BlockProposerEnabled && dMoment.After(time.Now()) { + config.SyncMode = downloader.FullSync + } + pm, err := NewProtocolManager(dex.chainConfig, config.SyncMode, config.NetworkId, dex.eventMux, dex.txPool, dex.engine, dex.blockchain, chainDb, config.BlockProposerEnabled, dex.governance, dex.app) @@ -164,13 +169,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Dexon, error) { dex.protocolManager = pm dex.network = NewDexconNetwork(pm) - privKey := coreEcdsa.NewPrivateKeyFromECDSA(config.PrivateKey) - - dMoment := time.Unix(config.DMoment, int64(0)) - log.Info("DEXON Consensus DMoment", "time", dMoment) - - dex.consensus = dexCore.NewConsensus(dMoment, - dex.app, dex.governance, dexDB.NewDatabase(chainDb), dex.network, privKey, log.Root()) + dex.bp = NewBlockProposer(dex, dMoment) return dex, nil } @@ -240,15 +239,25 @@ func (s *Dexon) Start(srvr *p2p.Server) error { } func (s *Dexon) Stop() error { - s.consensus.Stop() + s.bp.Stop() s.app.Stop() return nil } func (s *Dexon) StartProposing() error { - // TODO: Run with the latest confirmed block in compaction chain. - s.consensus.Run() - return nil + return s.bp.Start() +} + +func (s *Dexon) StopProposing() { + s.bp.Stop() +} + +func (s *Dexon) IsLatticeSyncing() bool { + return s.bp.IsLatticeSyncing() +} + +func (s *Dexon) IsProposing() bool { + return s.bp.IsProposing() } // CreateDB creates the chain database. diff --git a/dex/blockproposer.go b/dex/blockproposer.go new file mode 100644 index 000000000..21b8ddbde --- /dev/null +++ b/dex/blockproposer.go @@ -0,0 +1,201 @@ +package dex + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreEcdsa "github.com/dexon-foundation/dexon-consensus/core/crypto/ecdsa" + "github.com/dexon-foundation/dexon-consensus/core/syncer" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + + "github.com/dexon-foundation/dexon/core" + "github.com/dexon-foundation/dexon/dex/db" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" +) + +type blockProposer struct { + mu sync.Mutex + running int32 + syncing int32 + proposing int32 + dex *Dexon + dMoment time.Time + + wg sync.WaitGroup + stopCh chan struct{} +} + +func NewBlockProposer(dex *Dexon, dMoment time.Time) *blockProposer { + return &blockProposer{ + dex: dex, + dMoment: dMoment, + } +} + +func (b *blockProposer) Start() error { + b.mu.Lock() + defer b.mu.Unlock() + + if !atomic.CompareAndSwapInt32(&b.running, 0, 1) { + return fmt.Errorf("block proposer is already running") + } + log.Info("Block proposer started") + + b.stopCh = make(chan struct{}) + b.wg.Add(1) + go func() { + defer b.wg.Done() + defer atomic.StoreInt32(&b.running, 0) + + var err error + var c *dexCore.Consensus + + if b.dMoment.After(time.Now()) { + c = b.initConsensus() + } else { + c, err = b.syncConsensus() + } + + if err != nil { + log.Error("Block proposer stopped, before start running", "err", err) + return + } + + b.run(c) + log.Info("Block proposer successfully stopped") + }() + return nil +} + +func (b *blockProposer) run(c *dexCore.Consensus) { + log.Info("Start running consensus core") + go c.Run() + atomic.StoreInt32(&b.proposing, 1) + <-b.stopCh + log.Debug("Block proposer receive stop signal") + c.Stop() +} + +func (b *blockProposer) Stop() { + b.mu.Lock() + defer b.mu.Unlock() + + if atomic.LoadInt32(&b.running) == 1 { + b.dex.protocolManager.isBlockProposer = false + close(b.stopCh) + b.wg.Wait() + atomic.StoreInt32(&b.proposing, 0) + } +} + +func (b *blockProposer) IsLatticeSyncing() bool { + return atomic.LoadInt32(&b.syncing) == 1 +} + +func (b *blockProposer) IsProposing() bool { + return atomic.LoadInt32(&b.proposing) == 1 +} + +func (b *blockProposer) initConsensus() *dexCore.Consensus { + db := db.NewDatabase(b.dex.chainDb) + privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey) + return dexCore.NewConsensus(b.dMoment, + b.dex.app, b.dex.governance, db, b.dex.network, privkey, log.Root()) +} + +func (b *blockProposer) syncConsensus() (*dexCore.Consensus, error) { + atomic.StoreInt32(&b.syncing, 1) + defer atomic.StoreInt32(&b.syncing, 0) + + db := db.NewDatabase(b.dex.chainDb) + privkey := coreEcdsa.NewPrivateKeyFromECDSA(b.dex.config.PrivateKey) + consensusSync := syncer.NewConsensus(b.dMoment, b.dex.app, b.dex.governance, + db, b.dex.network, privkey, log.Root()) + + blocksToSync := func(coreHeight, height uint64) []*coreTypes.Block { + var blocks []*coreTypes.Block + for len(blocks) < 1024 && coreHeight < height { + var block coreTypes.Block + b := b.dex.blockchain.GetBlockByNumber(coreHeight + 1) + if err := rlp.DecodeBytes(b.Header().DexconMeta, &block); err != nil { + panic(err) + } + blocks = append(blocks, &block) + coreHeight = coreHeight + 1 + } + return blocks + } + + // Sync all blocks in compaction chain to core. + _, coreHeight := db.GetCompactionChainTipInfo() + +Loop: + for { + currentBlock := b.dex.blockchain.CurrentBlock() + log.Debug("Syncing compaction chain", "core height", coreHeight, + "height", currentBlock.NumberU64()) + blocks := blocksToSync(coreHeight, currentBlock.NumberU64()) + + if len(blocks) == 0 { + break Loop + } + + log.Debug("Filling compaction chain", "num", len(blocks), + "first", blocks[0].Finalization.Height, + "last", blocks[len(blocks)-1].Finalization.Height) + if _, err := consensusSync.SyncBlocks(blocks, false); err != nil { + return nil, err + } + coreHeight = blocks[len(blocks)-1].Finalization.Height + + select { + case <-b.stopCh: + return nil, errors.New("early stop") + default: + } + } + + // Enable isBlockProposer flag to start receiving msg. + b.dex.protocolManager.isBlockProposer = true + + ch := make(chan core.ChainHeadEvent) + sub := b.dex.blockchain.SubscribeChainHeadEvent(ch) + defer sub.Unsubscribe() + + // Listen chain head event until synced. +ListenLoop: + for { + select { + case ev := <-ch: + blocks := blocksToSync(coreHeight, ev.Block.NumberU64()) + if len(blocks) > 0 { + log.Debug("Filling compaction chain", "num", len(blocks), + "first", blocks[0].Finalization.Height, + "last", blocks[len(blocks)-1].Finalization.Height) + synced, err := consensusSync.SyncBlocks(blocks, true) + if err != nil { + log.Error("SyncBlocks fail", "err", err) + return nil, err + } + if synced { + log.Debug("Consensus core synced") + break ListenLoop + } + coreHeight = blocks[len(blocks)-1].Finalization.Height + } + case <-sub.Err(): + log.Debug("System stopped when syncing consensus core") + return nil, errors.New("system stop") + case <-b.stopCh: + log.Debug("Early stop, before consensus core can run") + return nil, errors.New("early stop") + } + } + + return consensusSync.GetSyncedConsensus() +} diff --git a/dex/handler.go b/dex/handler.go index 9174d8516..ff87884d2 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -713,10 +713,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockHashesMsg: - // Ignore new block hash messages in block proposer mode. - if pm.isBlockProposer { - break - } var announces newBlockHashesData if err := msg.Decode(&announces); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) @@ -737,10 +733,6 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } case msg.Code == NewBlockMsg: - // Ignore new block messages in block proposer mode. - if pm.isBlockProposer { - break - } // Retrieve and decode the propagated block var block types.Block if err := msg.Decode(&block); err != nil { |