diff options
-rw-r--r-- | eth/downloader/downloader.go | 203 | ||||
-rw-r--r-- | eth/downloader/modes.go | 8 | ||||
-rw-r--r-- | eth/downloader/peer.go | 29 | ||||
-rw-r--r-- | eth/downloader/queue.go | 22 |
4 files changed, 137 insertions, 125 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1fac6156d..f71ecbb43 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -23,7 +23,6 @@ import ( "fmt" "math" "math/big" - "strings" "sync" "sync/atomic" "time" @@ -248,9 +247,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { - log.Trace(fmt.Sprint("Registering peer", id)) - if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { - log.Error(fmt.Sprint("Register failed:", err)) + logger := log.New("peer", id) + logger.Trace("Registering sync peer") + if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil { + logger.Error("Failed to register sync peer", "error", err) return err } d.qosReduceConfidence() @@ -263,9 +263,10 @@ func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHea // the queue. func (d *Downloader) UnregisterPeer(id string) error { // Unregister the peer from the active peer set and revoke any fetch tasks - log.Trace(fmt.Sprint("Unregistering peer", id)) + logger := log.New("peer", id) + logger.Trace("Unregistering sync peer") if err := d.peers.Unregister(id); err != nil { - log.Error(fmt.Sprint("Unregister failed:", err)) + logger.Error("Failed to unregister sync peer", "error", err) return err } d.queue.Revoke(id) @@ -284,24 +285,19 @@ func (d *Downloader) UnregisterPeer(id string) error { // Synchronise tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { - log.Trace(fmt.Sprintf("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)) - err := d.synchronise(id, head, td, mode) switch err { case nil: - log.Trace(fmt.Sprintf("Synchronisation completed")) - case errBusy: - log.Trace(fmt.Sprintf("Synchronisation already in progress")) case errTimeout, errBadPeer, errStallingPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, errInvalidAncestor, errInvalidChain: - log.Debug(fmt.Sprintf("Removing peer %v: %v", id, err)) + log.Warn("Synchronisation failed, dropping peer", "peer", id, "error", err) d.dropPeer(id) default: - log.Warn(fmt.Sprintf("Synchronisation failed: %v", err)) + log.Warn("Synchronisation failed, retrying", "error", err) } return err } @@ -322,7 +318,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { - log.Info(fmt.Sprint("Block synchronisation started")) + log.Info("Block synchronisation started") } // Reset the queue, peer set and wake channels to clean any internal leftover state d.queue.Reset() @@ -387,9 +383,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e return errTooOld } - log.Debug(fmt.Sprintf("Synchronising with the network using: %s [eth/%d]", p.id, p.version)) + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash.Hex()[2:10], "td", td, "mode", syncModeLabels[d.mode]) defer func(start time.Time) { - log.Debug(fmt.Sprintf("Synchronisation terminated after %v", time.Since(start))) + log.Debug("Synchronisation terminated", "elapsed", time.Since(start)) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block @@ -437,7 +433,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e origin = 0 } } - log.Debug(fmt.Sprintf("Fast syncing until pivot block #%d", pivot)) + log.Debug("Fast syncing until pivot block", "pivot", pivot) } d.queue.Prepare(origin+1, d.mode, pivot, latest) if d.syncInitHook != nil { @@ -522,13 +518,14 @@ func (d *Downloader) Terminate() { // fetchHeight retrieves the head header of the remote peer to aid in estimating // the total time a pending synchronisation would take. func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { - log.Debug(fmt.Sprintf("%v: retrieving remote chain height", p)) + p.logger.Debug("Retrieving remote chain height") // Request the advertised remote head block and wait for the response head, _ := p.currentHead() go p.getRelHeaders(head, 1, 0, false) - timeout := time.After(d.requestTTL()) + ttl := d.requestTTL() + timeout := time.After(ttl) for { select { case <-d.cancelCh: @@ -537,19 +534,21 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { case packet := <-d.headerCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { - log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId())) + log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) != 1 { - log.Debug(fmt.Sprintf("%v: invalid number of head headers: %d != 1", p, len(headers))) + p.logger.Debug("Multiple headers for single request", "headers", len(headers)) return nil, errBadPeer } - return headers[0], nil + head := headers[0] + p.logger.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash().Hex()[2:10]) + return head, nil case <-timeout: - log.Debug(fmt.Sprintf("%v: head header timeout", p)) + p.logger.Debug("Waiting for head header timed out", "elapsed", ttl) return nil, errTimeout case <-d.bodyCh: @@ -566,10 +565,10 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // In the rare scenario when we ended up on a long reorganisation (i.e. none of // the head links match), we do a binary search to find the common ancestor. func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { - log.Debug(fmt.Sprintf("%v: looking for common ancestor (remote height %d)", p, height)) - // Figure out the valid ancestor range to prevent rewrite attacks floor, ceil := int64(-1), d.headHeader().Number.Uint64() + + p.logger.Debug("Looking for common ancestor", "local", ceil, "remote", height) if d.mode == FullSync { ceil = d.headBlock().NumberU64() } else if d.mode == FastSync { @@ -597,7 +596,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} - timeout := time.After(d.requestTTL()) + + ttl := d.requestTTL() + timeout := time.After(ttl) for finished := false; !finished; { select { @@ -607,19 +608,19 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { case packet := <-d.headerCh: // Discard anything not from the origin peer if packet.PeerId() != p.id { - log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packet.PeerId())) + log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } // Make sure the peer actually gave something valid headers := packet.(*headerPack).headers if len(headers) == 0 { - log.Warn(fmt.Sprintf("%v: empty head header set", p)) + p.logger.Warn("Empty head header set") return 0, errEmptyHeaderSet } // Make sure the peer's reply conforms to the request for i := 0; i < len(headers); i++ { if number := headers[i].Number.Int64(); number != from+int64(i)*16 { - log.Warn(fmt.Sprintf("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)) + p.logger.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number) return 0, errInvalidChain } } @@ -636,7 +637,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // If every header is known, even future ones, the peer straight out lied about its head if number > height && i == limit-1 { - log.Warn(fmt.Sprintf("%v: lied about chain head: reported %d, found above %d", p, height, number)) + p.logger.Warn("Lied about chain head", "reported", height, "found", number) return 0, errStallingPeer } break @@ -644,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } case <-timeout: - log.Debug(fmt.Sprintf("%v: head header timeout", p)) + p.logger.Debug("Waiting for head header timed out", "elapsed", ttl) return 0, errTimeout case <-d.bodyCh: @@ -656,10 +657,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // If the head fetch already found an ancestor, return if !common.EmptyHash(hash) { if int64(number) <= floor { - log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)) + p.logger.Warn("Ancestor below allowance", "number", number, "hash", hash.Hex()[2:10], "allowance", floor) return 0, errInvalidAncestor } - log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, number, hash[:4])) + p.logger.Debug("Found common ancestor", "number", number, "hash", hash.Hex()[2:10]) return number, nil } // Ancestor not found, we need to binary search over our chain @@ -671,7 +672,9 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // Split our chain interval in two, and request the hash to cross check check := (start + end) / 2 - timeout := time.After(d.requestTTL()) + ttl := d.requestTTL() + timeout := time.After(ttl) + go p.getAbsHeaders(uint64(check), 1, 0, false) // Wait until a reply arrives to this request @@ -683,13 +686,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { case packer := <-d.headerCh: // Discard anything not from the origin peer if packer.PeerId() != p.id { - log.Debug(fmt.Sprintf("Received headers from incorrect peer(%s)", packer.PeerId())) + log.Debug("Received headers from incorrect peer", "peer", packer.PeerId()) break } // Make sure the peer actually gave something valid headers := packer.(*headerPack).headers if len(headers) != 1 { - log.Debug(fmt.Sprintf("%v: invalid search header set (%d)", p, len(headers))) + p.logger.Debug("Multiple headers for single request", "headers", len(headers)) return 0, errBadPeer } arrived = true @@ -701,13 +704,13 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists if header.Number.Uint64() != check { - log.Debug(fmt.Sprintf("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)) + p.logger.Debug("Received non requested header", "number", header.Number, "hash", header.Hash().Hex()[2:10], "request", check) return 0, errBadPeer } start = check case <-timeout: - log.Debug(fmt.Sprintf("%v: search header timeout", p)) + p.logger.Debug("Waiting for search header timed out", "elapsed", ttl) return 0, errTimeout case <-d.bodyCh: @@ -719,10 +722,10 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { } // Ensure valid ancestry and return if int64(start) <= floor { - log.Warn(fmt.Sprintf("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)) + p.logger.Warn("Ancestor below allowance", "number", start, "hash", hash.Hex()[2:10], "allowance", floor) return 0, errInvalidAncestor } - log.Debug(fmt.Sprintf("%v: common ancestor: #%d [%x…]", p, start, hash[:4])) + p.logger.Debug("Found common ancestor", "number", start, "hash", hash.Hex()[2:10]) return start, nil } @@ -735,8 +738,8 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. func (d *Downloader) fetchHeaders(p *peer, from uint64) error { - log.Debug(fmt.Sprintf("%v: directing header downloads from #%d", p, from)) - defer log.Debug(fmt.Sprintf("%v: header download terminated", p)) + p.logger.Debug("Directing header downloads", "origin", from) + defer p.logger.Debug("Header download terminated") // Create a timeout timer, and the associated header fetcher skeleton := true // Skeleton assembly phase or finishing up @@ -745,15 +748,18 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { <-timeout.C // timeout channel should be initially empty defer timeout.Stop() + var ttl time.Duration getHeaders := func(from uint64) { request = time.Now() - timeout.Reset(d.requestTTL()) + + ttl = d.requestTTL() + timeout.Reset(ttl) if skeleton { - log.Trace(fmt.Sprintf("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)) + p.logger.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) } else { - log.Trace(fmt.Sprintf("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)) + p.logger.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) } } @@ -768,7 +774,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { case packet := <-d.headerCh: // Make sure the active peer is giving us the skeleton headers if packet.PeerId() != p.id { - log.Debug(fmt.Sprintf("Received skeleton headers from incorrect peer (%s)", packet.PeerId())) + log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId()) break } headerReqTimer.UpdateSince(request) @@ -782,7 +788,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { } // If no more headers are inbound, notify the content fetchers and return if packet.Items() == 0 { - log.Debug(fmt.Sprintf("%v: no available headers", p)) + p.logger.Debug("No more headers available") select { case d.headerProcCh <- nil: return nil @@ -796,7 +802,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { if skeleton { filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { - log.Debug(fmt.Sprintf("%v: skeleton chain invalid: %v", p, err)) + p.logger.Debug("Skeleton chain invalid", "error", err) return errInvalidChain } headers = filled[proced:] @@ -804,7 +810,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { } // Insert all the new headers and fetch the next batch if len(headers) > 0 { - log.Trace(fmt.Sprintf("%v: schedule %d headers from #%d", p, len(headers), from)) + p.logger.Trace("Scheduling new headers", "count", len(headers), "from", from) select { case d.headerProcCh <- headers: case <-d.cancelCh: @@ -816,7 +822,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { case <-timeout.C: // Header retrieval timed out, consider the peer bad and drop - log.Debug(fmt.Sprintf("%v: header request timed out", p)) + p.logger.Debug("Header request timed out", "elapsed", ttl) headerTimeoutMeter.Mark(1) d.dropPeer(p.id) @@ -846,7 +852,7 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { // The method returs the entire filled skeleton and also the number of headers // already forwarded for processing. func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) { - log.Debug(fmt.Sprintf("Filling up skeleton from #%d", from)) + log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) var ( @@ -865,9 +871,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( ) err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, - nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header") + nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers") - log.Debug(fmt.Sprintf("Skeleton fill terminated: %v", err)) + log.Debug("Skeleton fill terminated", "error", err) filled, proced := d.queue.RetrieveHeaders() return filled, proced, err @@ -877,7 +883,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. func (d *Downloader) fetchBodies(from uint64) error { - log.Debug(fmt.Sprintf("Downloading block bodies from #%d", from)) + log.Debug("Downloading block bodies", "origin", from) var ( deliver = func(packet dataPack) (int, error) { @@ -891,9 +897,9 @@ func (d *Downloader) fetchBodies(from uint64) error { ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, - d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body") + d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies") - log.Debug(fmt.Sprintf("Block body download terminated: %v", err)) + log.Debug("Block body download terminated", "error", err) return err } @@ -901,7 +907,7 @@ func (d *Downloader) fetchBodies(from uint64) error { // available peers, reserving a chunk of receipts for each, waiting for delivery // and also periodically checking for timeouts. func (d *Downloader) fetchReceipts(from uint64) error { - log.Debug(fmt.Sprintf("Downloading receipts from #%d", from)) + log.Debug("Downloading transaction receipts", "origin", from) var ( deliver = func(packet dataPack) (int, error) { @@ -915,9 +921,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, - d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt") + d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts") - log.Debug(fmt.Sprintf("Receipt download terminated: %v", err)) + log.Debug("Transaction receipt download terminated", "error", err) return err } @@ -925,7 +931,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // available peers, reserving a chunk of nodes for each, waiting for delivery and // also periodically checking for timeouts. func (d *Downloader) fetchNodeData() error { - log.Debug(fmt.Sprintf("Downloading node state data")) + log.Debug("Downloading node state data") var ( deliver = func(packet dataPack) (int, error) { @@ -933,12 +939,12 @@ func (d *Downloader) fetchNodeData() error { return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) { // If the peer returned old-requested data, forgive if err == trie.ErrNotRequested { - log.Debug(fmt.Sprintf("peer %s: replied to stale state request, forgiving", packet.PeerId())) + log.Debug("Forgiving reply to stale state request", "peer", packet.PeerId()) return } if err != nil { // If the node data processing failed, the root hash is very wrong, abort - log.Error(fmt.Sprintf("peer %s: state processing failed: %v", packet.PeerId(), err)) + log.Error("State processing failed", "peer", packet.PeerId(), "error", err) d.cancel() return } @@ -957,12 +963,12 @@ func (d *Downloader) fetchNodeData() error { // If real database progress was made, reset any fast-sync pivot failure if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 { - log.Debug(fmt.Sprintf("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))) + log.Debug("Fast-sync progressed, resetting fail counter", "previous", atomic.LoadUint32(&d.fsPivotFails)) atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block } // Log a message to the user and return if delivered > 0 { - log.Info(fmt.Sprintf("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)) + log.Info("Imported new state entries", "count", delivered, "elapsed", common.PrettyDuration(time.Since(start)), "processed", syncStatsStateDone, "pending", pending) } }) } @@ -977,9 +983,9 @@ func (d *Downloader) fetchNodeData() error { ) err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire, d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch, - d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State") + d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "states") - log.Debug(fmt.Sprintf("Node state data download terminated: %v", err)) + log.Debug("Node state data download terminated", "error", err) return err } @@ -1044,11 +1050,11 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // Issue a log to the user to see what's going on switch { case err == nil && packet.Items() == 0: - log.Trace(fmt.Sprintf("%s: no %s delivered", peer, strings.ToLower(kind))) + peer.logger.Trace("Requested data not delivered", "type", kind) case err == nil: - log.Trace(fmt.Sprintf("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))) + peer.logger.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats()) default: - log.Trace(fmt.Sprintf("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)) + peer.logger.Trace("Failed to deliver retrieved data", "type", kind, "error", err) } } // Blocks assembled, try to update the progress @@ -1091,10 +1097,10 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // and latency of a peer separately, which requires pushing the measures capacity a bit and seeing // how response times reacts, to it always requests one more than the minimum (i.e. min 2). if fails > 2 { - log.Trace(fmt.Sprintf("%s: %s delivery timeout", peer, strings.ToLower(kind))) + peer.logger.Trace("Data delivery timed out", "type", kind) setIdle(peer, 0) } else { - log.Debug(fmt.Sprintf("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))) + peer.logger.Debug("Stalling delivery, dropping", "type", kind) d.dropPeer(pid) } } @@ -1102,7 +1108,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // If there's nothing more to fetch, wait or terminate if pending() == 0 { if !inFlight() && finished { - log.Debug(fmt.Sprintf("%s fetching completed", kind)) + log.Debug("Data fetching completed", "type", kind) return nil } break @@ -1130,15 +1136,13 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv if request == nil { continue } - log.Trace("", "msg", log.Lazy{Fn: func() string { - if request.From > 0 { - return fmt.Sprintf("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From) - } else if len(request.Headers) > 0 { - return fmt.Sprintf("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number) - } else { - return fmt.Sprintf("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind)) - } - }}) + if request.From > 0 { + peer.logger.Trace("Requesting new batch of data", "type", kind, "from", request.From) + } else if len(request.Headers) > 0 { + peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number) + } else { + peer.logger.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes)) + } // Fetch the chunk and make sure any errors return the hashes to the queue if fetchHook != nil { fetchHook(request.Headers) @@ -1149,7 +1153,7 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv // case, the internal state of the downloader and the queue is very wrong so // better hard crash and note the error instead of silently accumulating into // a much bigger issue. - panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind))) + panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind)) } running = true } @@ -1193,8 +1197,10 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if d.headBlock != nil { curBlock = d.headBlock().Number() } - log.Warn(fmt.Sprintf("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", - len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock)) + log.Warn("Rolled back headers", "count", len(hashes), + "header", fmt.Sprintf("%d->%d", lastHeader, d.headHeader().Number), + "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock), + "block", fmt.Sprintf("%d->%d", lastBlock, curBlock)) // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { @@ -1202,7 +1208,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if atomic.LoadUint32(&d.fsPivotFails) == 0 { for _, header := range rollback { if header.Number.Uint64() == pivot { - log.Warn(fmt.Sprintf("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])) + log.Warn("Fast-sync critical section failure, locked pivot to header", "number", pivot, "hash", header.Hash().Hex()[2:10]) d.fsPivotLock = header } } @@ -1298,7 +1304,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if n > 0 { rollback = append(rollback, chunk[:n]...) } - log.Debug(fmt.Sprintf("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)) + log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash().Hex()[2:10], "error", err) return errInvalidChain } // All verifications passed, store newly found uncertain headers @@ -1310,7 +1316,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // If we're fast syncing and just pulled in the pivot, make sure it's the one locked in if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot { if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() { - log.Warn(fmt.Sprintf("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])) + log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash().Hex()[2:10], "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash().Hex()[2:10]) return errInvalidChain } } @@ -1327,7 +1333,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Otherwise insert the headers for content retrieval inserts := d.queue.Schedule(chunk, origin) if len(inserts) != len(chunk) { - log.Debug(fmt.Sprintf("stale headers")) + log.Debug("Stale headers") return errBadPeer } } @@ -1358,10 +1364,15 @@ func (d *Downloader) processContent() error { d.chainInsertHook(results) } // Actually import the blocks - log.Debug("", "msg", log.Lazy{Fn: func() string { - first, last := results[0].Header, results[len(results)-1].Header - return fmt.Sprintf("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4]) - }}) + first, last := results[0].Header, results[len(results)-1].Header + log.Debug("Inserting downloaded chain", "items", len(results), + "from", log.Lazy{Fn: func() string { + return fmt.Sprintf("#%d [%x…]", first.Number, first.Hash().Bytes()[:4]) + }}, + "till", log.Lazy{Fn: func() string { + return fmt.Sprintf("#%d [%x…]", last.Number, last.Hash().Bytes()[:4]) + }}) + for len(results) != 0 { // Check for any termination requests select { @@ -1395,14 +1406,14 @@ func (d *Downloader) processContent() error { case len(receipts) > 0: index, err = d.insertReceipts(blocks, receipts) if err == nil && blocks[len(blocks)-1].NumberU64() == pivot { - log.Debug(fmt.Sprintf("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])) + log.Debug("Committing block as new head", "number", blocks[len(blocks)-1].Number(), "hash", blocks[len(blocks)-1].Hash().Hex()[2:10]) index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash()) } default: index, err = d.insertBlocks(blocks) } if err != nil { - log.Debug(fmt.Sprintf("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)) + log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash().Hex()[2:10], "error", err) return errInvalidChain } // Shift the results to the next batch @@ -1470,7 +1481,7 @@ func (d *Downloader) qosTuner() { atomic.StoreUint64(&d.rttConfidence, conf) // Log the new QoS values and sleep until the next RTT - log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())) + log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) select { case <-d.quitCh: return @@ -1500,7 +1511,7 @@ func (d *Downloader) qosReduceConfidence() { atomic.StoreUint64(&d.rttConfidence, conf) rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate)) - log.Debug(fmt.Sprintf("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())) + log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL()) } // requestRTT returns the current target round trip time for a download request diff --git a/eth/downloader/modes.go b/eth/downloader/modes.go index ec339c074..c2ce0cfef 100644 --- a/eth/downloader/modes.go +++ b/eth/downloader/modes.go @@ -24,3 +24,11 @@ const ( FastSync // Quickly download the headers, full sync only at the chain head LightSync // Download only the headers and terminate afterwards ) + +// syncModeLabels contains a mapping of sync modes to textual label used by the +// logging system. +var syncModeLabels = map[SyncMode]string{ + FullSync: "full", + FastSync: "fast", + LightSync: "light", +} diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index ea4b6a6f2..c1a9b859d 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -25,12 +25,12 @@ import ( "math" "math/big" "sort" - "strings" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" ) const ( @@ -86,7 +86,8 @@ type peer struct { getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data - version int // Eth protocol version number to switch strategies + version int // Eth protocol version number to switch strategies + logger log.Logger // Contextual logger to add extra infos to peer logs lock sync.RWMutex } @@ -94,7 +95,8 @@ type peer struct { // mechanisms. func newPeer(id string, version int, currentHead currentHeadRetrievalFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { + getReceipts receiptFetcherFn, getNodeData stateFetcherFn, logger log.Logger) *peer { + return &peer{ id: id, lacking: make(map[common.Hash]struct{}), @@ -108,6 +110,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn, getNodeData: getNodeData, version: version, + logger: logger, } } @@ -268,6 +271,11 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) + + p.logger.Trace("Peer throughput measurements updated", + "hps", p.headerThroughput, "bps", p.blockThroughput, + "rps", p.receiptThroughput, "sps", p.stateThroughput, + "miss", len(p.lacking), "rtt", p.rtt) } // HeaderCapacity retrieves the peers header download allowance based on its @@ -332,21 +340,6 @@ func (p *peer) Lacks(hash common.Hash) bool { return ok } -// String implements fmt.Stringer. -func (p *peer) String() string { - p.lock.RLock() - defer p.lock.RUnlock() - - return fmt.Sprintf("Peer %s [%s]", p.id, strings.Join([]string{ - fmt.Sprintf("hs %3.2f/s", p.headerThroughput), - fmt.Sprintf("bs %3.2f/s", p.blockThroughput), - fmt.Sprintf("rs %3.2f/s", p.receiptThroughput), - fmt.Sprintf("ss %3.2f/s", p.stateThroughput), - fmt.Sprintf("miss %4d", len(p.lacking)), - fmt.Sprintf("rtt %v", p.rtt), - }, ", ")) -} - // peerSet represents the collection of active peer participating in the chain // download procedure. type peerSet struct { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index fa04e0d23..9530e15be 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -364,20 +364,20 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { // Make sure chain order is honoured and preserved throughout hash := header.Hash() if header.Number == nil || header.Number.Uint64() != from { - log.Warn(fmt.Sprintf("Header #%v [%x…] broke chain ordering, expected %d", header.Number, hash[:4], from)) + log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash.Hex()[2:10], "expected", from) break } if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { - log.Warn(fmt.Sprintf("Header #%v [%x…] broke chain ancestry", header.Number, hash[:4])) + log.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash.Hex()[2:10]) break } // Make sure no duplicate requests are executed if _, ok := q.blockTaskPool[hash]; ok { - log.Warn(fmt.Sprintf("Header #%d [%x…] already scheduled for block fetch", header.Number.Uint64(), hash[:4])) + log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash.Hex()[2:10]) continue } if _, ok := q.receiptTaskPool[hash]; ok { - log.Warn(fmt.Sprintf("Header #%d [%x…] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])) + log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash.Hex()[2:10]) continue } // Queue the header for content retrieval @@ -391,7 +391,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header { } if q.mode == FastSync && header.Number.Uint64() == q.fastSyncPivot { // Pivoting point of the fast sync, switch the state retrieval to this - log.Debug(fmt.Sprintf("Switching state downloads to %d [%x…]", header.Number.Uint64(), header.Hash().Bytes()[:4])) + log.Debug("Switching state downloads to new block", "number", header.Number, "hash", hash.Hex()[2:10]) q.stateTaskIndex = 0 q.stateTaskPool = make(map[common.Hash]int) @@ -872,10 +872,10 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh accepted := len(headers) == MaxHeaderFetch if accepted { if headers[0].Number.Uint64() != request.From { - log.Trace(fmt.Sprintf("Peer %s: first header #%v [%x…] broke chain ordering, expected %d", id, headers[0].Number, headers[0].Hash().Bytes()[:4], request.From)) + log.Trace("First header broke chain ordering", "peer", id, "number", headers[0].Number, "hash", headers[0].Hash().Hex()[2:10], request.From) accepted = false } else if headers[len(headers)-1].Hash() != target { - log.Trace(fmt.Sprintf("Peer %s: last header #%v [%x…] broke skeleton structure, expected %x", id, headers[len(headers)-1].Number, headers[len(headers)-1].Hash().Bytes()[:4], target[:4])) + log.Trace("Last header broke skeleton structure ", "peer", id, "number", headers[len(headers)-1].Number, "hash", headers[len(headers)-1].Hash().Hex()[2:10], "expected", target.Hex()[2:10]) accepted = false } } @@ -883,12 +883,12 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh for i, header := range headers[1:] { hash := header.Hash() if want := request.From + 1 + uint64(i); header.Number.Uint64() != want { - log.Warn(fmt.Sprintf("Peer %s: header #%v [%x…] broke chain ordering, expected %d", id, header.Number, hash[:4], want)) + log.Warn("Header broke chain ordering", "peer", id, "number", header.Number, "hash", hash.Hex()[2:10], "expected", want) accepted = false break } if headers[i].Hash() != header.ParentHash { - log.Warn(fmt.Sprintf("Peer %s: header #%v [%x…] broke chain ancestry", id, header.Number, hash[:4])) + log.Warn("Header broke chain ancestry", "peer", id, "number", header.Number, "hash", hash.Hex()[2:10]) accepted = false break } @@ -896,7 +896,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh } // If the batch of headers wasn't accepted, mark as unavailable if !accepted { - log.Trace(fmt.Sprintf("Peer %s: skeleton filling from header #%d not accepted", id, request.From)) + log.Trace("Skeleton filling not accepted", "peer", id, "from", request.From) miss := q.headerPeerMiss[id] if miss == nil { @@ -923,7 +923,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh select { case headerProcCh <- process: - log.Trace(fmt.Sprintf("%s: pre-scheduled %d headers from #%v", id, len(process), process[0].Number)) + log.Trace("Pre-scheduled new headers", "peer", id, "count", len(process), "from", process[0].Number) q.headerProced += len(process) default: } |