aboutsummaryrefslogtreecommitdiffstats
path: root/eth/downloader/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r--eth/downloader/queue.go138
1 files changed, 2 insertions, 136 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 01897af6d..fd239f7e4 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -45,7 +45,6 @@ var (
var (
errNoFetchesPending = errors.New("no fetches pending")
- errStateSyncPending = errors.New("state trie sync already scheduled")
errStaleDelivery = errors.New("stale delivery")
)
@@ -74,10 +73,6 @@ type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
- hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
- hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
- hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
-
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
// Headers are "special", they download in batches, supported by a skeleton chain
@@ -85,7 +80,6 @@ type queue struct {
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
- headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
headerProced int // [eth/62] Number of headers already processed from the results
headerOffset uint64 // [eth/62] Number of the first header in the result cache
@@ -124,8 +118,6 @@ type queue struct {
func newQueue(stateDb ethdb.Database) *queue {
lock := new(sync.Mutex)
return &queue{
- hashPool: make(map[common.Hash]int),
- hashQueue: prque.New(),
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
@@ -158,10 +150,6 @@ func (q *queue) Reset() {
q.mode = FullSync
q.fastSyncPivot = 0
- q.hashPool = make(map[common.Hash]int)
- q.hashQueue.Reset()
- q.hashCounter = 0
-
q.headerHead = common.Hash{}
q.headerPendPool = make(map[string]*fetchRequest)
@@ -208,7 +196,7 @@ func (q *queue) PendingBlocks() int {
q.lock.Lock()
defer q.lock.Unlock()
- return q.hashQueue.Size() + q.blockTaskQueue.Size()
+ return q.blockTaskQueue.Size()
}
// PendingReceipts retrieves the number of block receipts pending for retrieval.
@@ -272,7 +260,7 @@ func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
- queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
+ queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool)
@@ -323,34 +311,6 @@ func (q *queue) ShouldThrottleReceipts() bool {
return pending >= len(q.resultCache)-len(q.receiptDonePool)
}
-// Schedule61 adds a set of hashes for the download queue for scheduling, returning
-// the new hashes encountered.
-func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Insert all the hashes prioritised in the arrival order
- inserts := make([]common.Hash, 0, len(hashes))
- for _, hash := range hashes {
- // Skip anything we already have
- if old, ok := q.hashPool[hash]; ok {
- glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
- continue
- }
- // Update the counters and insert the hash
- q.hashCounter = q.hashCounter + 1
- inserts = append(inserts, hash)
-
- q.hashPool[hash] = q.hashCounter
- if fifo {
- q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first
- } else {
- q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
- }
- }
- return inserts
-}
-
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
@@ -550,15 +510,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
return request
}
-// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
-// previously failed download.
-func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
-}
-
// ReserveNodeData reserves a set of node data hashes for the given peer, skipping
// any previously failed download.
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
@@ -753,11 +704,6 @@ func (q *queue) CancelHeaders(request *fetchRequest) {
q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}
-// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
-func (q *queue) CancelBlocks(request *fetchRequest) {
- q.cancel(request, q.hashQueue, q.blockPendPool)
-}
-
// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
@@ -801,9 +747,6 @@ func (q *queue) Revoke(peerId string) {
defer q.lock.Unlock()
if request, ok := q.blockPendPool[peerId]; ok {
- for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
- }
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
}
@@ -832,15 +775,6 @@ func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
}
-// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
-// canceling them and returning the responsible peers for penalisation.
-func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter)
-}
-
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
@@ -907,74 +841,6 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
return expiries
}
-// DeliverBlocks injects a block retrieval response into the download queue. The
-// method returns the number of blocks accepted from the delivery and also wakes
-// any threads waiting for data delivery.
-func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- // Short circuit if the blocks were never requested
- request := q.blockPendPool[id]
- if request == nil {
- return 0, errNoFetchesPending
- }
- blockReqTimer.UpdateSince(request.Time)
- delete(q.blockPendPool, id)
-
- // If no blocks were retrieved, mark them as unavailable for the origin peer
- if len(blocks) == 0 {
- for hash, _ := range request.Hashes {
- request.Peer.MarkLacking(hash)
- }
- }
- // Iterate over the downloaded blocks and add each of them
- accepted, errs := 0, make([]error, 0)
- for _, block := range blocks {
- // Skip any blocks that were not requested
- hash := block.Hash()
- if _, ok := request.Hashes[hash]; !ok {
- errs = append(errs, fmt.Errorf("non-requested block %x", hash))
- continue
- }
- // Reconstruct the next result if contents match up
- index := int(block.Number().Int64() - int64(q.resultOffset))
- if index >= len(q.resultCache) || index < 0 {
- errs = []error{errInvalidChain}
- break
- }
- q.resultCache[index] = &fetchResult{
- Header: block.Header(),
- Transactions: block.Transactions(),
- Uncles: block.Uncles(),
- }
- q.blockDonePool[block.Hash()] = struct{}{}
-
- delete(request.Hashes, hash)
- delete(q.hashPool, hash)
- accepted++
- }
- // Return all failed or missing fetches to the queue
- for hash, index := range request.Hashes {
- q.hashQueue.Push(hash, float32(index))
- }
- // Wake up WaitResults
- if accepted > 0 {
- q.active.Signal()
- }
- // If none of the blocks were good, it's a stale delivery
- switch {
- case len(errs) == 0:
- return accepted, nil
- case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
- return accepted, errs[0]
- case len(errs) == len(blocks):
- return accepted, errStaleDelivery
- default:
- return accepted, fmt.Errorf("multiple failures: %v", errs)
- }
-}
-
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.