diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-13 17:04:25 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-21 21:49:55 +0800 |
commit | 5b0ee8ec304663898073b7a4c659e1def23716df (patch) | |
tree | 8f2f49a8d26dc1c29e1d360fb787ab420d90a2ae /eth/downloader/queue.go | |
parent | aa0538db0b5de2bb2c609d629b65d083649f9171 (diff) | |
download | go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.gz go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.tar.zst go-tangerine-5b0ee8ec304663898073b7a4c659e1def23716df.zip |
core, eth, trie: fix data races and merge/review issues
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 178 |
1 files changed, 120 insertions, 58 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 17fbb1c7f..56b46e285 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -56,9 +56,8 @@ type fetchRequest struct { Time time.Time // Time when the request was made } -// fetchResult is the assembly collecting partial results from potentially more -// than one fetcher routines, until all outstanding retrievals complete and the -// result as a whole can be processed. +// fetchResult is a struct collecting partial results from data fetchers until +// all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { Pending int // Number of data fetches still pending @@ -89,7 +88,7 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches - stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order + stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritised retrieval order stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations @@ -97,10 +96,10 @@ type queue struct { stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator stateProcessors int32 // [eth/63] Number of currently running state processors - stateSchedLock sync.RWMutex // [eth/63] Lock serializing access to the state scheduler + stateSchedLock sync.RWMutex // [eth/63] Lock serialising access to the state scheduler resultCache []*fetchResult // Downloaded but not yet delivered fetch results - resultOffset uint64 // Offset of the first cached fetch result in the block-chain + resultOffset uint64 // Offset of the first cached fetch result in the block chain lock sync.RWMutex } @@ -131,6 +130,9 @@ func (q *queue) Reset() { q.lock.Lock() defer q.lock.Unlock() + q.stateSchedLock.Lock() + defer q.stateSchedLock.Unlock() + q.mode = FullSync q.fastSyncPivot = 0 @@ -233,9 +235,17 @@ func (q *queue) Idle() bool { return (queued + pending + cached) == 0 } -// ThrottleBlocks checks if the download should be throttled (active block (body) +// FastSyncPivot retrieves the currently used fast sync pivot point. +func (q *queue) FastSyncPivot() uint64 { + q.lock.RLock() + defer q.lock.RUnlock() + + return q.fastSyncPivot +} + +// ShouldThrottleBlocks checks if the download should be throttled (active block (body) // fetches exceed block cache). -func (q *queue) ThrottleBlocks() bool { +func (q *queue) ShouldThrottleBlocks() bool { q.lock.RLock() defer q.lock.RUnlock() @@ -248,9 +258,9 @@ func (q *queue) ThrottleBlocks() bool { return pending >= len(q.resultCache)-len(q.blockDonePool) } -// ThrottleReceipts checks if the download should be throttled (active receipt +// ShouldThrottleReceipts checks if the download should be throttled (active receipt // fetches exceed block cache). -func (q *queue) ThrottleReceipts() bool { +func (q *queue) ShouldThrottleReceipts() bool { q.lock.RLock() defer q.lock.RUnlock() @@ -269,7 +279,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() - // Insert all the hashes prioritized in the arrival order + // Insert all the hashes prioritised in the arrival order inserts := make([]common.Hash, 0, len(hashes)) for _, hash := range hashes { // Skip anything we already have @@ -297,10 +307,10 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() - // Insert all the headers prioritized by the contained block number + // Insert all the headers prioritised by the contained block number inserts := make([]*types.Header, 0, len(headers)) for _, header := range headers { - // Make sure chain order is honored and preserved throughout + // Make sure chain order is honoured and preserved throughout hash := header.Hash() if header.Number == nil || header.Number.Uint64() != from { glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from) @@ -347,19 +357,29 @@ func (q *queue) GetHeadResult() *fetchResult { q.lock.RLock() defer q.lock.RUnlock() + // If there are no results pending, return nil if len(q.resultCache) == 0 || q.resultCache[0] == nil { return nil } + // If the next result is still incomplete, return nil if q.resultCache[0].Pending > 0 { return nil } + // If the next result is the fast sync pivot... if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot { + // If the pivot state trie is still being pulled, return nil if len(q.stateTaskPool) > 0 { return nil } if q.PendingNodeData() > 0 { return nil } + // If the state is done, but not enough post-pivot headers were verified, stall... + for i := 0; i < fsHeaderForceVerify; i++ { + if i+1 >= len(q.resultCache) || q.resultCache[i+1] == nil { + return nil + } + } } return q.resultCache[0] } @@ -372,7 +392,7 @@ func (q *queue) TakeResults() []*fetchResult { // Accumulate all available results results := []*fetchResult{} - for _, result := range q.resultCache { + for i, result := range q.resultCache { // Stop if no more results are ready if result == nil || result.Pending > 0 { break @@ -385,6 +405,16 @@ func (q *queue) TakeResults() []*fetchResult { if q.PendingNodeData() > 0 { break } + // Even is state fetch is done, ensure post-pivot headers passed verifications + safe := true + for j := 0; j < fsHeaderForceVerify; j++ { + if i+j+1 >= len(q.resultCache) || q.resultCache[i+j+1] == nil { + safe = false + } + } + if !safe { + break + } } // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { @@ -411,6 +441,9 @@ func (q *queue) TakeResults() []*fetchResult { // ReserveBlocks reserves a set of block hashes for the given peer, skipping any // previously failed download. func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { + q.lock.Lock() + defer q.lock.Unlock() + return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) } @@ -430,17 +463,21 @@ func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { } } } + q.lock.Lock() + defer q.lock.Unlock() + return q.reserveHashes(p, count, q.stateTaskQueue, generator, q.statePendPool, count) } // reserveHashes reserves a set of hashes for the given peer, skipping previously // failed ones. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGen func(int), pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { - q.lock.Lock() - defer q.lock.Unlock() - - // Short circuit if the peer's already downloading something (sanity check not - // to corrupt state) + // Short circuit if the peer's already downloading something (sanity check to + // not corrupt state) if _, ok := pendPool[p.id]; ok { return nil } @@ -492,30 +529,37 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { - noop := func(header *types.Header) bool { + isNoop := func(header *types.Header) bool { return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash } - return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) + q.lock.Lock() + defer q.lock.Unlock() + + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, isNoop) } // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping // any previously failed downloads. Beside the next batch of needed fetches, it // also returns a flag whether empty receipts were queued requiring importing. func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) { - noop := func(header *types.Header) bool { + isNoop := func(header *types.Header) bool { return header.ReceiptHash == types.EmptyRootHash } - return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) + q.lock.Lock() + defer q.lock.Unlock() + + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, isNoop) } // reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, - pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) { - q.lock.Lock() - defer q.lock.Unlock() - + pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) if taskQueue.Empty() { @@ -537,7 +581,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { header := taskQueue.PopItem().(*types.Header) - // If we're the first to request this task, initialize the result container + // If we're the first to request this task, initialise the result container index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { return nil, false, errInvalidChain @@ -553,7 +597,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ } } // If this fetch task is a noop, skip this fetch operation - if noop(header) { + if isNoop(header) { donePool[header.Hash()] = struct{}{} delete(taskPool, header.Hash()) @@ -562,7 +606,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ progress = true continue } - // Otherwise if not a known unknown block, add to the retrieve list + // Otherwise unless the peer is known not to have the data, add to the retrieve list if p.ignored.Has(header.Hash()) { skip = append(skip, header) } else { @@ -655,35 +699,48 @@ func (q *queue) Revoke(peerId string) { } // ExpireBlocks checks for in flight requests that exceeded a timeout allowance, -// canceling them and returning the responsible peers for penalization. +// canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireBlocks(timeout time.Duration) []string { + q.lock.Lock() + defer q.lock.Unlock() + return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter) } // ExpireBodies checks for in flight block body requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalization. +// allowance, canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireBodies(timeout time.Duration) []string { + q.lock.Lock() + defer q.lock.Unlock() + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter) } // ExpireReceipts checks for in flight receipt requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalization. +// allowance, canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireReceipts(timeout time.Duration) []string { + q.lock.Lock() + defer q.lock.Unlock() + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) } // ExpireNodeData checks for in flight node data requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalization. +// allowance, canceling them and returning the responsible peers for penalisation. func (q *queue) ExpireNodeData(timeout time.Duration) []string { + q.lock.Lock() + defer q.lock.Unlock() + return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) } // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { - q.lock.Lock() - defer q.lock.Unlock() - // Iterate over the expired requests and return each to the queue peers := []string{} for id, request := range pendPool { @@ -764,7 +821,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock): return errs[0] - case len(errs) == len(request.Headers): + case len(errs) == len(blocks): return errStaleDelivery default: @@ -774,6 +831,9 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { // DeliverBodies injects a block body retrieval response into the results queue. func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { + q.lock.Lock() + defer q.lock.Unlock() + reconstruct := func(header *types.Header, index int, result *fetchResult) error { if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { return errInvalidBody @@ -787,6 +847,9 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLi // DeliverReceipts injects a receipt retrieval response into the results queue. func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error { + q.lock.Lock() + defer q.lock.Unlock() + reconstruct := func(header *types.Header, index int, result *fetchResult) error { if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash { return errInvalidReceipt @@ -798,11 +861,12 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error } // deliver injects a data retrieval response into the results queue. +// +// Note, this method expects the queue lock to be already held for writing. The +// reason the lock is not obtained in here is because the parameters already need +// to access the queue, so they already need a lock anyway. func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error { - q.lock.Lock() - defer q.lock.Unlock() - // Short circuit if the data was never requested request := pendPool[id] if request == nil { @@ -818,7 +882,10 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } // Assemble each of the results with their headers and retrieved data parts - errs := make([]error, 0) + var ( + failure error + useful bool + ) for i, header := range request.Headers { // Short circuit assembly if no more fetch results are found if i >= results { @@ -827,15 +894,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ // Reconstruct the next result if contents match up index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil { - errs = []error{errInvalidChain} + failure = errInvalidChain break } if err := reconstruct(header, i, q.resultCache[index]); err != nil { - errs = []error{err} + failure = err break } donePool[header.Hash()] = struct{}{} q.resultCache[index].Pending-- + useful = true // Clean up a successful fetch request.Headers[i] = nil @@ -847,19 +915,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ taskQueue.Push(header, -float32(header.Number.Uint64())) } } - // If none of the blocks were good, it's a stale delivery + // If none of the data was good, it's a stale delivery switch { - case len(errs) == 0: - return nil - - case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBody || errs[0] == errInvalidReceipt): - return errs[0] + case failure == nil || failure == errInvalidChain: + return failure - case len(errs) == len(request.Headers): - return errStaleDelivery + case useful: + return fmt.Errorf("partial failure: %v", failure) default: - return fmt.Errorf("multiple failures: %v", errs) + return errStaleDelivery } } @@ -876,7 +941,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i stateReqTimer.UpdateSince(request.Time) delete(q.statePendPool, id) - // If no data was retrieved, mark them as unavailable for the origin peer + // If no data was retrieved, mark their hashes as unavailable for the origin peer if len(data) == 0 { for hash, _ := range request.Hashes { request.Peer.ignored.Add(hash) @@ -955,9 +1020,6 @@ func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { if q.resultOffset < offset { q.resultOffset = offset } - q.fastSyncPivot = 0 - if mode == FastSync { - q.fastSyncPivot = pivot - } + q.fastSyncPivot = pivot q.mode = mode } |