diff options
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 239 |
1 files changed, 207 insertions, 32 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 96e08e144..a527414ff 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -43,16 +43,20 @@ var ( // fetchRequest is a currently running block retrieval operation. type fetchRequest struct { - Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // Requested hashes with their insertion index (priority) - Time time.Time // Time when the request was made + Peer *peer // Peer to which the request was sent + Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) + Headers []*types.Header // [eth/62] Requested headers, sorted by request order + Time time.Time // Time when the request was made } // queue represents hashes that are either need fetching or are being fetched type queue struct { - hashPool map[common.Hash]int // Pending hashes, mapping to their insertion index (priority) - hashQueue *prque.Prque // Priority queue of the block hashes to fetch - hashCounter int // Counter indexing the added hashes to ensure retrieval order + hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) + hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch + hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order + + headerPool map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes + headerQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the bodies for pendPool map[string]*fetchRequest // Currently pending block retrieval operations @@ -66,11 +70,13 @@ type queue struct { // newQueue creates a new download queue for scheduling block retrieval. func newQueue() *queue { return &queue{ - hashPool: make(map[common.Hash]int), - hashQueue: prque.New(), - pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]uint64), - blockCache: make([]*Block, blockCacheLimit), + hashPool: make(map[common.Hash]int), + hashQueue: prque.New(), + headerPool: make(map[common.Hash]*types.Header), + headerQueue: prque.New(), + pendPool: make(map[string]*fetchRequest), + blockPool: make(map[common.Hash]uint64), + blockCache: make([]*Block, blockCacheLimit), } } @@ -83,6 +89,9 @@ func (q *queue) Reset() { q.hashQueue.Reset() q.hashCounter = 0 + q.headerPool = make(map[common.Hash]*types.Header) + q.headerQueue.Reset() + q.pendPool = make(map[string]*fetchRequest) q.blockPool = make(map[common.Hash]uint64) @@ -90,21 +99,21 @@ func (q *queue) Reset() { q.blockCache = make([]*Block, blockCacheLimit) } -// Size retrieves the number of hashes in the queue, returning separately for +// Size retrieves the number of blocks in the queue, returning separately for // pending and already downloaded. func (q *queue) Size() (int, int) { q.lock.RLock() defer q.lock.RUnlock() - return len(q.hashPool), len(q.blockPool) + return len(q.hashPool) + len(q.headerPool), len(q.blockPool) } -// Pending retrieves the number of hashes pending for retrieval. +// Pending retrieves the number of blocks pending for retrieval. func (q *queue) Pending() int { q.lock.RLock() defer q.lock.RUnlock() - return q.hashQueue.Size() + return q.hashQueue.Size() + q.headerQueue.Size() } // InFlight retrieves the number of fetch requests currently in flight. @@ -124,7 +133,7 @@ func (q *queue) Throttle() bool { // Calculate the currently in-flight block requests pending := 0 for _, request := range q.pendPool { - pending += len(request.Hashes) + pending += len(request.Hashes) + len(request.Headers) } // Throttle if more blocks are in-flight than free space in the cache return pending >= len(q.blockCache)-len(q.blockPool) @@ -138,15 +147,18 @@ func (q *queue) Has(hash common.Hash) bool { if _, ok := q.hashPool[hash]; ok { return true } + if _, ok := q.headerPool[hash]; ok { + return true + } if _, ok := q.blockPool[hash]; ok { return true } return false } -// Insert adds a set of hashes for the download queue for scheduling, returning +// Insert61 adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { +func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -172,6 +184,29 @@ func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { return inserts } +// Insert adds a set of headers for the download queue for scheduling, returning +// the new headers encountered. +func (q *queue) Insert(headers []*types.Header) []*types.Header { + q.lock.Lock() + defer q.lock.Unlock() + + // Insert all the headers prioritized by the contained block number + inserts := make([]*types.Header, 0, len(headers)) + for _, header := range headers { + // Make sure no duplicate requests are executed + hash := header.Hash() + if _, ok := q.headerPool[hash]; ok { + glog.V(logger.Warn).Infof("Header %x already scheduled", hash) + continue + } + // Queue the header for body retrieval + inserts = append(inserts, header) + q.headerPool[hash] = header + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + return inserts +} + // GetHeadBlock retrieves the first block from the cache, or nil if it hasn't // been downloaded yet (or simply non existent). func (q *queue) GetHeadBlock() *Block { @@ -227,9 +262,9 @@ func (q *queue) TakeBlocks() []*Block { return blocks } -// Reserve reserves a set of hashes for the given peer, skipping any previously +// Reserve61 reserves a set of hashes for the given peer, skipping any previously // failed download. -func (q *queue) Reserve(p *peer, count int) *fetchRequest { +func (q *queue) Reserve61(p *peer, count int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() @@ -276,6 +311,68 @@ func (q *queue) Reserve(p *peer, count int) *fetchRequest { return request } +// Reserve reserves a set of headers for the given peer, skipping any previously +// failed download. Beside the next batch of needed fetches, it also returns a +// flag whether empty blocks were queued requiring processing. +func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the pool has been depleted, or if the peer's already + // downloading something (sanity check not to corrupt state) + if q.headerQueue.Empty() { + return nil, false, nil + } + if _, ok := q.pendPool[p.id]; ok { + return nil, false, nil + } + // Calculate an upper limit on the bodies we might fetch (i.e. throttling) + space := len(q.blockCache) - len(q.blockPool) + for _, request := range q.pendPool { + space -= len(request.Headers) + } + // Retrieve a batch of headers, skipping previously failed ones + send := make([]*types.Header, 0, count) + skip := make([]*types.Header, 0) + + process := false + for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ { + header := q.headerQueue.PopItem().(*types.Header) + + // If the header defines an empty block, deliver straight + if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) { + if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil { + return nil, false, errInvalidChain + } + delete(q.headerPool, header.Hash()) + process, space, proc = true, space-1, proc-1 + continue + } + // If it's a content block, add to the body fetch request + if p.ignored.Has(header.Hash()) { + skip = append(skip, header) + } else { + send = append(send, header) + } + } + // Merge all the skipped headers back + for _, header := range skip { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + // Assemble and return the block download request + if len(send) == 0 { + return nil, process, nil + } + request := &fetchRequest{ + Peer: p, + Headers: send, + Time: time.Now(), + } + q.pendPool[p.id] = request + + return request, process, nil +} + // Cancel aborts a fetch request, returning all pending hashes to the queue. func (q *queue) Cancel(request *fetchRequest) { q.lock.Lock() @@ -284,6 +381,9 @@ func (q *queue) Cancel(request *fetchRequest) { for hash, index := range request.Hashes { q.hashQueue.Push(hash, float32(index)) } + for _, header := range request.Headers { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } delete(q.pendPool, request.Peer.id) } @@ -310,8 +410,8 @@ func (q *queue) Expire(timeout time.Duration) []string { return peers } -// Deliver injects a block retrieval response into the download queue. -func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { +// Deliver61 injects a block retrieval response into the download queue. +func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { q.lock.Lock() defer q.lock.Unlock() @@ -337,19 +437,12 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { errs = append(errs, fmt.Errorf("non-requested block %x", hash)) continue } - // If a requested block falls out of the range, the hash chain is invalid - index := int(int64(block.NumberU64()) - int64(q.blockOffset)) - if index >= len(q.blockCache) || index < 0 { - return errInvalidChain - } - // Otherwise merge the block and mark the hash block - q.blockCache[index] = &Block{ - RawBlock: block, - OriginPeer: id, + // Queue the block up for processing + if err := q.enqueue(id, block); err != nil { + return err } delete(request.Hashes, hash) delete(q.hashPool, hash) - q.blockPool[hash] = block.NumberU64() } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { @@ -365,6 +458,88 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { return nil } +// Deliver injects a block body retrieval response into the download queue. +func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the block bodies were never requested + request := q.pendPool[id] + if request == nil { + return errNoFetchesPending + } + delete(q.pendPool, id) + + // If no block bodies were retrieved, mark them as unavailable for the origin peer + if len(txLists) == 0 || len(uncleLists) == 0 { + for hash, _ := range request.Headers { + request.Peer.ignored.Add(hash) + } + } + // Assemble each of the block bodies with their headers and queue for processing + errs := make([]error, 0) + for i, header := range request.Headers { + // Short circuit block assembly if no more bodies are found + if i >= len(txLists) || i >= len(uncleLists) { + break + } + // Reconstruct the next block if contents match up + if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash { + errs = []error{errInvalidBody} + break + } + block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i]) + + // Queue the block up for processing + if err := q.enqueue(id, block); err != nil { + errs = []error{err} + break + } + request.Headers[i] = nil + delete(q.headerPool, header.Hash()) + } + // Return all failed or missing fetches to the queue + for _, header := range request.Headers { + if header != nil { + q.headerQueue.Push(header, -float32(header.Number.Uint64())) + } + } + // If none of the blocks were good, it's a stale delivery + switch { + case len(errs) == 0: + return nil + + case len(errs) == 1 && errs[0] == errInvalidBody: + return errInvalidBody + + case len(errs) == 1 && errs[0] == errInvalidChain: + return errInvalidChain + + case len(errs) == len(request.Headers): + return errStaleDelivery + + default: + return fmt.Errorf("multiple failures: %v", errs) + } +} + +// enqueue inserts a new block into the final delivery queue, waiting for pickup +// by the processor. +func (q *queue) enqueue(origin string, block *types.Block) error { + // If a requested block falls out of the range, the hash chain is invalid + index := int(int64(block.NumberU64()) - int64(q.blockOffset)) + if index >= len(q.blockCache) || index < 0 { + return errInvalidChain + } + // Otherwise merge the block and mark the hash done + q.blockCache[index] = &Block{ + RawBlock: block, + OriginPeer: origin, + } + q.blockPool[block.Header().Hash()] = block.NumberU64() + return nil +} + // Prepare configures the block cache offset to allow accepting inbound blocks. func (q *queue) Prepare(offset uint64) { q.lock.Lock() |