aboutsummaryrefslogtreecommitdiffstats
path: root/swarm
diff options
context:
space:
mode:
authorElad <theman@elad.im>2018-12-11 16:21:58 +0800
committerAnton Evangelatov <anton.evangelatov@gmail.com>2018-12-11 16:21:58 +0800
commitbb724080cac9fa36ec6b638cfd5cf0e54bc23362 (patch)
tree355425051728e3c3dd89565feefc46a81a3302f7 /swarm
parentb2aac658b0e366f128eda5e057e8e1bf5ec4e427 (diff)
downloaddexon-bb724080cac9fa36ec6b638cfd5cf0e54bc23362.tar.gz
dexon-bb724080cac9fa36ec6b638cfd5cf0e54bc23362.tar.zst
dexon-bb724080cac9fa36ec6b638cfd5cf0e54bc23362.zip
cmd/swarm, metrics, swarm/api/client, swarm/storage, swarm/metrics, swarm/api/http: add instrumentation (#18274)
Diffstat (limited to 'swarm')
-rw-r--r--swarm/api/client/client.go76
-rw-r--r--swarm/api/http/middleware.go11
-rw-r--r--swarm/metrics/flags.go32
-rw-r--r--swarm/storage/chunker.go8
4 files changed, 107 insertions, 20 deletions
diff --git a/swarm/api/client/client.go b/swarm/api/client/client.go
index d9837ca73..f793ca8b8 100644
--- a/swarm/api/client/client.go
+++ b/swarm/api/client/client.go
@@ -19,6 +19,7 @@ package client
import (
"archive/tar"
"bytes"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -26,6 +27,7 @@ import (
"io/ioutil"
"mime/multipart"
"net/http"
+ "net/http/httptrace"
"net/textproto"
"net/url"
"os"
@@ -33,9 +35,14 @@ import (
"regexp"
"strconv"
"strings"
+ "time"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
+ "github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
+ "github.com/pborman/uuid"
)
var (
@@ -474,6 +481,11 @@ type UploadFn func(file *File) error
// TarUpload uses the given Uploader to upload files to swarm as a tar stream,
// returning the resulting manifest hash
func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, toEncrypt bool) (string, error) {
+ ctx, sp := spancontext.StartSpan(context.Background(), "api.client.tarupload")
+ defer sp.Finish()
+
+ var tn time.Time
+
reqR, reqW := io.Pipe()
defer reqR.Close()
addr := hash
@@ -489,6 +501,12 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
if err != nil {
return "", err
}
+
+ trace := GetClientTrace("swarm api client - upload tar", "api.client.uploadtar", uuid.New()[:8], &tn)
+
+ req = req.WithContext(httptrace.WithClientTrace(ctx, trace))
+ transport := http.DefaultTransport
+
req.Header.Set("Content-Type", "application/x-tar")
if defaultPath != "" {
q := req.URL.Query()
@@ -529,8 +547,8 @@ func (c *Client) TarUpload(hash string, uploader Uploader, defaultPath string, t
}
reqW.CloseWithError(err)
}()
-
- res, err := http.DefaultClient.Do(req)
+ tn = time.Now()
+ res, err := transport.RoundTrip(req)
if err != nil {
return "", err
}
@@ -728,3 +746,57 @@ func (c *Client) GetFeedRequest(query *feed.Query, manifestAddressOrDomain strin
}
return &metadata, nil
}
+
+func GetClientTrace(traceMsg, metricPrefix, ruid string, tn *time.Time) *httptrace.ClientTrace {
+ trace := &httptrace.ClientTrace{
+ GetConn: func(_ string) {
+ log.Trace(traceMsg+" - http get", "event", "GetConn", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".getconn", nil).Update(time.Since(*tn))
+ },
+ GotConn: func(_ httptrace.GotConnInfo) {
+ log.Trace(traceMsg+" - http get", "event", "GotConn", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".gotconn", nil).Update(time.Since(*tn))
+ },
+ PutIdleConn: func(err error) {
+ log.Trace(traceMsg+" - http get", "event", "PutIdleConn", "ruid", ruid, "err", err)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".putidle", nil).Update(time.Since(*tn))
+ },
+ GotFirstResponseByte: func() {
+ log.Trace(traceMsg+" - http get", "event", "GotFirstResponseByte", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".firstbyte", nil).Update(time.Since(*tn))
+ },
+ Got100Continue: func() {
+ log.Trace(traceMsg, "event", "Got100Continue", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".got100continue", nil).Update(time.Since(*tn))
+ },
+ DNSStart: func(_ httptrace.DNSStartInfo) {
+ log.Trace(traceMsg, "event", "DNSStart", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsstart", nil).Update(time.Since(*tn))
+ },
+ DNSDone: func(_ httptrace.DNSDoneInfo) {
+ log.Trace(traceMsg, "event", "DNSDone", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".dnsdone", nil).Update(time.Since(*tn))
+ },
+ ConnectStart: func(network, addr string) {
+ log.Trace(traceMsg, "event", "ConnectStart", "ruid", ruid, "network", network, "addr", addr)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".connectstart", nil).Update(time.Since(*tn))
+ },
+ ConnectDone: func(network, addr string, err error) {
+ log.Trace(traceMsg, "event", "ConnectDone", "ruid", ruid, "network", network, "addr", addr, "err", err)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".connectdone", nil).Update(time.Since(*tn))
+ },
+ WroteHeaders: func() {
+ log.Trace(traceMsg, "event", "WroteHeaders(request)", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".wroteheaders", nil).Update(time.Since(*tn))
+ },
+ Wait100Continue: func() {
+ log.Trace(traceMsg, "event", "Wait100Continue", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".wait100continue", nil).Update(time.Since(*tn))
+ },
+ WroteRequest: func(_ httptrace.WroteRequestInfo) {
+ log.Trace(traceMsg, "event", "WroteRequest", "ruid", ruid)
+ metrics.GetOrRegisterResettingTimer(metricPrefix+".wroterequest", nil).Update(time.Since(*tn))
+ },
+ }
+ return trace
+}
diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go
index 115a00856..f7f819eab 100644
--- a/swarm/api/http/middleware.go
+++ b/swarm/api/http/middleware.go
@@ -74,13 +74,15 @@ func ParseURI(h http.Handler) http.Handler {
func InitLoggingResponseWriter(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- startTime := time.Now()
- defer metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).UpdateSince(startTime)
+ tn := time.Now()
writer := newLoggingResponseWriter(w)
h.ServeHTTP(writer, r)
- log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode)
- metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).UpdateSince(startTime)
+
+ ts := time.Since(tn)
+ log.Info("request served", "ruid", GetRUID(r.Context()), "code", writer.statusCode, "time", ts*time.Millisecond)
+ metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.time", r.Method), nil).Update(ts)
+ metrics.GetOrRegisterResettingTimer(fmt.Sprintf("http.request.%s.%d.time", r.Method, writer.statusCode), nil).Update(ts)
})
}
@@ -93,6 +95,7 @@ func InstrumentOpenTracing(h http.Handler) http.Handler {
}
spanName := fmt.Sprintf("http.%s.%s", r.Method, uri.Scheme)
ctx, sp := spancontext.StartSpan(r.Context(), spanName)
+
defer sp.Finish()
h.ServeHTTP(w, r.WithContext(ctx))
})
diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go
index 79490fd36..7c12120a6 100644
--- a/swarm/metrics/flags.go
+++ b/swarm/metrics/flags.go
@@ -27,26 +27,26 @@ import (
)
var (
- metricsEnableInfluxDBExportFlag = cli.BoolFlag{
+ MetricsEnableInfluxDBExportFlag = cli.BoolFlag{
Name: "metrics.influxdb.export",
Usage: "Enable metrics export/push to an external InfluxDB database",
}
- metricsInfluxDBEndpointFlag = cli.StringFlag{
+ MetricsInfluxDBEndpointFlag = cli.StringFlag{
Name: "metrics.influxdb.endpoint",
Usage: "Metrics InfluxDB endpoint",
Value: "http://127.0.0.1:8086",
}
- metricsInfluxDBDatabaseFlag = cli.StringFlag{
+ MetricsInfluxDBDatabaseFlag = cli.StringFlag{
Name: "metrics.influxdb.database",
Usage: "Metrics InfluxDB database",
Value: "metrics",
}
- metricsInfluxDBUsernameFlag = cli.StringFlag{
+ MetricsInfluxDBUsernameFlag = cli.StringFlag{
Name: "metrics.influxdb.username",
Usage: "Metrics InfluxDB username",
Value: "",
}
- metricsInfluxDBPasswordFlag = cli.StringFlag{
+ MetricsInfluxDBPasswordFlag = cli.StringFlag{
Name: "metrics.influxdb.password",
Usage: "Metrics InfluxDB password",
Value: "",
@@ -55,7 +55,7 @@ var (
// It is used so that we can group all nodes and average a measurement across all of them, but also so
// that we can select a specific node and inspect its measurements.
// https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key
- metricsInfluxDBHostTagFlag = cli.StringFlag{
+ MetricsInfluxDBHostTagFlag = cli.StringFlag{
Name: "metrics.influxdb.host.tag",
Usage: "Metrics InfluxDB `host` tag attached to all measurements",
Value: "localhost",
@@ -65,20 +65,24 @@ var (
// Flags holds all command-line flags required for metrics collection.
var Flags = []cli.Flag{
utils.MetricsEnabledFlag,
- metricsEnableInfluxDBExportFlag,
- metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag,
+ MetricsEnableInfluxDBExportFlag,
+ MetricsInfluxDBEndpointFlag,
+ MetricsInfluxDBDatabaseFlag,
+ MetricsInfluxDBUsernameFlag,
+ MetricsInfluxDBPasswordFlag,
+ MetricsInfluxDBHostTagFlag,
}
func Setup(ctx *cli.Context) {
if gethmetrics.Enabled {
log.Info("Enabling swarm metrics collection")
var (
- enableExport = ctx.GlobalBool(metricsEnableInfluxDBExportFlag.Name)
- endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name)
- database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name)
- username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name)
- password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name)
- hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name)
+ enableExport = ctx.GlobalBool(MetricsEnableInfluxDBExportFlag.Name)
+ endpoint = ctx.GlobalString(MetricsInfluxDBEndpointFlag.Name)
+ database = ctx.GlobalString(MetricsInfluxDBDatabaseFlag.Name)
+ username = ctx.GlobalString(MetricsInfluxDBUsernameFlag.Name)
+ password = ctx.GlobalString(MetricsInfluxDBPasswordFlag.Name)
+ hosttag = ctx.GlobalString(MetricsInfluxDBHostTagFlag.Name)
)
// Start system runtime metrics collection
diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go
index 40292e88f..cbe65372a 100644
--- a/swarm/storage/chunker.go
+++ b/swarm/storage/chunker.go
@@ -22,6 +22,7 @@ import (
"fmt"
"io"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/metrics"
ch "github.com/ethereum/go-ethereum/swarm/chunk"
@@ -410,10 +411,14 @@ func (r *LazyChunkReader) Size(ctx context.Context, quitC chan bool) (n int64, e
log.Debug("lazychunkreader.size", "addr", r.addr)
if r.chunkData == nil {
+
+ startTime := time.Now()
chunkData, err := r.getter.Get(cctx, Reference(r.addr))
if err != nil {
+ metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
return 0, err
}
+ metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime)
r.chunkData = chunkData
s := r.chunkData.Size()
log.Debug("lazychunkreader.size", "key", r.addr, "size", s)
@@ -542,8 +547,10 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
wg.Add(1)
go func(j int64) {
childAddress := chunkData[8+j*r.hashSize : 8+(j+1)*r.hashSize]
+ startTime := time.Now()
chunkData, err := r.getter.Get(r.ctx, Reference(childAddress))
if err != nil {
+ metrics.GetOrRegisterResettingTimer("lcr.getter.get.err", nil).UpdateSince(startTime)
log.Debug("lazychunkreader.join", "key", fmt.Sprintf("%x", childAddress), "err", err)
select {
case errC <- fmt.Errorf("chunk %v-%v not found; key: %s", off, off+treeSize, fmt.Sprintf("%x", childAddress)):
@@ -551,6 +558,7 @@ func (r *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeS
}
return
}
+ metrics.GetOrRegisterResettingTimer("lcr.getter.get", nil).UpdateSince(startTime)
if l := len(chunkData); l < 9 {
select {
case errC <- fmt.Errorf("chunk %v-%v incomplete; key: %s, data length %v", off, off+treeSize, fmt.Sprintf("%x", childAddress), l):