aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/downloader.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r--eth/downloader/downloader.go55
1 files changed, 33 insertions, 22 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 7ae7aa221..24ba3da17 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -102,6 +102,9 @@ type headHeaderRetrievalFn func() *types.Header
// headBlockRetrievalFn is a callback type for retrieving the head block from the local chain.
type headBlockRetrievalFn func() *types.Block
+// headFastBlockRetrievalFn is a callback type for retrieving the head fast block from the local chain.
+type headFastBlockRetrievalFn func() *types.Block
+
// tdRetrievalFn is a callback type for retrieving the total difficulty of a local block.
type tdRetrievalFn func(common.Hash) *big.Int
@@ -188,17 +191,18 @@ type Downloader struct {
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields
// Callbacks
- hasHeader headerCheckFn // Checks if a header is present in the chain
- hasBlock blockCheckFn // Checks if a block is present in the chain
- getHeader headerRetrievalFn // Retrieves a header from the chain
- getBlock blockRetrievalFn // Retrieves a block from the chain
- headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
- headBlock headBlockRetrievalFn // Retrieves the head block from the chain
- getTd tdRetrievalFn // Retrieves the TD of a block from the chain
- insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
- insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
- insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
- dropPeer peerDropFn // Drops a peer for misbehaving
+ hasHeader headerCheckFn // Checks if a header is present in the chain
+ hasBlock blockCheckFn // Checks if a block is present in the chain
+ getHeader headerRetrievalFn // Retrieves a header from the chain
+ getBlock blockRetrievalFn // Retrieves a block from the chain
+ headHeader headHeaderRetrievalFn // Retrieves the head header from the chain
+ headBlock headBlockRetrievalFn // Retrieves the head block from the chain
+ headFastBlock headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
+ getTd tdRetrievalFn // Retrieves the TD of a block from the chain
+ insertHeaders headerChainInsertFn // Injects a batch of headers into the chain
+ insertBlocks blockChainInsertFn // Injects a batch of blocks into the chain
+ insertReceipts receiptChainInsertFn // Injects a batch of blocks and their receipts into the chain
+ dropPeer peerDropFn // Drops a peer for misbehaving
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@@ -229,8 +233,8 @@ type Downloader struct {
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn,
- headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
- insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
+ headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, getTd tdRetrievalFn,
+ insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {
return &Downloader{
mode: mode,
@@ -243,6 +247,7 @@ func New(mode SyncMode, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock bl
getBlock: getBlock,
headHeader: headHeader,
headBlock: headBlock,
+ headFastBlock: headFastBlock,
getTd: getTd,
insertHeaders: insertHeaders,
insertBlocks: insertBlocks,
@@ -393,7 +398,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
}()
glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
- defer glog.V(logger.Debug).Infof("Synchronisation terminated")
+ defer func(start time.Time) {
+ glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
+ }(time.Now())
switch {
case p.version == 61:
@@ -989,6 +996,8 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
head := d.headHeader().Number.Uint64()
if d.mode == FullSync {
head = d.headBlock().NumberU64()
+ } else if d.mode == FastSync {
+ head = d.headFastBlock().NumberU64()
}
from := int64(head) - int64(MaxHeaderFetch) + 1
if from < 0 {
@@ -1020,7 +1029,7 @@ func (d *Downloader) findAncestor(p *peer) (uint64, error) {
// Check if a common ancestor was found
finished = true
for i := len(headers) - 1; i >= 0; i-- {
- if (d.mode == FullSync && d.hasBlock(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
+ if (d.mode != LightSync && d.hasBlock(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
number, hash = headers[i].Number.Uint64(), headers[i].Hash()
break
}
@@ -1182,17 +1191,18 @@ func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
// Otherwise insert all the new headers, aborting in case of junk
glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headerPack.headers), from)
+ if d.mode == FastSync || d.mode == LightSync {
+ if n, err := d.insertHeaders(headerPack.headers, false); err != nil {
+ glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
+ return errInvalidChain
+ }
+ }
if d.mode == FullSync || d.mode == FastSync {
inserts := d.queue.Schedule(headerPack.headers, from, d.mode == FastSync)
if len(inserts) != len(headerPack.headers) {
glog.V(logger.Debug).Infof("%v: stale headers", p)
return errBadPeer
}
- } else {
- if n, err := d.insertHeaders(headerPack.headers, true); err != nil {
- glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headerPack.headers[n].Number, headerPack.headers[n].Hash().Bytes()[:4], err)
- return errInvalidChain
- }
}
// Notify the content fetchers of new headers, but stop if queue is full
cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
@@ -1394,6 +1404,7 @@ func (d *Downloader) fetchParts(from uint64, errCancel error, deliveryCh chan da
for _, pid := range expire() {
if peer := d.peers.Peer(pid); peer != nil {
peer.Demote()
+ setIdle(peer)
glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
}
}
@@ -1497,7 +1508,7 @@ func (d *Downloader) process() {
// Actually import the blocks
if glog.V(logger.Debug) {
first, last := results[0].Header, results[len(results)-1].Header
- glog.V(logger.Debug).Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
+ glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
}
for len(results) != 0 {
// Check for any termination requests
@@ -1536,7 +1547,7 @@ func (d *Downloader) process() {
index, err = d.insertHeaders(headers, true)
}
if err != nil {
- glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash(), err)
+ glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
d.cancel()
return
}