diff options
Diffstat (limited to 'eth/downloader/statesync.go')
-rw-r--r-- | eth/downloader/statesync.go | 36 |
1 files changed, 15 insertions, 21 deletions
diff --git a/eth/downloader/statesync.go b/eth/downloader/statesync.go index a0b05c9be..9cc65a208 100644 --- a/eth/downloader/statesync.go +++ b/eth/downloader/statesync.go @@ -20,7 +20,6 @@ import ( "fmt" "hash" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -132,7 +131,10 @@ func (d *Downloader) runStateSync(s *stateSync) *stateSync { // Send the next finished request to the current sync: case deliverReqCh <- deliverReq: - finished = append(finished[:0], finished[1:]...) + // Shift out the first request, but also set the emptied slot to nil for GC + copy(finished, finished[1:]) + finished[len(finished)-1] = nil + finished = finished[:len(finished)-1] // Handle incoming state packs: case pack := <-d.stateCh: @@ -291,6 +293,9 @@ func (s *stateSync) loop() error { case <-s.cancel: return errCancelStateFetch + case <-s.d.cancelCh: + return errCancelStateFetch + case req := <-s.deliver: // Response, disconnect or timeout triggered, drop the peer if stalling log.Trace("Received node data response", "peer", req.peer.id, "count", len(req.response), "dropped", req.dropped, "timeout", !req.dropped && req.timedOut()) @@ -301,15 +306,11 @@ func (s *stateSync) loop() error { s.d.dropPeer(req.peer.id) } // Process all the received blobs and check for stale delivery - stale, err := s.process(req) - if err != nil { + if err := s.process(req); err != nil { log.Warn("Node data write error", "err", err) return err } - // The the delivery contains requested data, mark the node idle (otherwise it's a timed out delivery) - if !stale { - req.peer.SetNodeDataIdle(len(req.response)) - } + req.peer.SetNodeDataIdle(len(req.response)) } } return s.commit(true) @@ -349,6 +350,7 @@ func (s *stateSync) assignTasks() { case s.d.trackStateReq <- req: req.peer.FetchNodeData(req.items) case <-s.cancel: + case <-s.d.cancelCh: } } } @@ -387,7 +389,7 @@ func (s *stateSync) fillTasks(n int, req *stateReq) { // process iterates over a batch of delivered state data, injecting each item // into a running state sync, re-queuing any items that were requested but not // delivered. -func (s *stateSync) process(req *stateReq) (bool, error) { +func (s *stateSync) process(req *stateReq) error { // Collect processing stats and update progress if valid data was received duplicate, unexpected := 0, 0 @@ -398,7 +400,7 @@ func (s *stateSync) process(req *stateReq) (bool, error) { }(time.Now()) // Iterate over all the delivered data and inject one-by-one into the trie - progress, stale := false, len(req.response) > 0 + progress := false for _, blob := range req.response { prog, hash, err := s.processNodeData(blob) @@ -412,20 +414,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) { case trie.ErrAlreadyProcessed: duplicate++ default: - return stale, fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) + return fmt.Errorf("invalid state node %s: %v", hash.TerminalString(), err) } - // If the node delivered a requested item, mark the delivery non-stale if _, ok := req.tasks[hash]; ok { delete(req.tasks, hash) - stale = false } } - // If we're inside the critical section, reset fail counter since we progressed. - if progress && atomic.LoadUint32(&s.d.fsPivotFails) > 1 { - log.Trace("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&s.d.fsPivotFails)) - atomic.StoreUint32(&s.d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block - } - // Put unfulfilled tasks back into the retry queue npeers := s.d.peers.Len() for hash, task := range req.tasks { @@ -438,12 +432,12 @@ func (s *stateSync) process(req *stateReq) (bool, error) { // If we've requested the node too many times already, it may be a malicious // sync where nobody has the right data. Abort. if len(task.attempts) >= npeers { - return stale, fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) + return fmt.Errorf("state node %s failed with all peers (%d tries, %d peers)", hash.TerminalString(), len(task.attempts), npeers) } // Missing item, place into the retry queue. s.tasks[hash] = task } - return stale, nil + return nil } // processNodeData tries to inject a trie node data blob delivered from a remote |