diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 903f043eb..b24ce42e8 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -40,9 +40,9 @@ type queue struct { pendPool map[string]*fetchRequest // Currently pending block retrieval operations - blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes - blockCache []*Block // Downloaded but not yet delivered blocks - blockOffset int // Offset of the first cached block in the block-chain + blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes + blockCache []*Block // Downloaded but not yet delivered blocks + blockOffset uint64 // Offset of the first cached block in the block-chain lock sync.RWMutex } @@ -53,7 +53,7 @@ func newQueue() *queue { hashPool: make(map[common.Hash]int), hashQueue: prque.New(), pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]int), + blockPool: make(map[common.Hash]uint64), blockCache: make([]*Block, blockCacheLimit), } } @@ -69,7 +69,7 @@ func (q *queue) Reset() { q.pendPool = make(map[string]*fetchRequest) - q.blockPool = make(map[common.Hash]int) + q.blockPool = make(map[common.Hash]uint64) q.blockOffset = 0 q.blockCache = make([]*Block, blockCacheLimit) } @@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool { // Insert adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert(hashes []common.Hash) []common.Hash { +func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -147,7 +147,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash { inserts = append(inserts, hash) q.hashPool[hash] = q.hashCounter - q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + if fifo { + q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first + } else { + q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + } } return inserts } @@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block { return nil } // Return the block if it's still available in the cache - if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) { + if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) { return q.blockCache[index-q.blockOffset] } return nil @@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block { for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ { q.blockCache[k] = nil } - q.blockOffset += len(blocks) + q.blockOffset += uint64(len(blocks)) return blocks } @@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { continue } // If a requested block falls out of the range, the hash chain is invalid - index := int(block.NumberU64()) - q.blockOffset + index := int(int64(block.NumberU64()) - int64(q.blockOffset)) if index >= len(q.blockCache) || index < 0 { return errInvalidChain } @@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } delete(request.Hashes, hash) delete(q.hashPool, hash) - q.blockPool[hash] = int(block.NumberU64()) + q.blockPool[hash] = block.NumberU64() } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { @@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } // Prepare configures the block cache offset to allow accepting inbound blocks. -func (q *queue) Prepare(offset int) { +func (q *queue) Prepare(offset uint64) { q.lock.Lock() defer q.lock.Unlock() |