aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader')
-rw-r--r--eth/downloader/downloader.go68
1 files changed, 47 insertions, 21 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 83e6b8d32..56671dc2e 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -54,8 +54,9 @@ type blockPack struct {
}
type syncPack struct {
- peer *peer
- hash common.Hash
+ peer *peer
+ hash common.Hash
+ ignoreInitial bool
}
func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader {
@@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) {
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
- itimer := time.NewTicker(5 * time.Second)
+ //itimer := time.NewTicker(5 * time.Second)
+ itimer := time.NewTimer(5 * time.Second)
out:
for {
select {
case <-d.newPeerCh:
+ itimer.Stop()
// Meet the `minDesiredPeerCount` before we select our best peer
if len(d.peers) < minDesiredPeerCount {
break
@@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) {
}
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
- d.syncCh <- syncPack{p, p.recentHash}
+ d.syncCh <- syncPack{p, p.recentHash, false}
}
}
@@ -151,7 +154,7 @@ out:
// Start the fetcher. This will block the update entirely
// interupts need to be send to the appropriate channels
// respectively.
- if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil {
+ if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
// handle error
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
// XXX Reset
@@ -178,11 +181,18 @@ out:
}
// XXX Make synchronous
-func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error {
- glog.V(logger.Debug).Infoln("Downloading hashes")
+func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
+ glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
start := time.Now()
+ // We ignore the initial hash in some cases (e.g. we received a block without it's parent)
+ // In such circumstances we don't need to download the block so don't add it to the queue.
+ if !ignoreInitial {
+ // Add the hash to the queue first
+ d.queue.hashPool.Add(hash)
+ }
+
// Get the first batch of hashes
p.getHashes(hash)
atomic.StoreInt32(&d.fetchingHashes, 1)
@@ -195,7 +205,7 @@ out:
hashSet := set.New()
for _, hash := range hashes {
if d.hasBlock(hash) {
- glog.V(logger.Debug).Infof("Found common hash %x\n", hash)
+ glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
done = true
break
@@ -207,7 +217,7 @@ out:
// Add hashes to the chunk set
// Check if we're done fetching
- if !done {
+ if !done && len(hashes) > 0 {
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
// Get the next set of hashes
p.getHashes(hashes[len(hashes)-1])
@@ -218,7 +228,7 @@ out:
}
}
}
- glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start))
+ glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil
}
@@ -242,6 +252,10 @@ out:
// from the available peers.
if d.queue.hashPool.Size() > 0 {
availablePeers := d.peers.get(idleState)
+ if len(availablePeers) == 0 {
+ glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
+ }
+
for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
@@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
return
}
- glog.V(logger.Detail).Infoln("Inserting new block from:", id)
- d.queue.addBlock(id, block, td)
-
+ peer := d.peers.getPeer(id)
// if the peer is in our healthy list of peers; update the td
- // here is a good chance to add the peer back to the list
- if peer := d.peers.getPeer(id); peer != nil {
- peer.mu.Lock()
- peer.td = td
- peer.recentHash = block.Hash()
- peer.mu.Unlock()
+ // and add the block. Otherwise just ignore it
+ if peer == nil {
+ glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
+ return
}
+ peer.mu.Lock()
+ peer.td = td
+ peer.recentHash = block.Hash()
+ peer.mu.Unlock()
+
+ glog.V(logger.Detail).Infoln("Inserting new block from:", id)
+ d.queue.addBlock(id, block, td)
+
// if neither go ahead to process
if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
- d.process()
+ // Check if the parent of the received block is known.
+ // If the block is not know, request it otherwise, request.
+ phash := block.ParentHash()
+ if !d.hasBlock(phash) {
+ glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
+ d.syncCh <- syncPack{peer, peer.recentHash, true}
+ } else {
+ d.process()
+ }
}
}
@@ -369,7 +395,7 @@ func (d *Downloader) process() error {
// TODO change this. This shite
for i, block := range blocks[:max] {
if !d.hasBlock(block.ParentHash()) {
- d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()}
+ d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
// remove processed blocks
blocks = blocks[i:]