diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 143 |
1 files changed, 82 insertions, 61 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index e4d1392d0..a5d03d17e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -114,21 +114,11 @@ type Downloader struct { syncStatsState stateSyncStats syncStatsLock sync.RWMutex // Lock protecting the sync stats fields + lightchain LightChain + chain BlockChain + // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlockAndState blockAndStateCheckFn // Checks if a block and associated state is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain - commitHeadBlock headBlockCommitterFn // Commits a manually assembled block as the chain head - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - rollback chainRollbackFn // Removes a batch of recently added chain links - dropPeer peerDropFn // Drops a peer for misbehaving + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -163,11 +153,56 @@ type Downloader struct { chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) } +type LightChain interface { + // HasHeader verifies a header's presence in the local chain. + HasHeader(common.Hash) bool + + // GetHeaderByHash retrieves a header from the local chain. + GetHeaderByHash(common.Hash) *types.Header + + // CurrentHeader retrieves the head header from the local chain. + CurrentHeader() *types.Header + + // GetTdByHash returns the total difficulty of a local block. + GetTdByHash(common.Hash) *big.Int + + // InsertHeaderChain inserts a batch of headers into the local chain. + InsertHeaderChain([]*types.Header, int) (int, error) + + // Rollback removes a few recently added elements from the local chain. + Rollback([]common.Hash) +} + +type BlockChain interface { + LightChain + + // HasBlockAndState verifies block and associated states' presence in the local chain. + HasBlockAndState(common.Hash) bool + + // GetBlockByHash retrieves a block from the local chain. + GetBlockByHash(common.Hash) *types.Block + + // CurrentBlock retrieves the head block from the local chain. + CurrentBlock() *types.Block + + // CurrentFastBlock retrieves the head fast block from the local chain. + CurrentFastBlock() *types.Block + + // FastSyncCommitHead directly commits the head block to a certain entity. + FastSyncCommitHead(common.Hash) error + + // InsertChain inserts a batch of blocks into the local chain. + InsertChain(types.Blocks) (int, error) + + // InsertReceiptChain inserts a batch of receipts into the local chain. + InsertReceiptChain(types.Blocks, []types.Receipts) (int, error) +} + // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn, - getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, - headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, - insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { + if lightchain == nil { + lightchain = chain + } dl := &Downloader{ mode: mode, @@ -177,19 +212,8 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader he stateDB: stateDb, rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), - hasHeader: hasHeader, - hasBlockAndState: hasBlockAndState, - getHeader: getHeader, - getBlock: getBlock, - headHeader: headHeader, - headBlock: headBlock, - headFastBlock: headFastBlock, - commitHeadBlock: commitHeadBlock, - getTd: getTd, - insertHeaders: insertHeaders, - insertBlocks: insertBlocks, - insertReceipts: insertReceipts, - rollback: rollback, + chain: chain, + lightchain: lightchain, dropPeer: dropPeer, headerCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1), @@ -223,11 +247,11 @@ func (d *Downloader) Progress() ethereum.SyncProgress { current := uint64(0) switch d.mode { case FullSync: - current = d.headBlock().NumberU64() + current = d.chain.CurrentBlock().NumberU64() case FastSync: - current = d.headFastBlock().NumberU64() + current = d.chain.CurrentFastBlock().NumberU64() case LightSync: - current = d.headHeader().Number.Uint64() + current = d.lightchain.CurrentHeader().Number.Uint64() } return ethereum.SyncProgress{ StartingBlock: d.syncStatsChainOrigin, @@ -572,13 +596,13 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // the head links match), we do a binary search to find the common ancestor. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks - floor, ceil := int64(-1), d.headHeader().Number.Uint64() + floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height) if d.mode == FullSync { - ceil = d.headBlock().NumberU64() + ceil = d.chain.CurrentBlock().NumberU64() } else if d.mode == FastSync { - ceil = d.headFastBlock().NumberU64() + ceil = d.chain.CurrentFastBlock().NumberU64() } if ceil >= MaxForkAncestry { floor = int64(ceil - MaxForkAncestry) @@ -638,7 +662,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { continue } // Otherwise check if we already know the header or not - if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { + if (d.mode == FullSync && d.chain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() // If every header is known, even future ones, the peer straight out lied about its head @@ -703,11 +727,11 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { arrived = true // Modify the search interval based on the response - if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) { + if (d.mode == FullSync && !d.chain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash())) { end = check break } - header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists + header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check) return 0, errBadPeer @@ -1124,23 +1148,19 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { for i, header := range rollback { hashes[i] = header.Hash() } - lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0 - if d.headFastBlock != nil { - lastFastBlock = d.headFastBlock().Number() - } - if d.headBlock != nil { - lastBlock = d.headBlock().Number() + lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0 + if d.mode != LightSync { + lastFastBlock = d.chain.CurrentFastBlock().Number() + lastBlock = d.chain.CurrentBlock().Number() } - d.rollback(hashes) + d.lightchain.Rollback(hashes) curFastBlock, curBlock := common.Big0, common.Big0 - if d.headFastBlock != nil { - curFastBlock = d.headFastBlock().Number() - } - if d.headBlock != nil { - curBlock = d.headBlock().Number() + if d.mode != LightSync { + curFastBlock = d.chain.CurrentFastBlock().Number() + curBlock = d.chain.CurrentBlock().Number() } log.Warn("Rolled back headers", "count", len(hashes), - "header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number), + "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number), "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) @@ -1190,7 +1210,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give if d.mode != LightSync { - if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { + if !gotHeaders && td.Cmp(d.chain.GetTdByHash(d.chain.CurrentBlock().Hash())) > 0 { return errStallingPeer } } @@ -1202,7 +1222,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // queued for processing when the header download completes. However, as long as the // peer gave us something useful, we're already happy/progressed (above check). if d.mode == FastSync || d.mode == LightSync { - if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 { + if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 { return errStallingPeer } } @@ -1232,7 +1252,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Collect the yet unknown headers to mark them as uncertain unknown := make([]*types.Header, 0, len(headers)) for _, header := range chunk { - if !d.hasHeader(header.Hash()) { + if !d.lightchain.HasHeader(header.Hash()) { unknown = append(unknown, header) } } @@ -1241,7 +1261,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 } - if n, err := d.insertHeaders(chunk, frequency); err != nil { + if n, err := d.chain.InsertHeaderChain(chunk, frequency); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { rollback = append(rollback, chunk[:n]...) @@ -1328,7 +1348,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { for i, result := range results[:items] { blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) } - if index, err := d.insertBlocks(blocks); err != nil { + if index, err := d.chain.InsertChain(blocks); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1368,6 +1388,7 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { stateSync.Cancel() if err := d.commitPivotBlock(P); err != nil { return err + } } if err := d.importBlockResults(afterP); err != nil { @@ -1416,7 +1437,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles) receipts[i] = result.Receipts } - if index, err := d.insertReceipts(blocks, receipts); err != nil { + if index, err := d.chain.InsertReceiptChain(blocks, receipts); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } @@ -1434,10 +1455,10 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { return err } log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash()) - if _, err := d.insertReceipts([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { + if _, err := d.chain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil { return err } - return d.commitHeadBlock(b.Hash()) + return d.chain.FastSyncCommitHead(b.Hash()) } // DeliverHeaders injects a new batch of block headers received from a remote |