diff options
author | Sonic <sonic@dexon.org> | 2018-11-20 14:13:53 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2019-04-09 21:32:53 +0800 |
commit | afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef (patch) | |
tree | 28013bb14a19611adecda49d1dfbeb0a34dad66a /dex | |
parent | 2113837c006aad6af75c09d37514591fd6863dbc (diff) | |
download | dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.gz dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.tar.zst dexon-afc01df41cafd3a8b56db9f32e23da3bf0e6b7ef.zip |
dex: implement downloader for dex
We need governance state to verify block's signature (randomness),
but in ethereum fast sync mode, eth downloader only downloads the whole
state of pivot block, so we don't have governance state to verify the
downloaded block that is before pivot block if we don't processing
transaction.
To avoid running transactions, dex downloader also downloads the
governance state (merkle proof and storage) at snapshot height of each round,
so that we can verify blocks in fast sync mode.
Diffstat (limited to 'dex')
-rw-r--r-- | dex/downloader/downloader.go | 199 | ||||
-rw-r--r-- | dex/downloader/fakepeer.go | 17 | ||||
-rw-r--r-- | dex/downloader/governance.go | 170 | ||||
-rw-r--r-- | dex/downloader/metrics.go | 5 | ||||
-rw-r--r-- | dex/downloader/peer.go | 19 | ||||
-rw-r--r-- | dex/downloader/queue.go | 42 | ||||
-rw-r--r-- | dex/downloader/types.go | 11 | ||||
-rw-r--r-- | dex/governance.go | 4 | ||||
-rw-r--r-- | dex/handler.go | 85 | ||||
-rw-r--r-- | dex/peer.go | 25 | ||||
-rw-r--r-- | dex/protocol.go | 10 |
11 files changed, 499 insertions, 88 deletions
diff --git a/dex/downloader/downloader.go b/dex/downloader/downloader.go index 0383a3709..69d95d801 100644 --- a/dex/downloader/downloader.go +++ b/dex/downloader/downloader.go @@ -1,4 +1,3 @@ -// Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -25,15 +24,21 @@ import ( "sync/atomic" "time" + dexCore "github.com/dexon-foundation/dexon-consensus/core" + ethereum "github.com/dexon-foundation/dexon" "github.com/dexon-foundation/dexon/common" "github.com/dexon-foundation/dexon/core/rawdb" + "github.com/dexon-foundation/dexon/core/state" "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" "github.com/dexon-foundation/dexon/ethdb" "github.com/dexon-foundation/dexon/event" "github.com/dexon-foundation/dexon/log" "github.com/dexon-foundation/dexon/metrics" "github.com/dexon-foundation/dexon/params" + "github.com/dexon-foundation/dexon/trie" ) var ( @@ -63,11 +68,10 @@ var ( reorgProtThreshold = 48 // Threshold number of recent blocks to disable mini reorg protection reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs - fsHeaderCheckFrequency = 100 // Verification frequency of the downloaded headers during fast sync - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected - fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it - fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected + fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it + fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download + fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in fast sync ) var ( @@ -103,6 +107,9 @@ type Downloader struct { peers *peerSet // Set of active peers from which download can proceed stateDB ethdb.Database + gov *governance + verifierCache *dexCore.TSigVerifierCache + rttEstimate uint64 // Round trip time to target for download requests rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops) @@ -125,12 +132,14 @@ type Downloader struct { committed int32 // Channels - headerCh chan dataPack // [eth/62] Channel receiving inbound block headers - bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies - receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts - bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks - receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks - headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks + headerCh chan dataPack // [eth/62] Channel receiving inbound block headers + govStateCh chan dataPack + bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies + receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts + bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks + receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks + + headerProcCh chan []*types.HeaderWithGovState // [eth/62] Channel to feed the header processor new tasks // for stateFetcher stateSyncStart chan *stateSync @@ -161,14 +170,18 @@ type LightChain interface { // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header + GetHeaderByNumber(number uint64) *types.Header + // CurrentHeader retrieves the head header from the local chain. CurrentHeader() *types.Header + GetGovStateByNumber(number uint64) (*types.GovState, error) + // GetTd returns the total difficulty of a local block. GetTd(common.Hash, uint64) *big.Int - // InsertHeaderChain inserts a batch of headers into the local chain. - InsertHeaderChain([]*types.Header, int) (int, error) + // InsertHeaderChain2 inserts a batch of headers into the local chain. + InsertHeaderChain2([]*types.HeaderWithGovState, *dexCore.TSigVerifierCache) (int, error) // Rollback removes a few recently added elements from the local chain. Rollback([]common.Hash) @@ -193,8 +206,8 @@ type BlockChain interface { // FastSyncCommitHead directly commits the head block to a certain entity. FastSyncCommitHead(common.Hash) error - // InsertChain inserts a batch of blocks into the local chain. - InsertChain(types.Blocks) (int, error) + // InsertChain2 inserts a batch of blocks into the local chain. + InsertChain2(types.Blocks) (int, error) // InsertReceiptChain inserts a batch of receipts into the local chain. InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) @@ -218,11 +231,12 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC lightchain: lightchain, dropPeer: dropPeer, headerCh: make(chan dataPack, 1), + govStateCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1), bodyWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), + headerProcCh: make(chan []*types.HeaderWithGovState, 1), quitCh: make(chan struct{}), stateCh: make(chan dataPack), stateSyncStart: make(chan *stateSync), @@ -457,6 +471,35 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.mode == FastSync && pivot != 0 { d.committed = 0 } + + if d.mode == FastSync || d.mode == LightSync { + // fetch gov state + govState, err := d.fetchGovState(p, latest.Hash(), latest.Root) + if err != nil { + return err + } + + originHeader := d.lightchain.GetHeaderByNumber(origin) + if originHeader == nil { + return fmt.Errorf("origin header not exists, number: %d", origin) + } + + // prepare state origin - 2 + d.gov = newGovernance(govState) + for i := uint64(0); i < 3; i++ { + if originHeader.Round >= i { + h := d.gov.GetRoundHeight(originHeader.Round - i) + s, err := d.lightchain.GetGovStateByNumber(h) + if err != nil { + return err + } + d.gov.StoreState(s) + } + } + + d.verifierCache = dexCore.NewTSigVerifierCache(d.gov, 5) + } + // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, d.mode) if d.syncInitHook != nil { @@ -551,7 +594,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { // Request the advertised remote head block and wait for the response head, _ := p.peer.Head() - go p.peer.RequestHeadersByHash(head, 1, 0, false) + go p.peer.RequestHeadersByHash(head, 1, 0, false, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -572,7 +615,7 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } - head := headers[0] + head := headers[0].Header p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) return head, nil @@ -587,6 +630,69 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { } } +func (d *Downloader) fetchGovState(p *peerConnection, + hash common.Hash, root common.Hash) (*types.GovState, error) { + go p.peer.RequestGovStateByHash(hash) + + ttl := d.requestTTL() + timeout := time.After(ttl) + for { + select { + case <-d.cancelCh: + return nil, errCancelBlockFetch + case packet := <-d.govStateCh: + if packet.PeerId() != p.id { + log.Debug("Received gov state from incorrect peer", "peer", packet.PeerId()) + break + } + + // TODO(sonic): refactor this. + govState := packet.(*govStatePack).govState + + // reconstruct the gov state + db := ethdb.NewMemDatabase() + for _, value := range govState.Proof { + db.Put(crypto.Keccak256(value), value) + } + + // proof and state should split + // check the state object of governance contract + key := crypto.Keccak256(vm.GovernanceContractAddress.Bytes()) + _, _, err := trie.VerifyProof(root, key, db) + if err != nil { + return nil, err + } + + triedb := trie.NewDatabase(db) + t, err := trie.New(common.Hash{}, triedb) + for _, entry := range govState.Storage { + t.TryUpdate(entry[0], entry[1]) + } + err = triedb.Commit(t.Hash(), false) + if err != nil { + return nil, err + } + + statedb, err := state.New(root, state.NewDatabase(db)) + if err != nil { + return nil, err + } + + storageTrie := statedb.StorageTrie(vm.GovernanceContractAddress) + if storageTrie == nil { + return nil, fmt.Errorf("storage not match") + } + return govState, nil + case <-timeout: + p.log.Debug("Waiting for head header timed out", "elapsed", ttl) + return nil, errTimeout + + case <-d.bodyCh: + case <-d.receiptCh: + } + } +} + // findAncestor tries to locate the common ancestor link of the local chain and // a remote peers blockchain. In the general case when our node was in sync and // on the correct chain, checking the top N links should already get us a match. @@ -621,7 +727,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err if count > limit { count = limit } - go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -705,7 +811,7 @@ func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, err ttl := d.requestTTL() timeout := time.After(ttl) - go p.peer.RequestHeadersByNumber(check, 1, 0, false) + go p.peer.RequestHeadersByNumber(check, 1, 0, false, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -789,10 +895,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false, true) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false, true) } } // Start pulling the header chain skeleton until all is done @@ -935,7 +1041,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) // // The method returns the entire filled skeleton and also the number of headers // already forwarded for processing. -func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { +func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.HeaderWithGovState) ([]*types.HeaderWithGovState, int, error) { log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) @@ -1235,9 +1341,9 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er case <-d.cancelCh: return errCancelHeaderProcessing - case headers := <-d.headerProcCh: + case headersWithGovState := <-d.headerProcCh: // Terminate header processing if we synced up - if len(headers) == 0 { + if len(headersWithGovState) == 0 { // Notify everyone that headers are fully processed for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { select { @@ -1283,7 +1389,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // Otherwise split the chunk of headers into batches and process them gotHeaders = true - for len(headers) > 0 { + for len(headersWithGovState) > 0 { // Terminate if something failed in between processing chunks select { case <-d.cancelCh: @@ -1292,29 +1398,35 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er } // Select the next chunk of headers to import limit := maxHeadersProcess - if limit > len(headers) { - limit = len(headers) + if limit > len(headersWithGovState) { + limit = len(headersWithGovState) } - chunk := headers[:limit] + chunk := headersWithGovState[:limit] // In case of header only syncing, validate the chunk immediately if d.mode == FastSync || d.mode == LightSync { + // TODO(sonic) update the gov state to make TSigVerify correct // Collect the yet unknown headers to mark them as uncertain - unknown := make([]*types.Header, 0, len(headers)) + unknown := make([]*types.Header, 0, len(headersWithGovState)) for _, header := range chunk { if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) { - unknown = append(unknown, header) + unknown = append(unknown, header.Header) } } - // If we're importing pure headers, verify based on their recentness - frequency := fsHeaderCheckFrequency - if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { - frequency = 1 + + for _, header := range chunk { + if header.GovState != nil { + log.Debug("Got gov state, store it", "round", header.Round, "number", header.Number.Uint64()) + d.gov.StoreState(header.GovState) + } } - if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { + + if n, err := d.lightchain.InsertHeaderChain2(chunk, d.verifierCache); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { - rollback = append(rollback, chunk[:n]...) + for _, h := range chunk[:n] { + rollback = append(rollback, h.Header) + } } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) return errInvalidChain @@ -1324,6 +1436,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er if len(rollback) > fsHeaderSafetyNet { rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...) } + } // Unless we're doing light chains, schedule the headers for associated content retrieval if d.mode == FullSync || d.mode == FastSync { @@ -1342,7 +1455,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er return errBadPeer } } - headers = headers[limit:] + headersWithGovState = headersWithGovState[limit:] origin += uint64(limit) } @@ -1400,7 +1513,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { for i, result := range results { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } - if index, err := d.blockchain.InsertChain(blocks); err != nil { + if index, err := d.blockchain.InsertChain2(blocks); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1565,10 +1678,14 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { // DeliverHeaders injects a new batch of block headers received from a remote // node into the download schedule. -func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { +func (d *Downloader) DeliverHeaders(id string, headers []*types.HeaderWithGovState) (err error) { return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter) } +func (d *Downloader) DeliverGovState(id string, govState *types.GovState) error { + return d.deliver(id, d.govStateCh, &govStatePack{id, govState}, govStateInMeter, govStateDropMeter) +} + // DeliverBodies injects a new batch of block bodies received from a remote node. func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) { return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter) diff --git a/dex/downloader/fakepeer.go b/dex/downloader/fakepeer.go index 3e29357ba..f0d596a4b 100644 --- a/dex/downloader/fakepeer.go +++ b/dex/downloader/fakepeer.go @@ -88,7 +88,14 @@ func (p *FakePeer) RequestHeadersByHash(hash common.Hash, amount int, skip int, } } } - p.dl.DeliverHeaders(p.id, headers) + + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } @@ -115,7 +122,13 @@ func (p *FakePeer) RequestHeadersByNumber(number uint64, amount int, skip int, r } headers = append(headers, origin) } - p.dl.DeliverHeaders(p.id, headers) + // TODO(sonic): fix this + var headersWithGovState []*types.HeaderWithGovState + for _, h := range headers { + headersWithGovState = append(headersWithGovState, + &types.HeaderWithGovState{Header: h}) + } + p.dl.DeliverHeaders(p.id, headersWithGovState) return nil } diff --git a/dex/downloader/governance.go b/dex/downloader/governance.go new file mode 100644 index 000000000..40233f1a8 --- /dev/null +++ b/dex/downloader/governance.go @@ -0,0 +1,170 @@ +package downloader + +import ( + "math/big" + "sync" + "time" + + dexCore "github.com/dexon-foundation/dexon-consensus/core" + coreTypes "github.com/dexon-foundation/dexon-consensus/core/types" + dkgTypes "github.com/dexon-foundation/dexon-consensus/core/types/dkg" + + "github.com/dexon-foundation/dexon/common" + "github.com/dexon-foundation/dexon/core/state" + "github.com/dexon-foundation/dexon/core/types" + "github.com/dexon-foundation/dexon/core/vm" + "github.com/dexon-foundation/dexon/crypto" + "github.com/dexon-foundation/dexon/ethdb" + "github.com/dexon-foundation/dexon/log" + "github.com/dexon-foundation/dexon/rlp" + "github.com/dexon-foundation/dexon/trie" +) + +// This is a goverance for fast sync +type governance struct { + db ethdb.Database + headRoot common.Hash + headHeight uint64 + height2Root map[uint64]common.Hash + mu sync.Mutex +} + +func newGovernance(headState *types.GovState) *governance { + db := ethdb.NewMemDatabase() + g := &governance{ + db: db, + headRoot: headState.Root, + headHeight: headState.Number.Uint64(), + height2Root: make(map[uint64]common.Hash), + } + g.StoreState(headState) + + statedb, err := state.New(headState.Root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + log.Error("New governance fail", "statedb == nil", statedb == nil, "err", err) + } + return g +} + +func (g *governance) getHeadHelper() *vm.GovernanceStateHelper { + return g.getHelper(g.headRoot) +} + +func (g *governance) getHelperByRound(round uint64) *vm.GovernanceStateHelper { + height := g.GetRoundHeight(round) + root, exists := g.height2Root[height] + if !exists { + log.Debug("Gov get helper by round", "round", round, "exists", exists) + return nil + } + return g.getHelper(root) +} + +func (g *governance) getHelper(root common.Hash) *vm.GovernanceStateHelper { + statedb, err := state.New(root, state.NewDatabase(g.db)) + if statedb == nil || err != nil { + return nil + } + return &vm.GovernanceStateHelper{statedb} +} + +func (g *governance) GetRoundHeight(round uint64) uint64 { + var h uint64 + if helper := g.getHeadHelper(); helper != nil { + h = helper.RoundHeight(big.NewInt(int64(round))).Uint64() + } + return h +} + +func (g *governance) StoreState(s *types.GovState) { + g.mu.Lock() + defer g.mu.Unlock() + + // store the hight -> root mapping + g.height2Root[s.Number.Uint64()] = s.Root + + // store the account + for _, node := range s.Proof { + g.db.Put(crypto.Keccak256(node), node) + } + + // store the storage + triedb := trie.NewDatabase(g.db) + t, err := trie.New(common.Hash{}, triedb) + if err != nil { + panic(err) + } + for _, kv := range s.Storage { + t.TryUpdate(kv[0], kv[1]) + } + t.Commit(nil) + triedb.Commit(t.Hash(), false) + + if s.Number.Uint64() > g.headHeight { + log.Debug("Gov head root changed", "number", s.Number.Uint64()) + g.headRoot = s.Root + g.headHeight = s.Number.Uint64() + } +} + +// Return the genesis configuration if round == 0. +func (g *governance) Configuration(round uint64) *coreTypes.Config { + if round < dexCore.ConfigRoundShift { + round = 0 + } else { + round -= dexCore.ConfigRoundShift + } + helper := g.getHelperByRound(round) + if helper == nil { + log.Warn("Get config helper fail", "round - round shift", round) + return nil + } + c := helper.Configuration() + return &coreTypes.Config{ + NumChains: c.NumChains, + LambdaBA: time.Duration(c.LambdaBA) * time.Millisecond, + LambdaDKG: time.Duration(c.LambdaDKG) * time.Millisecond, + K: int(c.K), + PhiRatio: c.PhiRatio, + NotarySetSize: c.NotarySetSize, + DKGSetSize: c.DKGSetSize, + RoundInterval: time.Duration(c.RoundInterval) * time.Millisecond, + MinBlockInterval: time.Duration(c.MinBlockInterval) * time.Millisecond, + } +} + +// DKGComplaints gets all the DKGComplaints of round. +func (g *governance) DKGComplaints(round uint64) []*dkgTypes.Complaint { + helper := g.getHeadHelper() + var dkgComplaints []*dkgTypes.Complaint + for _, pk := range helper.DKGComplaints(big.NewInt(int64(round))) { + x := new(dkgTypes.Complaint) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgComplaints = append(dkgComplaints, x) + } + return dkgComplaints +} + +// DKGMasterPublicKeys gets all the DKGMasterPublicKey of round. +func (g *governance) DKGMasterPublicKeys(round uint64) []*dkgTypes.MasterPublicKey { + helper := g.getHeadHelper() + var dkgMasterPKs []*dkgTypes.MasterPublicKey + for _, pk := range helper.DKGMasterPublicKeys(big.NewInt(int64(round))) { + x := new(dkgTypes.MasterPublicKey) + if err := rlp.DecodeBytes(pk, x); err != nil { + panic(err) + } + dkgMasterPKs = append(dkgMasterPKs, x) + } + return dkgMasterPKs +} + +// IsDKGFinal checks if DKG is final. +func (g *governance) IsDKGFinal(round uint64) bool { + helper := g.getHeadHelper() + threshold := 2*uint64(g.Configuration(round).DKGSetSize)/3 + 1 + count := helper.DKGFinalizedsCount(big.NewInt(int64(round))).Uint64() + return count >= threshold +} diff --git a/dex/downloader/metrics.go b/dex/downloader/metrics.go index 0d6041712..395950759 100644 --- a/dex/downloader/metrics.go +++ b/dex/downloader/metrics.go @@ -28,6 +28,11 @@ var ( headerDropMeter = metrics.NewRegisteredMeter("dex/downloader/headers/drop", nil) headerTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/headers/timeout", nil) + govStateInMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/in", nil) + govStateReqTimer = metrics.NewRegisteredTimer("dex/downloader/govStates/req", nil) + govStateDropMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/drop", nil) + govStateTimeoutMeter = metrics.NewRegisteredMeter("dex/downloader/govStates/timeout", nil) + bodyInMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/in", nil) bodyReqTimer = metrics.NewRegisteredTimer("dex/downloader/bodies/req", nil) bodyDropMeter = metrics.NewRegisteredMeter("dex/downloader/bodies/drop", nil) diff --git a/dex/downloader/peer.go b/dex/downloader/peer.go index 1fd82fbe3..b6f2936b7 100644 --- a/dex/downloader/peer.go +++ b/dex/downloader/peer.go @@ -78,8 +78,9 @@ type peerConnection struct { // LightPeer encapsulates the methods required to synchronise with a remote light peer. type LightPeer interface { Head() (common.Hash, *big.Int) - RequestHeadersByHash(common.Hash, int, int, bool) error - RequestHeadersByNumber(uint64, int, int, bool) error + RequestHeadersByHash(common.Hash, int, int, bool, bool) error + RequestHeadersByNumber(uint64, int, int, bool, bool) error + RequestGovStateByHash(common.Hash) error } // Peer encapsulates the methods required to synchronise with a remote full peer. @@ -96,11 +97,15 @@ type lightPeerWrapper struct { } func (w *lightPeerWrapper) Head() (common.Hash, *big.Int) { return w.peer.Head() } -func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByHash(h, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByHash(h common.Hash, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByHash(h, amount, skip, reverse, withGov) } -func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse bool) error { - return w.peer.RequestHeadersByNumber(i, amount, skip, reverse) +func (w *lightPeerWrapper) RequestHeadersByNumber(i uint64, amount int, skip int, reverse, withGov bool) error { + return w.peer.RequestHeadersByNumber(i, amount, skip, reverse, withGov) +} +func (w *lightPeerWrapper) RequestGovStateByHash(common.Hash) error { + // TODO(sonic): support this + panic("RequestGovStateByHash not supported in light client mode sync") } func (w *lightPeerWrapper) RequestBodies([]common.Hash) error { panic("RequestBodies not supported in light client mode sync") @@ -156,7 +161,7 @@ func (p *peerConnection) FetchHeaders(from uint64, count int) error { p.headerStarted = time.Now() // Issue the header retrieval request (absolut upwards without gaps) - go p.peer.RequestHeadersByNumber(from, count, 0, false) + go p.peer.RequestHeadersByNumber(from, count, 0, false, true) return nil } diff --git a/dex/downloader/queue.go b/dex/downloader/queue.go index 12c75e793..f3a36ec3c 100644 --- a/dex/downloader/queue.go +++ b/dex/downloader/queue.go @@ -68,15 +68,15 @@ type queue struct { mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching // Headers are "special", they download in batches, supported by a skeleton chain - headerHead common.Hash // [eth/62] Hash of the last queued header to verify order - headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for - headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable - headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations - headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers - headerProced int // [eth/62] Number of headers already processed from the results - headerOffset uint64 // [eth/62] Number of the first header in the result cache - headerContCh chan bool // [eth/62] Channel to notify when header download finishes + headerHead common.Hash // [eth/62] Hash of the last queued header to verify order + headerTaskPool map[uint64]*types.HeaderWithGovState // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers + headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable + headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations + headerResults []*types.HeaderWithGovState // [eth/62] Result cache accumulating the completed headers + headerProced int // [eth/62] Number of headers already processed from the results + headerOffset uint64 // [eth/62] Number of the first header in the result cache + headerContCh chan bool // [eth/62] Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers @@ -267,7 +267,7 @@ func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[comm // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill // up an already retrieved header skeleton. -func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { +func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.HeaderWithGovState) { q.lock.Lock() defer q.lock.Unlock() @@ -276,10 +276,10 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { panic("skeleton assembly already in progress") } // Schedule all the header retrieval tasks for the skeleton assembly - q.headerTaskPool = make(map[uint64]*types.Header) + q.headerTaskPool = make(map[uint64]*types.HeaderWithGovState) q.headerTaskQueue = prque.New(nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains - q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) + q.headerResults = make([]*types.HeaderWithGovState, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 q.headerOffset = from q.headerContCh = make(chan bool, 1) @@ -294,7 +294,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { // RetrieveHeaders retrieves the header chain assemble based on the scheduled // skeleton. -func (q *queue) RetrieveHeaders() ([]*types.Header, int) { +func (q *queue) RetrieveHeaders() ([]*types.HeaderWithGovState, int) { q.lock.Lock() defer q.lock.Unlock() @@ -306,7 +306,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, int) { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { +func (q *queue) Schedule(headers []*types.HeaderWithGovState, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -333,14 +333,14 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { continue } // Queue the header for content retrieval - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.blockTaskPool[hash] = header.Header + q.blockTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) if q.mode == FastSync { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + q.receiptTaskPool[hash] = header.Header + q.receiptTaskQueue.Push(header.Header, -int64(header.Number.Uint64())) } - inserts = append(inserts, header) + inserts = append(inserts, header.Header) q.headerHead = hash from++ } @@ -679,7 +679,7 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // If the headers are accepted, the method makes an attempt to deliver the set // of ready headers to the processor to keep the pipeline full. However it will // not block to prevent stalling other pending deliveries. -func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh chan []*types.Header) (int, error) { +func (q *queue) DeliverHeaders(id string, headers []*types.HeaderWithGovState, headerProcCh chan []*types.HeaderWithGovState) (int, error) { q.lock.Lock() defer q.lock.Unlock() @@ -743,7 +743,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } if ready > 0 { // Headers are ready for delivery, gather them and push forward (non blocking) - process := make([]*types.Header, ready) + process := make([]*types.HeaderWithGovState, ready) copy(process, q.headerResults[q.headerProced:q.headerProced+ready]) select { diff --git a/dex/downloader/types.go b/dex/downloader/types.go index d320b7590..a7143ccd9 100644 --- a/dex/downloader/types.go +++ b/dex/downloader/types.go @@ -35,13 +35,22 @@ type dataPack interface { // headerPack is a batch of block headers returned by a peer. type headerPack struct { peerID string - headers []*types.Header + headers []*types.HeaderWithGovState } func (p *headerPack) PeerId() string { return p.peerID } func (p *headerPack) Items() int { return len(p.headers) } func (p *headerPack) Stats() string { return fmt.Sprintf("%d", len(p.headers)) } +type govStatePack struct { + peerID string + govState *types.GovState +} + +func (p *govStatePack) PeerId() string { return p.peerID } +func (p *govStatePack) Items() int { return 1 } +func (p *govStatePack) Stats() string { return "1" } + // bodyPack is a batch of block bodies returned by a peer. type bodyPack struct { peerID string diff --git a/dex/governance.go b/dex/governance.go index db33f7820..01f39b68c 100644 --- a/dex/governance.go +++ b/dex/governance.go @@ -46,7 +46,7 @@ func NewDexconGovernance(backend *DexAPIBackend, chainConfig *params.ChainConfig return g } -func (d *DexconGovernance) getRoundHeight(ctx context.Context, round uint64) (uint64, error) { +func (d *DexconGovernance) GetRoundHeight(ctx context.Context, round uint64) (uint64, error) { state, _, err := d.b.StateAndHeaderByNumber(ctx, rpc.LatestBlockNumber) if state == nil || err != nil { return 0, err @@ -72,7 +72,7 @@ func (d *DexconGovernance) getGovStateAtRound(round uint64) *vm.GovernanceStateH round -= dexCore.ConfigRoundShift } ctx := context.Background() - blockHeight, err := d.getRoundHeight(ctx, round) + blockHeight, err := d.GetRoundHeight(ctx, round) if err != nil { return nil } diff --git a/dex/handler.go b/dex/handler.go index d66403fe6..0c7a3e919 100644 --- a/dex/handler.go +++ b/dex/handler.go @@ -34,6 +34,7 @@ package dex import ( + "context" "encoding/json" "errors" "fmt" @@ -220,7 +221,7 @@ func NewProtocolManager( return 0, nil } atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import - return manager.blockchain.InsertChain(blocks) + return manager.blockchain.InsertChain2(blocks) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) @@ -414,10 +415,11 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { first := true maxNonCanonical := uint64(100) + round := map[uint64]uint64{} // Gather headers until the fetch or network limits is reached var ( bytes common.StorageSize - headers []*types.Header + headers []*types.HeaderWithGovState unknown bool ) for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { @@ -439,7 +441,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if origin == nil { break } - headers = append(headers, origin) + headers = append(headers, &types.HeaderWithGovState{Header: origin}) + if round[origin.Round] == 0 { + round[origin.Round] = origin.Number.Uint64() + } bytes += estHeaderRlpSize // Advance to the next header of the query @@ -489,20 +494,71 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { query.Origin.Number += query.Skip + 1 } } + + ctx := context.Background() + if query.WithGov && len(headers) > 0 { + last := headers[len(headers)-1] + currentBlock := pm.blockchain.CurrentBlock() + + // Do not reply if we don't have current gov state + if currentBlock.NumberU64() < last.Number.Uint64() { + log.Debug("Current block < last request", + "current", currentBlock.NumberU64(), "last", last.Number.Uint64()) + return p.SendBlockHeaders([]*types.HeaderWithGovState{}) + } + + snapshotHeight := map[uint64]struct{}{} + for r, height := range round { + log.Trace("#Include round", "round", r) + if r == 0 { + continue + } + h, err := pm.gov.GetRoundHeight(ctx, r) + if err != nil { + log.Warn("Get round height fail", "err", err) + return p.SendBlockHeaders([]*types.HeaderWithGovState{}) + } + log.Trace("#Snapshot height", "height", h) + if h == 0 { + h = height + } + snapshotHeight[h] = struct{}{} + } + + for _, header := range headers { + if _, exist := snapshotHeight[header.Number.Uint64()]; exist { + s, err := pm.blockchain.GetGovStateByHash(header.Hash()) + if err != nil { + log.Warn("Get gov state by hash fail", "number", header.Number.Uint64(), "err", err) + return p.SendBlockHeaders([]*types.HeaderWithGovState{}) + } + header.GovState = s + } + log.Trace("Send header", "round", header.Round, "number", header.Number.Uint64(), "gov state == nil", header.GovState == nil) + } + } return p.SendBlockHeaders(headers) case msg.Code == BlockHeadersMsg: // A batch of headers arrived to one of our previous requests - var headers []*types.Header + var headers []*types.HeaderWithGovState if err := msg.Decode(&headers); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } // Filter out any explicitly requested headers, deliver the rest to the downloader filter := len(headers) == 1 if filter { - headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) + h := []*types.Header{headers[0].Header} + h = pm.fetcher.FilterHeaders(p.id, h, time.Now()) + if len(h) == 0 { + headers = nil + } + } + for _, header := range headers { + log.Trace("Received header", "round", header.Round, "number", header.Number.Uint64(), "gov state == nil", header.GovState == nil) } if len(headers) > 0 || !filter { + // if the header that has gov state is filter out, the header's gov state is useless err := pm.downloader.DeliverHeaders(p.id, headers) if err != nil { log.Debug("Failed to deliver headers", "err", err) @@ -834,6 +890,25 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return err } } + case msg.Code == GetGovStateMsg: + var hash common.Hash + if err := msg.Decode(&hash); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + govState, err := pm.blockchain.GetGovStateByHash(hash) + if err != nil { + // TODO(sonic): handle this error + panic(err) + } + return p.SendGovState(govState) + case msg.Code == GovStateMsg: + var govState types.GovState + if err := msg.Decode(&govState); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + if err := pm.downloader.DeliverGovState(p.id, &govState); err != nil { + log.Debug("Failed to deliver govstates", "err", err) + } default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } diff --git a/dex/peer.go b/dex/peer.go index 263dc5647..67aa50694 100644 --- a/dex/peer.go +++ b/dex/peer.go @@ -536,7 +536,7 @@ func (p *peer) AsyncSendPullVotes(pos coreTypes.Position) { } // SendBlockHeaders sends a batch of block headers to the remote peer. -func (p *peer) SendBlockHeaders(headers []*types.Header) error { +func (p *peer) SendBlockHeaders(headers []*types.HeaderWithGovState) error { return p2p.Send(p.rw, BlockHeadersMsg, headers) } @@ -563,25 +563,34 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { return p2p.Send(p.rw, ReceiptsMsg, receipts) } +func (p *peer) SendGovState(govState *types.GovState) error { + return p2p.Send(p.rw, GovStateMsg, govState) +} + // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *peer) RequestOneHeader(hash common.Hash) error { p.Log().Debug("Fetching single header", "hash", hash) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false, WithGov: false}) } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. -func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse, withGov bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse, "withgov", withGov) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov}) } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. -func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse, withGov bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse, "withgov", withGov) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse, WithGov: withGov}) +} + +func (p *peer) RequestGovStateByHash(hash common.Hash) error { + p.Log().Debug("Fetching one gov state", "hash", hash) + return p2p.Send(p.rw, GetGovStateMsg, hash) } // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes diff --git a/dex/protocol.go b/dex/protocol.go index 04928d2b2..0c271248e 100644 --- a/dex/protocol.go +++ b/dex/protocol.go @@ -34,6 +34,7 @@ package dex import ( + "context" "crypto/ecdsa" "fmt" "io" @@ -60,7 +61,7 @@ var ProtocolName = "dex" var ProtocolVersions = []uint{dex64} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{40} +var ProtocolLengths = []uint64{42} const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -93,6 +94,9 @@ const ( DKGPartialSignatureMsg = 0x25 PullBlocksMsg = 0x26 PullVotesMsg = 0x27 + + GetGovStateMsg = 0x28 + GovStateMsg = 0x29 ) type errCode int @@ -140,6 +144,8 @@ type txPool interface { } type governance interface { + GetRoundHeight(context.Context, uint64) (uint64, error) + GetNumChains(uint64) uint32 LenCRS() uint64 @@ -184,6 +190,8 @@ type getBlockHeadersData struct { Amount uint64 // Maximum number of headers to retrieve Skip uint64 // Blocks to skip between consecutive headers Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) + + WithGov bool } // hashOrNumber is a combined field for specifying an origin block. |