aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
authorPéter Szilágyi <peterke@gmail.com>2015-05-07 17:59:19 +0800
committerPéter Szilágyi <peterke@gmail.com>2015-05-07 17:59:19 +0800
commit45f8304f3c44e5379c7e30ab144d73e591e270af (patch)
tree4516358d4916867d26249795917903eb9ff1fd11 /eth/downloader/queue.go
parent4800c94392e814a2cb9d343aab4706be0cd0851d (diff)
downloadgo-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.tar.gz
go-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.tar.zst
go-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.zip
eth/downloader: fix expiration not running while fetching
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go26
1 files changed, 14 insertions, 12 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index eae567052..d849d4d68 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -12,7 +12,7 @@ import (
)
const (
- blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download
+ blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
)
// fetchRequest is a currently running block retrieval operation.
@@ -28,8 +28,7 @@ type queue struct {
hashQueue *prque.Prque // Priority queue of the block hashes to fetch
hashCounter int // Counter indexing the added hashes to ensure retrieval order
- pendPool map[string]*fetchRequest // Currently pending block retrieval operations
- pendCount int // Number of pending block fetches (to throttle the download)
+ pendPool map[string]*fetchRequest // Currently pending block retrieval operations
blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes
blockCache []*types.Block // Downloaded but not yet delivered blocks
@@ -58,7 +57,6 @@ func (q *queue) Reset() {
q.hashCounter = 0
q.pendPool = make(map[string]*fetchRequest)
- q.pendCount = 0
q.blockPool = make(map[common.Hash]int)
q.blockOffset = 0
@@ -106,7 +104,13 @@ func (q *queue) Throttle() bool {
q.lock.RLock()
defer q.lock.RUnlock()
- return q.pendCount >= len(q.blockCache)-len(q.blockPool)
+ // Calculate the currently in-flight block requests
+ pending := 0
+ for _, request := range q.pendPool {
+ pending += len(request.Hashes)
+ }
+ // Throttle if more blocks are in-flight than free space in the cache
+ return pending >= len(q.blockCache)-len(q.blockPool)
}
// Has checks if a hash is within the download queue or not.
@@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
- // Short circuit if the pool has been depleted
+ // Short circuit if the pool has been depleted, or if the peer's already
+ // downloading something (sanity check not to corrupt state)
if q.hashQueue.Empty() {
return nil
}
+ if _, ok := q.pendPool[p.id]; ok {
+ return nil
+ }
// Retrieve a batch of hashes, skipping previously failed ones
send := make(map[common.Hash]int)
skip := make(map[common.Hash]int)
@@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest {
Time: time.Now(),
}
q.pendPool[p.id] = request
- q.pendCount += len(request.Hashes)
return request
}
@@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) {
q.hashQueue.Push(hash, float32(index))
}
delete(q.pendPool, request.Peer.id)
- q.pendCount -= len(request.Hashes)
}
// Expire checks for in flight requests that exceeded a timeout allowance,
@@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
- q.pendCount -= len(request.Hashes)
peers = append(peers, id)
}
}
@@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
delete(q.pendPool, id)
- // Mark all the hashes in the request as non-pending
- q.pendCount -= len(request.Hashes)
-
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {