diff options
author | Péter Szilágyi <peterke@gmail.com> | 2015-10-06 00:37:56 +0800 |
---|---|---|
committer | Péter Szilágyi <peterke@gmail.com> | 2015-10-19 15:03:09 +0800 |
commit | ab27bee25a845be90bd60e774ff68d2ea1501772 (patch) | |
tree | 44d6a980fabd4cb065abe333e93a1088a3502466 /eth/downloader/queue.go | |
parent | 832b37c8221e330896c36eb419d92af6b1fdc9dd (diff) | |
download | dexon-ab27bee25a845be90bd60e774ff68d2ea1501772.tar.gz dexon-ab27bee25a845be90bd60e774ff68d2ea1501772.tar.zst dexon-ab27bee25a845be90bd60e774ff68d2ea1501772.zip |
core, eth, trie: direct state trie synchronization
Diffstat (limited to 'eth/downloader/queue.go')
-rw-r--r-- | eth/downloader/queue.go | 271 |
1 files changed, 216 insertions, 55 deletions
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c53ad939e..942ed0d63 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -26,9 +26,13 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/trie" "github.com/rcrowley/go-metrics" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -39,13 +43,14 @@ var ( var ( errNoFetchesPending = errors.New("no fetches pending") + errStateSyncPending = errors.New("state trie sync already scheduled") errStaleDelivery = errors.New("stale delivery") ) // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { Peer *peer // Peer to which the request was sent - Hashes map[common.Hash]int // [eth/61] Requested block with their insertion index (priority) + Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Headers []*types.Header // [eth/62] Requested headers, sorted by request order Time time.Time // Time when the request was made } @@ -64,6 +69,9 @@ type fetchResult struct { // queue represents hashes that are either need fetching or are being fetched type queue struct { + mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching + fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode + hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority) hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order @@ -80,15 +88,22 @@ type queue struct { receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + stateTaskIndex int // [eth/63] Counter indexing the added hashes to ensure prioritized retrieval order + stateTaskPool map[common.Hash]int // [eth/63] Pending node data retrieval tasks, mapping to their priority + stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for + statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations + + stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly + stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator + resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block-chain - resultParts int // Number of fetch components required to complete an item lock sync.RWMutex } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue() *queue { +func newQueue(stateDb ethdb.Database) *queue { return &queue{ hashPool: make(map[common.Hash]int), hashQueue: prque.New(), @@ -100,6 +115,10 @@ func newQueue() *queue { receiptTaskQueue: prque.New(), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), + stateTaskPool: make(map[common.Hash]int), + stateTaskQueue: prque.New(), + statePendPool: make(map[string]*fetchRequest), + stateDatabase: stateDb, resultCache: make([]*fetchResult, blockCacheLimit), } } @@ -109,6 +128,9 @@ func (q *queue) Reset() { q.lock.Lock() defer q.lock.Unlock() + q.mode = FullSync + q.fastSyncPivot = 0 + q.hashPool = make(map[common.Hash]int) q.hashQueue.Reset() q.hashCounter = 0 @@ -125,9 +147,14 @@ func (q *queue) Reset() { q.receiptPendPool = make(map[string]*fetchRequest) q.receiptDonePool = make(map[common.Hash]struct{}) + q.stateTaskIndex = 0 + q.stateTaskPool = make(map[common.Hash]int) + q.stateTaskQueue.Reset() + q.statePendPool = make(map[string]*fetchRequest) + q.stateScheduler = nil + q.resultCache = make([]*fetchResult, blockCacheLimit) q.resultOffset = 0 - q.resultParts = 0 } // PendingBlocks retrieves the number of block (body) requests pending for retrieval. @@ -146,12 +173,20 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } +// PendingNodeData retrieves the number of node data entries pending for retrieval. +func (q *queue) PendingNodeData() int { + q.lock.RLock() + defer q.lock.RUnlock() + + return q.stateTaskQueue.Size() +} + // InFlight retrieves the number of fetch requests currently in flight. func (q *queue) InFlight() int { q.lock.RLock() defer q.lock.RUnlock() - return len(q.blockPendPool) + len(q.receiptPendPool) + return len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) } // Idle returns if the queue is fully idle or has some data still inside. This @@ -160,8 +195,8 @@ func (q *queue) Idle() bool { q.lock.RLock() defer q.lock.RUnlock() - queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) cached := len(q.blockDonePool) + len(q.receiptDonePool) return (queued + pending + cached) == 0 @@ -227,7 +262,7 @@ func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered. -func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header { +func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -256,10 +291,21 @@ func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) [] // Queue the header for content retrieval q.blockTaskPool[hash] = header q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) - if receipts { + + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + // Fast phase of the fast sync, retrieve receipts too q.receiptTaskPool[hash] = header q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64())) } + if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { + // Pivoting point of the fast sync, retrieve the state tries + q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase) + for _, hash := range q.stateScheduler.Missing(0) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + } + } inserts = append(inserts, header) q.headerHead = hash from++ @@ -279,6 +325,9 @@ func (q *queue) GetHeadResult() *fetchResult { if q.resultCache[0].Pending > 0 { return nil } + if q.mode == FastSync && q.resultCache[0].Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + return nil + } return q.resultCache[0] } @@ -291,9 +340,18 @@ func (q *queue) TakeResults() []*fetchResult { // Accumulate all available results results := []*fetchResult{} for _, result := range q.resultCache { + // Stop if no more results are ready if result == nil || result.Pending > 0 { break } + // The fast sync pivot block may only be processed after state fetch completes + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot && len(q.stateTaskPool) > 0 { + break + } + // If we've just inserted the fast sync pivot, stop as the following batch needs different insertion + if q.mode == FastSync && result.Header.Number.Uint64() == q.fastSyncPivot+1 && len(results) > 0 { + break + } results = append(results, result) hash := result.Header.Hash() @@ -312,31 +370,45 @@ func (q *queue) TakeResults() []*fetchResult { return results } -// Reserve61 reserves a set of hashes for the given peer, skipping any previously -// failed download. -func (q *queue) Reserve61(p *peer, count int) *fetchRequest { +// ReserveBlocks reserves a set of block hashes for the given peer, skipping any +// previously failed download. +func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.hashQueue, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool)) +} + +// ReserveNodeData reserves a set of node data hashes for the given peer, skipping +// any previously failed download. +func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { + return q.reserveHashes(p, count, q.stateTaskQueue, q.statePendPool, 0) +} + +// reserveHashes reserves a set of hashes for the given peer, skipping previously +// failed ones. +func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, maxPending int) *fetchRequest { q.lock.Lock() defer q.lock.Unlock() // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) - if q.hashQueue.Empty() { + if taskQueue.Empty() { return nil } - if _, ok := q.blockPendPool[p.id]; ok { + if _, ok := pendPool[p.id]; ok { return nil } // Calculate an upper limit on the hashes we might fetch (i.e. throttling) - space := len(q.resultCache) - len(q.blockDonePool) - for _, request := range q.blockPendPool { - space -= len(request.Hashes) + allowance := maxPending + if allowance > 0 { + for _, request := range pendPool { + allowance -= len(request.Hashes) + } } // Retrieve a batch of hashes, skipping previously failed ones send := make(map[common.Hash]int) skip := make(map[common.Hash]int) - for proc := 0; proc < space && len(send) < count && !q.hashQueue.Empty(); proc++ { - hash, priority := q.hashQueue.Pop() + for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ { + hash, priority := taskQueue.Pop() if p.ignored.Has(hash) { skip[hash.(common.Hash)] = int(priority) } else { @@ -345,7 +417,7 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { } // Merge all the skipped hashes back for hash, index := range skip { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } // Assemble and return the block download request if len(send) == 0 { @@ -356,19 +428,19 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest { Hashes: send, Time: time.Now(), } - q.blockPendPool[p.id] = request + pendPool[p.id] = request return request } -// ReserveBlocks reserves a set of body fetches for the given peer, skipping any +// ReserveBodies reserves a set of body fetches for the given peer, skipping any // previously failed downloads. Beside the next batch of needed fetches, it also // returns a flag whether empty blocks were queued requiring processing. -func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) { +func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { noop := func(header *types.Header) bool { return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash } - return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) + return q.reserveHeaders(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop) } // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping @@ -378,13 +450,13 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) noop := func(header *types.Header) bool { return header.ReceiptHash == types.EmptyRootHash } - return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) + return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop) } -// reserveFetch reserves a set of data download operations for a given peer, +// reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. -func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) { q.lock.Lock() defer q.lock.Unlock() @@ -416,8 +488,12 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return nil, false, errInvalidChain } if q.resultCache[index] == nil { + components := 1 + if q.mode == FastSync && header.Number.Uint64() <= q.fastSyncPivot { + components = 2 + } q.resultCache[index] = &fetchResult{ - Pending: q.resultParts, + Pending: components, Header: header, } } @@ -456,30 +532,36 @@ func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types return request, progress, nil } -// Cancel61 aborts a fetch request, returning all pending hashes to the queue. -func (q *queue) Cancel61(request *fetchRequest) { - q.cancel(request, nil, q.blockPendPool) +// CancelBlocks aborts a fetch request, returning all pending hashes to the queue. +func (q *queue) CancelBlocks(request *fetchRequest) { + q.cancel(request, q.hashQueue, q.blockPendPool) } -// CancelBlocks aborts a body fetch request, returning all pending hashes to the +// CancelBodies aborts a body fetch request, returning all pending headers to the // task queue. -func (q *queue) CancelBlocks(request *fetchRequest) { +func (q *queue) CancelBodies(request *fetchRequest) { q.cancel(request, q.blockTaskQueue, q.blockPendPool) } -// CancelReceipts aborts a body fetch request, returning all pending hashes to +// CancelReceipts aborts a body fetch request, returning all pending headers to // the task queue. func (q *queue) CancelReceipts(request *fetchRequest) { q.cancel(request, q.receiptTaskQueue, q.receiptPendPool) } +// CancelNodeData aborts a node state data fetch request, returning all pending +// hashes to the task queue. +func (q *queue) CancelNodeData(request *fetchRequest) { + q.cancel(request, q.stateTaskQueue, q.statePendPool) +} + // Cancel aborts a fetch request, returning all pending hashes to the task queue. func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { q.lock.Lock() defer q.lock.Unlock() for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -509,29 +591,41 @@ func (q *queue) Revoke(peerId string) { } delete(q.receiptPendPool, peerId) } + if request, ok := q.statePendPool[peerId]; ok { + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + delete(q.statePendPool, peerId) + } } -// Expire61 checks for in flight requests that exceeded a timeout allowance, +// ExpireBlocks checks for in flight requests that exceeded a timeout allowance, // canceling them and returning the responsible peers for penalization. -func (q *queue) Expire61(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, nil) +func (q *queue) ExpireBlocks(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter) } -// ExpireBlocks checks for in flight block body requests that exceeded a timeout +// ExpireBodies checks for in flight block body requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. -func (q *queue) ExpireBlocks(timeout time.Duration) []string { - return q.expire(timeout, q.blockPendPool, q.blockTaskQueue) +func (q *queue) ExpireBodies(timeout time.Duration) []string { + return q.expire(timeout, q.blockPendPool, q.blockTaskQueue, bodyTimeoutMeter) } // ExpireReceipts checks for in flight receipt requests that exceeded a timeout // allowance, canceling them and returning the responsible peers for penalization. func (q *queue) ExpireReceipts(timeout time.Duration) []string { - return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue) + return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue, receiptTimeoutMeter) +} + +// ExpireNodeData checks for in flight node data requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalization. +func (q *queue) ExpireNodeData(timeout time.Duration) []string { + return q.expire(timeout, q.statePendPool, q.stateTaskQueue, stateTimeoutMeter) } // expire is the generic check that move expired tasks from a pending pool back // into a task pool, returning all entities caught with expired tasks. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) []string { q.lock.Lock() defer q.lock.Unlock() @@ -540,14 +634,11 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, for id, request := range pendPool { if time.Since(request.Time) > timeout { // Update the metrics with the timeout - if len(request.Hashes) > 0 { - blockTimeoutMeter.Mark(1) - } else { - bodyTimeoutMeter.Mark(1) - } + timeoutMeter.Mark(1) + // Return any non satisfied requests to the pool for hash, index := range request.Hashes { - q.hashQueue.Push(hash, float32(index)) + taskQueue.Push(hash, float32(index)) } for _, header := range request.Headers { taskQueue.Push(header, -float32(header.Number.Uint64())) @@ -562,8 +653,8 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, return peers } -// Deliver61 injects a block retrieval response into the download queue. -func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { +// DeliverBlocks injects a block retrieval response into the download queue. +func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error { q.lock.Lock() defer q.lock.Unlock() @@ -626,8 +717,8 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) { } } -// DeliverBlocks injects a block (body) retrieval response into the results queue. -func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { +// DeliverBodies injects a block body retrieval response into the results queue. +func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error { reconstruct := func(header *types.Header, index int, result *fetchResult) error { if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash { return errInvalidBody @@ -717,14 +808,84 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ } } +// DeliverNodeData injects a node state data retrieval response into the queue. +func (q *queue) DeliverNodeData(id string, data [][]byte) (int, int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + // Short circuit if the data was never requested + request := q.statePendPool[id] + if request == nil { + return 0, 0, errNoFetchesPending + } + stateReqTimer.UpdateSince(request.Time) + delete(q.statePendPool, id) + + // If no data was retrieved, mark them as unavailable for the origin peer + if len(data) == 0 { + for hash, _ := range request.Hashes { + request.Peer.ignored.Add(hash) + } + } + // Iterate over the downloaded data and verify each of them + errs := make([]error, 0) + processed := 0 + for _, blob := range data { + // Skip any blocks that were not requested + hash := common.BytesToHash(crypto.Sha3(blob)) + if _, ok := request.Hashes[hash]; !ok { + errs = append(errs, fmt.Errorf("non-requested state data %x", hash)) + continue + } + // Inject the next state trie item into the database + if err := q.stateScheduler.Process([]trie.SyncResult{{hash, blob}}); err != nil { + errs = []error{err} + break + } + processed++ + + delete(request.Hashes, hash) + delete(q.stateTaskPool, hash) + } + // Return all failed or missing fetches to the queue + for hash, index := range request.Hashes { + q.stateTaskQueue.Push(hash, float32(index)) + } + // Also enqueue any newly required state trie nodes + discovered := 0 + if len(q.stateTaskPool) < maxQueuedStates { + for _, hash := range q.stateScheduler.Missing(4 * MaxStateFetch) { + q.stateTaskPool[hash] = q.stateTaskIndex + q.stateTaskQueue.Push(hash, -float32(q.stateTaskIndex)) + q.stateTaskIndex++ + discovered++ + } + } + // If none of the data items were good, it's a stale delivery + switch { + case len(errs) == 0: + return processed, discovered, nil + + case len(errs) == len(request.Hashes): + return processed, discovered, errStaleDelivery + + default: + return processed, discovered, fmt.Errorf("multiple failures: %v", errs) + } +} + // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, parts int) { +func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64) { q.lock.Lock() defer q.lock.Unlock() if q.resultOffset < offset { q.resultOffset = offset } - q.resultParts = parts + q.fastSyncPivot = 0 + if mode == FastSync { + q.fastSyncPivot = pivot + } + q.mode = mode } |