diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 198 |
1 files changed, 101 insertions, 97 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 1fb5b6e12..584797d7b 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -101,11 +101,14 @@ type queue struct { resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain - lock sync.RWMutex + lock *sync.Mutex + active *sync.Cond + closed bool } // newQueue creates a new download queue for scheduling block retrieval. func newQueue(stateDb ethdb.Database) *queue { + lock := new(sync.Mutex) return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -122,6 +125,8 @@ func newQueue(stateDb ethdb.Database) *queue { statePendPool: make(map[string]*fetchRequest), stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), + active: sync.NewCond(lock), + lock: lock, } } @@ -133,6 +138,7 @@ func (q *queue) Reset() { q.stateSchedLock.Lock() defer q.stateSchedLock.Unlock() + q.closed = false q.mode = FullSync q.fastSyncPivot = 0 @@ -162,18 +168,27 @@ func (q *queue) Reset() { q.resultOffset = 0 } +// Close marks the end of the sync, unblocking WaitResults. +// It may be called even if the queue is already closed. +func (q *queue) Close() { + q.lock.Lock() + q.closed = true + q.lock.Unlock() + q.active.Broadcast() +} + // PendingBlocks retrieves the number of block (body) requests pending for retrieval. func (q *queue) PendingBlocks() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.hashQueue.Size() + q.blockTaskQueue.Size() } // PendingReceipts retrieves the number of block receipts pending for retrieval. func (q *queue) PendingReceipts() int { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.receiptTaskQueue.Size() } @@ -192,8 +207,8 @@ func (q *queue) PendingNodeData() int { // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.blockPendPool) > 0 } @@ -201,8 +216,8 @@ func (q *queue) InFlightBlocks() bool { // InFlightReceipts retrieves whether there are receipt fetch requests currently // in flight. func (q *queue) InFlightReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.receiptPendPool) > 0 } @@ -210,8 +225,8 @@ func (q *queue) InFlightReceipts() bool { // InFlightNodeData retrieves whether there are node data entry fetch requests // currently in flight. func (q *queue) InFlightNodeData() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0 } @@ -219,8 +234,8 @@ func (q *queue) InFlightNodeData() bool { // Idle returns if the queue is fully idle or has some data still inside. This // method is used by the tester to detect termination events. func (q *queue) Idle() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) @@ -237,8 +252,8 @@ func (q *queue) Idle() bool { // FastSyncPivot retrieves the currently used fast sync pivot point. func (q *queue) FastSyncPivot() uint64 { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() return q.fastSyncPivot } @@ -246,8 +261,8 @@ func (q *queue) FastSyncPivot() uint64 { // ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). func (q *queue) ShouldThrottleBlocks() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight block (body) requests pending := 0 @@ -261,8 +276,8 @@ func (q *queue) ShouldThrottleBlocks() bool { // ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). func (q *queue) ShouldThrottleReceipts() bool { - q.lock.RLock() - defer q.lock.RUnlock() + q.lock.Lock() + defer q.lock.Unlock() // Calculate the currently in-flight receipt requests pending := 0 @@ -351,91 +366,74 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { return inserts } -// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't -// been downloaded yet (or simply non existent). -func (q *queue) GetHeadResult() *fetchResult { - q.lock.RLock() - defer q.lock.RUnlock() +// WaitResults retrieves and permanently removes a batch of fetch +// results from the cache. the result slice will be empty if the queue +// has been closed. +func (q *queue) WaitResults() []*fetchResult { + q.lock.Lock() + defer q.lock.Unlock() - // If there are no results pending, return nil - if len(q.resultCache) == 0 || q.resultCache[0] == nil { - return nil - } - // If the next result is still incomplete, return nil - if q.resultCache[0].Pending > 0 { - return nil + nproc := q.countProcessableItems() + for nproc == 0 && !q.closed { + q.active.Wait() + nproc = q.countProcessableItems() } - // If the next result is the fast sync pivot... - if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { - // If the pivot state trie is still being pulled, return nil - if len(q.stateTaskPool) > 0 { - return nil + results := make([]*fetchResult, nproc) + copy(results, q.resultCache[:nproc]) + if len(results) > 0 { + // Mark results as done before dropping them from the cache. + for _, result := range results { + hash := result.Header.Hash() + delete(q.blockDonePool, hash) + delete(q.receiptDonePool, hash) } - if q.PendingNodeData() > 0 { - return nil - } - // If the state is done, but not enough post-pivot headers were verified, stall... - for i := 0; i < fsHeaderForceVerify; i++ { - if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { - return nil - } + // Delete the results from the cache and clear the tail. + copy(q.resultCache, q.resultCache[nproc:]) + for i := len(q.resultCache) - nproc; i < len(q.resultCache); i++ { + q.resultCache[i] = nil } + // Advance the expected block number of the first cache entry. + q.resultOffset += uint64(nproc) } - return q.resultCache[0] + return results } -// TakeResults retrieves and permanently removes a batch of fetch results from -// the cache. -func (q *queue) TakeResults() []*fetchResult { - q.lock.Lock() - defer q.lock.Unlock() - - // Accumulate all available results - results := []*fetchResult{} +// countProcessableItems counts the processable items. +func (q *queue) countProcessableItems() int { for i, result := range q.resultCache { - // Stop if no more results are ready + // Don't process incomplete or unavailable items. if result == nil || result.Pending > 0 { - break + return i } - // The fast sync pivot block may only be processed after state fetch completes - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot { - if len(q.stateTaskPool) > 0 { - break - } - if q.PendingNodeData() > 0 { - break - } - // Even is state fetch is done, ensure post-pivot headers passed verifications - safe := true - for j := 0; j < fsHeaderForceVerify; j++ { - if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { - safe = false + // Special handling for the fast-sync pivot block: + if q.mode == FastSync { + bnum := result.Header.Number.Uint64() + if bnum == q.fastSyncPivot { + // If the state of the pivot block is not + // available yet, we cannot proceed and return 0. + // + // Stop before processing the pivot block to ensure that + // resultCache has space for fsHeaderForceVerify items. Not + // doing this could leave us unable to download the required + // amount of headers. + if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 { + return i + } + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + return i + } } } - if !safe { - break + // If we're just the fast sync pivot, stop as well + // because the following batch needs different insertion. + // This simplifies handling the switchover in d.process. + if bnum == q.fastSyncPivot+1 && i > 0 { + return i } } - // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion - if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { - break - } - results = append(results, result) - - hash := result.Header.Hash() - delete(q.blockDonePool, hash) - delete(q.receiptDonePool, hash) } - // Delete the results from the slice and let them be garbage collected - // without this slice trick the results would stay in memory until nil - // would be assigned to them. - copy(q.resultCache, q.resultCache[len(results):]) - for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ { - q.resultCache[k] = nil - } - q.resultOffset += uint64(len(results)) - - return results + return len(q.resultCache) } // ReserveBlocks reserves a set of block hashes for the given peer, skipping any @@ -584,6 +582,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { + common.Report("index allocation went beyond available resultCache space") return nil, false, errInvalidChain } if q.resultCache[index] == nil { @@ -617,6 +616,10 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for _, header := range skip { taskQueue.Push(header, -float32(header.Number.Uint64())) } + if progress { + // Wake WaitResults, resultCache was modified + q.active.Signal() + } // Assemble and return the block download request if len(send) == 0 { return nil, progress, nil @@ -737,7 +740,7 @@ func (q *queue) ExpireNodeData(timeout time.Duration) []string { // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. // -// Note, this method expects the queue lock to be already held for writing. The +// Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { @@ -813,17 +816,16 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + // Wake up WaitResults + q.active.Signal() // If none of the blocks were good, it's a stale delivery switch { case len(errs) == 0: return nil - case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): return errs[0] - case len(errs) == len(blocks): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -915,14 +917,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } + // Wake up WaitResults + q.active.Signal() // If none of the data was good, it's a stale delivery switch { case failure == nil || failure == errInvalidChain: return failure - case useful: return fmt.Errorf("partial failure: %v", failure) - default: return errStaleDelivery } @@ -977,10 +979,8 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i switch { case len(errs) == 0: return nil - case len(errs) == len(request.Hashes): return errStaleDelivery - default: return fmt.Errorf("multiple failures: %v", errs) } @@ -989,6 +989,10 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i // deliverNodeData is the asynchronous node data processor that injects a batch // of sync results into the state scheduler. func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(error, int)) { + // Wake up WaitResults after the state has been written because it + // might be waiting for the pivot block state to get completed. + defer q.active.Signal() + // Process results one by one to permit task fetches in between for i, result := range results { q.stateSchedLock.Lock() |