diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-05-07 17:59:19 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-05-07 17:59:19 +0800 |
commit | 45f8304f3c44e5379c7e30ab144d73e591e270af (patch) | |
tree | 4516358d4916867d26249795917903eb9ff1fd11 /eth/downloader/queue.go | |
parent | 4800c94392e814a2cb9d343aab4706be0cd0851d (diff) | |
download | go-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.tar.gz go-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.tar.zst go-tangerine-45f8304f3c44e5379c7e30ab144d73e591e270af.zip |
eth/downloader: fix expiration not running while fetching
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 26 |
1 files changed, 14 insertions, 12 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index eae567052..d849d4d68 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -12,7 +12,7 @@ import ( ) const ( - blockCacheLimit = 4096 // Maximum number of blocks to cache before throttling the download + blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download ) // fetchRequest is a currently running block retrieval operation. @@ -28,8 +28,7 @@ type queue struct { hashQueue *prque.Prque // Priority queue of the block hashes to fetch hashCounter int // Counter indexing the added hashes to ensure retrieval order - pendPool map[string]*fetchRequest // Currently pending block retrieval operations - pendCount int // Number of pending block fetches (to throttle the download) + pendPool map[string]*fetchRequest // Currently pending block retrieval operations blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes blockCache []*types.Block // Downloaded but not yet delivered blocks @@ -58,7 +57,6 @@ func (q *queue) Reset() { q.hashCounter = 0 q.pendPool = make(map[string]*fetchRequest) - q.pendCount = 0 q.blockPool = make(map[common.Hash]int) q.blockOffset = 0 @@ -106,7 +104,13 @@ func (q *queue) Throttle() bool { q.lock.RLock() defer q.lock.RUnlock() - return q.pendCount >= len(q.blockCache)-len(q.blockPool) + // Calculate the currently in-flight block requests + pending := 0 + for _, request := range q.pendPool { + pending += len(request.Hashes) + } + // Throttle if more blocks are in-flight than free space in the cache + return pending >= len(q.blockCache)-len(q.blockPool) } // Has checks if a hash is within the download queue or not. @@ -206,10 +210,14 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the pool has been depleted + // Short circuit if the pool has been depleted, or if the peer's already + // downloading something (sanity check not to corrupt state) if q.hashQueue.Empty() { return nil } + if _, ok := q.pendPool[p.id]; ok { + return nil + } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) @@ -236,7 +244,6 @@ func (q *queue) Reserve(p *peer, max int) *fetchRequest { Time: time.Now(), } q.pendPool[p.id] = request - q.pendCount += len(request.Hashes) return request } @@ -250,7 +257,6 @@ func (q *queue) Cancel(request *fetchRequest) { q.hashQueue.Push(hash, float32(index)) } delete(q.pendPool, request.Peer.id) - q.pendCount -= len(request.Hashes) } // Expire checks for in flight requests that exceeded a timeout allowance, @@ -266,7 +272,6 @@ func (q *queue) Expire(timeout time.Duration) []string { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } - q.pendCount -= len(request.Hashes) peers = append(peers, id) } } @@ -289,9 +294,6 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } delete(q.pendPool, id) - // Mark all the hashes in the request as non-pending - q.pendCount -= len(request.Hashes) - // If no blocks were retrieved, mark them as unavailable for the origin peer if len(blocks) == 0 { for hash, _ := range request.Hashes { |