diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 55 |
1 files changed, 33 insertions, 22 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 7ae7aa221..24ba3da17 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -102,6 +102,9 @@ type headHeaderRetrievalFn func() *types.Header // headBlockRetrievalFn is a callback type for retrieving the head block from the local chain. type headBlockRetrievalFn func() *types.Block +// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain. +type headFastBlockRetrievalFn func() *types.Block + // tdRetrievalFn is a callback type for retrieving the total difficulty of a local block. type tdRetrievalFn func(common.Hash) *big.Int @@ -188,17 +191,18 @@ type Downloader struct { syncStatsLock sync.RWMutex // Lock protecting the sync stats fields // Callbacks - hasHeader headerCheckFn // Checks if a header is present in the chain - hasBlock blockCheckFn // Checks if a block is present in the chain - getHeader headerRetrievalFn // Retrieves a header from the chain - getBlock blockRetrievalFn // Retrieves a block from the chain - headHeader headHeaderRetrievalFn // Retrieves the head header from the chain - headBlock headBlockRetrievalFn // Retrieves the head block from the chain - getTd tdRetrievalFn // Retrieves the TD of a block from the chain - insertHeaders headerChainInsertFn // Injects a batch of headers into the chain - insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain - insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + hasHeader headerCheckFn // Checks if a header is present in the chain + hasBlock blockCheckFn // Checks if a block is present in the chain + getHeader headerRetrievalFn // Retrieves a header from the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + headHeader headHeaderRetrievalFn // Retrieves the head header from the chain + headBlock headBlockRetrievalFn // Retrieves the head block from the chain + headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain + getTd tdRetrievalFn // Retrieves the TD of a block from the chain + insertHeaders headerChainInsertFn // Injects a batch of headers into the chain + insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain + insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain + dropPeer peerDropFn // Drops a peer for misbehaving // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -229,8 +233,8 @@ type Downloader struct { // New creates a new downloader to fetch hashes and blocks from remote peers. func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, - headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, - insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { + headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn, + insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader { return &Downloader{ mode: mode, @@ -243,6 +247,7 @@ func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock bl getBlock: getBlock, headHeader: headHeader, headBlock: headBlock, + headFastBlock: headFastBlock, getTd: getTd, insertHeaders: insertHeaders, insertBlocks: insertBlocks, @@ -393,7 +398,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e }() glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version) - defer glog.V(logger.Debug).Infof("Synchronisation terminated") + defer func(start time.Time) { + glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start)) + }(time.Now()) switch { case p.version == 61: @@ -989,6 +996,8 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { head := d.headHeader().Number.Uint64() if d.mode == FullSync { head = d.headBlock().NumberU64() + } else if d.mode == FastSync { + head = d.headFastBlock().NumberU64() } from := int64(head) - int64(MaxHeaderFetch) + 1 if from < 0 { @@ -1020,7 +1029,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) { // Check if a common ancestor was found finished = true for i := len(headers) - 1; i >= 0; i-- { - if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) { + if (d.mode != LightSync && d.hasBlock(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) { number, hash = headers[i].Number.Uint64(), headers[i].Hash() break } @@ -1182,17 +1191,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error { // Otherwise insert all the new headers, aborting in case of junk glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from) + if d.mode == FastSync || d.mode == LightSync { + if n, err := d.insertHeaders(headerPack.headers, false); err != nil { + glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) + return errInvalidChain + } + } if d.mode == FullSync || d.mode == FastSync { inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync) if len(inserts) != len(headerPack.headers) { glog.V(logger.Debug).Infof("%v: stale headers", p) return errBadPeer } - } else { - if n, err := d.insertHeaders(headerPack.headers, true); err != nil { - glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err) - return errInvalidChain - } } // Notify the content fetchers of new headers, but stop if queue is full cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders @@ -1394,6 +1404,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da for _, pid := range expire() { if peer := d.peers.Peer(pid); peer != nil { peer.Demote() + setIdle(peer) glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind)) } } @@ -1497,7 +1508,7 @@ func (d *Downloader) process() { // Actually import the blocks if glog.V(logger.Debug) { first, last := results[0].Header, results[len(results)-1].Header - glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4]) + glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4]) } for len(results) != 0 { // Check for any termination requests @@ -1536,7 +1547,7 @@ func (d *Downloader) process() { index, err = d.insertHeaders(headers, true) } if err != nil { - glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err) + glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err) d.cancel() return } |