From dcca613a0b4c6ce56e52f4607cf71f4f1338db8f Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Fri, 23 Feb 2018 14:19:59 +0100 Subject: swarm: initial instrumentation (#15969) * swarm: initial instrumentation with go-metrics * swarm: initialise metrics collection and add ResettingTimer to HTTP requests * swarm: update metrics flags names. remove redundant Timer. * swarm: rename method for periodically updating gauges * swarm: finalise metrics after feedback * swarm/network: always init kad metrics containers * swarm/network: off-by-one index in metrics containers * swarm, metrics: resolved conflicts --- swarm/api/api.go | 53 ++++++++++++++++++++++++ swarm/api/http/error.go | 9 +++++ swarm/api/http/server.go | 69 ++++++++++++++++++++++++++++++++ swarm/fuse/swarmfs_util.go | 7 ---- swarm/metrics/flags.go | 82 ++++++++++++++++++++++++++++++++++++++ swarm/network/depo.go | 15 +++++++ swarm/network/hive.go | 10 +++++ swarm/network/kademlia/kademlia.go | 28 ++++++++++++- swarm/network/protocol.go | 24 +++++++++++ swarm/storage/chunker.go | 14 +++++++ swarm/storage/dbstore.go | 9 +++++ swarm/storage/localstore.go | 16 ++++++++ swarm/storage/memstore.go | 14 +++++++ swarm/swarm.go | 32 +++++++++++++++ 14 files changed, 374 insertions(+), 8 deletions(-) create mode 100644 swarm/metrics/flags.go (limited to 'swarm') diff --git a/swarm/api/api.go b/swarm/api/api.go index fdf76d390..0cf12fdbe 100644 --- a/swarm/api/api.go +++ b/swarm/api/api.go @@ -32,11 +32,31 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/storage" ) var hashMatcher = regexp.MustCompile("^[0-9A-Fa-f]{64}") +//setup metrics +var ( + apiResolveCount = metrics.NewRegisteredCounter("api.resolve.count", nil) + apiResolveFail = metrics.NewRegisteredCounter("api.resolve.fail", nil) + apiPutCount = metrics.NewRegisteredCounter("api.put.count", nil) + apiPutFail = metrics.NewRegisteredCounter("api.put.fail", nil) + apiGetCount = metrics.NewRegisteredCounter("api.get.count", nil) + apiGetNotFound = metrics.NewRegisteredCounter("api.get.notfound", nil) + apiGetHttp300 = metrics.NewRegisteredCounter("api.get.http.300", nil) + apiModifyCount = metrics.NewRegisteredCounter("api.modify.count", nil) + apiModifyFail = metrics.NewRegisteredCounter("api.modify.fail", nil) + apiAddFileCount = metrics.NewRegisteredCounter("api.addfile.count", nil) + apiAddFileFail = metrics.NewRegisteredCounter("api.addfile.fail", nil) + apiRmFileCount = metrics.NewRegisteredCounter("api.removefile.count", nil) + apiRmFileFail = metrics.NewRegisteredCounter("api.removefile.fail", nil) + apiAppendFileCount = metrics.NewRegisteredCounter("api.appendfile.count", nil) + apiAppendFileFail = metrics.NewRegisteredCounter("api.appendfile.fail", nil) +) + type Resolver interface { Resolve(string) (common.Hash, error) } @@ -155,6 +175,7 @@ type ErrResolve error // DNS Resolver func (self *Api) Resolve(uri *URI) (storage.Key, error) { + apiResolveCount.Inc(1) log.Trace(fmt.Sprintf("Resolving : %v", uri.Addr)) // if the URI is immutable, check if the address is a hash @@ -169,6 +190,7 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) { // if DNS is not configured, check if the address is a hash if self.dns == nil { if !isHash { + apiResolveFail.Inc(1) return nil, fmt.Errorf("no DNS to resolve name: %q", uri.Addr) } return common.Hex2Bytes(uri.Addr), nil @@ -179,6 +201,7 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) { if err == nil { return resolved[:], nil } else if !isHash { + apiResolveFail.Inc(1) return nil, err } return common.Hex2Bytes(uri.Addr), nil @@ -186,16 +209,19 @@ func (self *Api) Resolve(uri *URI) (storage.Key, error) { // Put provides singleton manifest creation on top of dpa store func (self *Api) Put(content, contentType string) (storage.Key, error) { + apiPutCount.Inc(1) r := strings.NewReader(content) wg := &sync.WaitGroup{} key, err := self.dpa.Store(r, int64(len(content)), wg, nil) if err != nil { + apiPutFail.Inc(1) return nil, err } manifest := fmt.Sprintf(`{"entries":[{"hash":"%v","contentType":"%s"}]}`, key, contentType) r = strings.NewReader(manifest) key, err = self.dpa.Store(r, int64(len(manifest)), wg, nil) if err != nil { + apiPutFail.Inc(1) return nil, err } wg.Wait() @@ -206,8 +232,10 @@ func (self *Api) Put(content, contentType string) (storage.Key, error) { // to resolve basePath to content using dpa retrieve // it returns a section reader, mimeType, status and an error func (self *Api) Get(key storage.Key, path string) (reader storage.LazySectionReader, mimeType string, status int, err error) { + apiGetCount.Inc(1) trie, err := loadManifest(self.dpa, key, nil) if err != nil { + apiGetNotFound.Inc(1) status = http.StatusNotFound log.Warn(fmt.Sprintf("loadManifestTrie error: %v", err)) return @@ -221,6 +249,7 @@ func (self *Api) Get(key storage.Key, path string) (reader storage.LazySectionRe key = common.Hex2Bytes(entry.Hash) status = entry.Status if status == http.StatusMultipleChoices { + apiGetHttp300.Inc(1) return } else { mimeType = entry.ContentType @@ -229,6 +258,7 @@ func (self *Api) Get(key storage.Key, path string) (reader storage.LazySectionRe } } else { status = http.StatusNotFound + apiGetNotFound.Inc(1) err = fmt.Errorf("manifest entry for '%s' not found", path) log.Warn(fmt.Sprintf("%v", err)) } @@ -236,9 +266,11 @@ func (self *Api) Get(key storage.Key, path string) (reader storage.LazySectionRe } func (self *Api) Modify(key storage.Key, path, contentHash, contentType string) (storage.Key, error) { + apiModifyCount.Inc(1) quitC := make(chan bool) trie, err := loadManifest(self.dpa, key, quitC) if err != nil { + apiModifyFail.Inc(1) return nil, err } if contentHash != "" { @@ -253,19 +285,23 @@ func (self *Api) Modify(key storage.Key, path, contentHash, contentType string) } if err := trie.recalcAndStore(); err != nil { + apiModifyFail.Inc(1) return nil, err } return trie.hash, nil } func (self *Api) AddFile(mhash, path, fname string, content []byte, nameresolver bool) (storage.Key, string, error) { + apiAddFileCount.Inc(1) uri, err := Parse("bzz:/" + mhash) if err != nil { + apiAddFileFail.Inc(1) return nil, "", err } mkey, err := self.Resolve(uri) if err != nil { + apiAddFileFail.Inc(1) return nil, "", err } @@ -284,16 +320,19 @@ func (self *Api) AddFile(mhash, path, fname string, content []byte, nameresolver mw, err := self.NewManifestWriter(mkey, nil) if err != nil { + apiAddFileFail.Inc(1) return nil, "", err } fkey, err := mw.AddEntry(bytes.NewReader(content), entry) if err != nil { + apiAddFileFail.Inc(1) return nil, "", err } newMkey, err := mw.Store() if err != nil { + apiAddFileFail.Inc(1) return nil, "", err } @@ -303,13 +342,16 @@ func (self *Api) AddFile(mhash, path, fname string, content []byte, nameresolver } func (self *Api) RemoveFile(mhash, path, fname string, nameresolver bool) (string, error) { + apiRmFileCount.Inc(1) uri, err := Parse("bzz:/" + mhash) if err != nil { + apiRmFileFail.Inc(1) return "", err } mkey, err := self.Resolve(uri) if err != nil { + apiRmFileFail.Inc(1) return "", err } @@ -320,16 +362,19 @@ func (self *Api) RemoveFile(mhash, path, fname string, nameresolver bool) (strin mw, err := self.NewManifestWriter(mkey, nil) if err != nil { + apiRmFileFail.Inc(1) return "", err } err = mw.RemoveEntry(filepath.Join(path, fname)) if err != nil { + apiRmFileFail.Inc(1) return "", err } newMkey, err := mw.Store() if err != nil { + apiRmFileFail.Inc(1) return "", err } @@ -338,6 +383,7 @@ func (self *Api) RemoveFile(mhash, path, fname string, nameresolver bool) (strin } func (self *Api) AppendFile(mhash, path, fname string, existingSize int64, content []byte, oldKey storage.Key, offset int64, addSize int64, nameresolver bool) (storage.Key, string, error) { + apiAppendFileCount.Inc(1) buffSize := offset + addSize if buffSize < existingSize { @@ -366,10 +412,12 @@ func (self *Api) AppendFile(mhash, path, fname string, existingSize int64, conte uri, err := Parse("bzz:/" + mhash) if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } mkey, err := self.Resolve(uri) if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } @@ -380,11 +428,13 @@ func (self *Api) AppendFile(mhash, path, fname string, existingSize int64, conte mw, err := self.NewManifestWriter(mkey, nil) if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } err = mw.RemoveEntry(filepath.Join(path, fname)) if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } @@ -398,11 +448,13 @@ func (self *Api) AppendFile(mhash, path, fname string, existingSize int64, conte fkey, err := mw.AddEntry(io.Reader(combinedReader), entry) if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } newMkey, err := mw.Store() if err != nil { + apiAppendFileFail.Inc(1) return nil, "", err } @@ -412,6 +464,7 @@ func (self *Api) AppendFile(mhash, path, fname string, existingSize int64, conte } func (self *Api) BuildDirectoryTree(mhash string, nameresolver bool) (key storage.Key, manifestEntryMap map[string]*manifestTrieEntry, err error) { + uri, err := Parse("bzz:/" + mhash) if err != nil { return nil, nil, err diff --git a/swarm/api/http/error.go b/swarm/api/http/error.go index dbd97182f..831cf23fe 100644 --- a/swarm/api/http/error.go +++ b/swarm/api/http/error.go @@ -29,12 +29,19 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" ) //templateMap holds a mapping of an HTTP error code to a template var templateMap map[int]*template.Template +//metrics variables +var ( + htmlCounter = metrics.NewRegisteredCounter("api.http.errorpage.html.count", nil) + jsonCounter = metrics.NewRegisteredCounter("api.http.errorpage.json.count", nil) +) + //parameters needed for formatting the correct HTML page type ErrorParams struct { Msg string @@ -132,6 +139,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ErrorParams) { //return a HTML page func respondHtml(w http.ResponseWriter, params *ErrorParams) { + htmlCounter.Inc(1) err := params.template.Execute(w, params) if err != nil { log.Error(err.Error()) @@ -140,6 +148,7 @@ func respondHtml(w http.ResponseWriter, params *ErrorParams) { //return JSON func respondJson(w http.ResponseWriter, params *ErrorParams) { + jsonCounter.Inc(1) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(params) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index b89487af7..6ebfc8059 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -37,11 +37,35 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/rs/cors" ) +//setup metrics +var ( + postRawCount = metrics.NewRegisteredCounter("api.http.post.raw.count", nil) + postRawFail = metrics.NewRegisteredCounter("api.http.post.raw.fail", nil) + postFilesCount = metrics.NewRegisteredCounter("api.http.post.files.count", nil) + postFilesFail = metrics.NewRegisteredCounter("api.http.post.files.fail", nil) + deleteCount = metrics.NewRegisteredCounter("api.http.delete.count", nil) + deleteFail = metrics.NewRegisteredCounter("api.http.delete.fail", nil) + getCount = metrics.NewRegisteredCounter("api.http.get.count", nil) + getFail = metrics.NewRegisteredCounter("api.http.get.fail", nil) + getFileCount = metrics.NewRegisteredCounter("api.http.get.file.count", nil) + getFileNotFound = metrics.NewRegisteredCounter("api.http.get.file.notfound", nil) + getFileFail = metrics.NewRegisteredCounter("api.http.get.file.fail", nil) + getFilesCount = metrics.NewRegisteredCounter("api.http.get.files.count", nil) + getFilesFail = metrics.NewRegisteredCounter("api.http.get.files.fail", nil) + getListCount = metrics.NewRegisteredCounter("api.http.get.list.count", nil) + getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) + requestCount = metrics.NewRegisteredCounter("http.request.count", nil) + htmlRequestCount = metrics.NewRegisteredCounter("http.request.html.count", nil) + jsonRequestCount = metrics.NewRegisteredCounter("http.request.json.count", nil) + requestTimer = metrics.NewRegisteredResettingTimer("http.request.time", nil) +) + // ServerConfig is the basic configuration needed for the HTTP server and also // includes CORS settings. type ServerConfig struct { @@ -89,18 +113,22 @@ type Request struct { // HandlePostRaw handles a POST request to a raw bzz-raw:/ URI, stores the request // body in swarm and returns the resulting storage key as a text/plain response func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) { + postRawCount.Inc(1) if r.uri.Path != "" { + postRawFail.Inc(1) s.BadRequest(w, r, "raw POST request cannot contain a path") return } if r.Header.Get("Content-Length") == "" { + postRawFail.Inc(1) s.BadRequest(w, r, "missing Content-Length header in request") return } key, err := s.api.Store(r.Body, r.ContentLength, nil) if err != nil { + postRawFail.Inc(1) s.Error(w, r, err) return } @@ -117,8 +145,10 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *Request) { // existing manifest or to a new manifest under and returns the // resulting manifest hash as a text/plain response func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { + postFilesCount.Inc(1) contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { + postFilesFail.Inc(1) s.BadRequest(w, r, err.Error()) return } @@ -127,12 +157,14 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { if r.uri.Addr != "" { key, err = s.api.Resolve(r.uri) if err != nil { + postFilesFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } } else { key, err = s.api.NewManifest() if err != nil { + postFilesFail.Inc(1) s.Error(w, r, err) return } @@ -152,6 +184,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *Request) { } }) if err != nil { + postFilesFail.Inc(1) s.Error(w, r, fmt.Errorf("error creating manifest: %s", err)) return } @@ -270,8 +303,10 @@ func (s *Server) handleDirectUpload(req *Request, mw *api.ManifestWriter) error // from and returns the resulting manifest hash as a // text/plain response func (s *Server) HandleDelete(w http.ResponseWriter, r *Request) { + deleteCount.Inc(1) key, err := s.api.Resolve(r.uri) if err != nil { + deleteFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } @@ -281,6 +316,7 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *Request) { return mw.RemoveEntry(r.uri.Path) }) if err != nil { + deleteFail.Inc(1) s.Error(w, r, fmt.Errorf("error updating manifest: %s", err)) return } @@ -296,8 +332,10 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *Request) { // - bzz-hash:// and responds with the hash of the content stored // at the given storage key as a text/plain response func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { + getCount.Inc(1) key, err := s.api.Resolve(r.uri) if err != nil { + getFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } @@ -307,6 +345,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { if r.uri.Path != "" { walker, err := s.api.NewManifestWalker(key, nil) if err != nil { + getFail.Inc(1) s.BadRequest(w, r, fmt.Sprintf("%s is not a manifest", key)) return } @@ -335,6 +374,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { return api.SkipManifest }) if entry == nil { + getFail.Inc(1) s.NotFound(w, r, fmt.Errorf("Manifest entry could not be loaded")) return } @@ -344,6 +384,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { // check the root chunk exists by retrieving the file's size reader := s.api.Retrieve(key) if _, err := reader.Size(nil); err != nil { + getFail.Inc(1) s.NotFound(w, r, fmt.Errorf("Root chunk not found %s: %s", key, err)) return } @@ -370,19 +411,23 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *Request) { // header of "application/x-tar" and returns a tar stream of all files // contained in the manifest func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) { + getFilesCount.Inc(1) if r.uri.Path != "" { + getFilesFail.Inc(1) s.BadRequest(w, r, "files request cannot contain a path") return } key, err := s.api.Resolve(r.uri) if err != nil { + getFilesFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } walker, err := s.api.NewManifestWalker(key, nil) if err != nil { + getFilesFail.Inc(1) s.Error(w, r, err) return } @@ -430,6 +475,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) { return nil }) if err != nil { + getFilesFail.Inc(1) s.logError("error generating tar stream: %s", err) } } @@ -438,6 +484,7 @@ func (s *Server) HandleGetFiles(w http.ResponseWriter, r *Request) { // a list of all files contained in under grouped into // common prefixes using "/" as a delimiter func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { + getListCount.Inc(1) // ensure the root path has a trailing slash so that relative URLs work if r.uri.Path == "" && !strings.HasSuffix(r.URL.Path, "/") { http.Redirect(w, &r.Request, r.URL.Path+"/", http.StatusMovedPermanently) @@ -446,6 +493,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { key, err := s.api.Resolve(r.uri) if err != nil { + getListFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } @@ -453,6 +501,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { list, err := s.getManifestList(key, r.uri.Path) if err != nil { + getListFail.Inc(1) s.Error(w, r, err) return } @@ -470,6 +519,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *Request) { List: &list, }) if err != nil { + getListFail.Inc(1) s.logError("error rendering list HTML: %s", err) } return @@ -538,6 +588,7 @@ func (s *Server) getManifestList(key storage.Key, prefix string) (list api.Manif // HandleGetFile handles a GET request to bzz:/// and responds // with the content of the file at from the given func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { + getFileCount.Inc(1) // ensure the root path has a trailing slash so that relative URLs work if r.uri.Path == "" && !strings.HasSuffix(r.URL.Path, "/") { http.Redirect(w, &r.Request, r.URL.Path+"/", http.StatusMovedPermanently) @@ -546,6 +597,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { key, err := s.api.Resolve(r.uri) if err != nil { + getFileFail.Inc(1) s.Error(w, r, fmt.Errorf("error resolving %s: %s", r.uri.Addr, err)) return } @@ -554,8 +606,10 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { if err != nil { switch status { case http.StatusNotFound: + getFileNotFound.Inc(1) s.NotFound(w, r, err) default: + getFileFail.Inc(1) s.Error(w, r, err) } return @@ -567,6 +621,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { list, err := s.getManifestList(key, r.uri.Path) if err != nil { + getFileFail.Inc(1) s.Error(w, r, err) return } @@ -579,6 +634,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { // check the root chunk exists by retrieving the file's size if _, err := reader.Size(nil); err != nil { + getFileNotFound.Inc(1) s.NotFound(w, r, fmt.Errorf("File not found %s: %s", r.uri, err)) return } @@ -589,6 +645,19 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *Request) { } func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if metrics.Enabled { + //The increment for request count and request timer themselves have a flag check + //for metrics.Enabled. Nevertheless, we introduce the if here because we + //are looking into the header just to see what request type it is (json/html). + //So let's take advantage and add all metrics related stuff here + requestCount.Inc(1) + defer requestTimer.UpdateSince(time.Now()) + if r.Header.Get("Accept") == "application/json" { + jsonRequestCount.Inc(1) + } else { + htmlRequestCount.Inc(1) + } + } s.logDebug("HTTP %s request URL: '%s', Host: '%s', Path: '%s', Referer: '%s', Accept: '%s'", r.Method, r.RequestURI, r.URL.Host, r.URL.Path, r.Referer(), r.Header.Get("Accept")) uri, err := api.Parse(strings.TrimLeft(r.URL.Path, "/")) diff --git a/swarm/fuse/swarmfs_util.go b/swarm/fuse/swarmfs_util.go index d39966c0e..169b67487 100644 --- a/swarm/fuse/swarmfs_util.go +++ b/swarm/fuse/swarmfs_util.go @@ -47,7 +47,6 @@ func externalUnmount(mountPoint string) error { } func addFileToSwarm(sf *SwarmFile, content []byte, size int) error { - fkey, mhash, err := sf.mountInfo.swarmApi.AddFile(sf.mountInfo.LatestManifest, sf.path, sf.name, content, true) if err != nil { return err @@ -64,11 +63,9 @@ func addFileToSwarm(sf *SwarmFile, content []byte, size int) error { log.Info("Added new file:", "fname", sf.name, "New Manifest hash", mhash) return nil - } func removeFileFromSwarm(sf *SwarmFile) error { - mkey, err := sf.mountInfo.swarmApi.RemoveFile(sf.mountInfo.LatestManifest, sf.path, sf.name, true) if err != nil { return err @@ -83,7 +80,6 @@ func removeFileFromSwarm(sf *SwarmFile) error { } func removeDirectoryFromSwarm(sd *SwarmDir) error { - if len(sd.directories) == 0 && len(sd.files) == 0 { return nil } @@ -103,11 +99,9 @@ func removeDirectoryFromSwarm(sd *SwarmDir) error { } return nil - } func appendToExistingFileInSwarm(sf *SwarmFile, content []byte, offset int64, length int64) error { - fkey, mhash, err := sf.mountInfo.swarmApi.AppendFile(sf.mountInfo.LatestManifest, sf.path, sf.name, sf.fileSize, content, sf.key, offset, length, true) if err != nil { return err @@ -124,5 +118,4 @@ func appendToExistingFileInSwarm(sf *SwarmFile, content []byte, offset int64, le log.Info("Appended file:", "fname", sf.name, "New Manifest hash", mhash) return nil - } diff --git a/swarm/metrics/flags.go b/swarm/metrics/flags.go new file mode 100644 index 000000000..60e34e6e4 --- /dev/null +++ b/swarm/metrics/flags.go @@ -0,0 +1,82 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package metrics + +import ( + "time" + + "github.com/ethereum/go-ethereum/cmd/utils" + "github.com/ethereum/go-ethereum/log" + gethmetrics "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/metrics/influxdb" + "gopkg.in/urfave/cli.v1" +) + +var ( + metricsInfluxDBEndpointFlag = cli.StringFlag{ + Name: "metrics.influxdb.endpoint", + Usage: "Metrics InfluxDB endpoint", + Value: "http://127.0.0.1:8086", + } + metricsInfluxDBDatabaseFlag = cli.StringFlag{ + Name: "metrics.influxdb.database", + Usage: "metrics InfluxDB database", + Value: "metrics", + } + metricsInfluxDBUsernameFlag = cli.StringFlag{ + Name: "metrics.influxdb.username", + Usage: "metrics InfluxDB username", + Value: "", + } + metricsInfluxDBPasswordFlag = cli.StringFlag{ + Name: "metrics.influxdb.password", + Usage: "metrics InfluxDB password", + Value: "", + } + // The `host` tag is part of every measurement sent to InfluxDB. Queries on tags are faster in InfluxDB. + // It is used so that we can group all nodes and average a measurement across all of them, but also so + // that we can select a specific node and inspect its measurements. + // https://docs.influxdata.com/influxdb/v1.4/concepts/key_concepts/#tag-key + metricsInfluxDBHostTagFlag = cli.StringFlag{ + Name: "metrics.influxdb.host.tag", + Usage: "metrics InfluxDB `host` tag attached to all measurements", + Value: "localhost", + } +) + +// Flags holds all command-line flags required for metrics collection. +var Flags = []cli.Flag{ + utils.MetricsEnabledFlag, + metricsInfluxDBEndpointFlag, metricsInfluxDBDatabaseFlag, metricsInfluxDBUsernameFlag, metricsInfluxDBPasswordFlag, metricsInfluxDBHostTagFlag, +} + +func Setup(ctx *cli.Context) { + if gethmetrics.Enabled { + var ( + endpoint = ctx.GlobalString(metricsInfluxDBEndpointFlag.Name) + database = ctx.GlobalString(metricsInfluxDBDatabaseFlag.Name) + username = ctx.GlobalString(metricsInfluxDBUsernameFlag.Name) + password = ctx.GlobalString(metricsInfluxDBPasswordFlag.Name) + hosttag = ctx.GlobalString(metricsInfluxDBHostTagFlag.Name) + ) + + log.Info("Enabling swarm metrics collection and export") + go influxdb.InfluxDBWithTags(gethmetrics.DefaultRegistry, 10*time.Second, endpoint, database, username, password, "swarm.", map[string]string{ + "host": hosttag, + }) + } +} diff --git a/swarm/network/depo.go b/swarm/network/depo.go index 17540d2f9..5ffbf8be1 100644 --- a/swarm/network/depo.go +++ b/swarm/network/depo.go @@ -23,9 +23,19 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/storage" ) +//metrics variables +var ( + syncReceiveCount = metrics.NewRegisteredCounter("network.sync.recv.count", nil) + syncReceiveIgnore = metrics.NewRegisteredCounter("network.sync.recv.ignore", nil) + syncSendCount = metrics.NewRegisteredCounter("network.sync.send.count", nil) + syncSendRefused = metrics.NewRegisteredCounter("network.sync.send.refused", nil) + syncSendNotFound = metrics.NewRegisteredCounter("network.sync.send.notfound", nil) +) + // Handler for storage/retrieval related protocol requests // implements the StorageHandler interface used by the bzz protocol type Depo struct { @@ -107,6 +117,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)) // not found in memory cache, ie., a genuine store request // create chunk + syncReceiveCount.Inc(1) chunk = storage.NewChunk(req.Key, nil) case chunk.SData == nil: @@ -116,6 +127,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) { default: // data is found, store request ignored // this should update access count? + syncReceiveIgnore.Inc(1) log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req)) islocal = true //return @@ -172,11 +184,14 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer) SData: chunk.SData, requestTimeout: req.timeout, // } + syncSendCount.Inc(1) p.syncer.addRequest(sreq, DeliverReq) } else { + syncSendRefused.Inc(1) log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())) } } else { + syncSendNotFound.Inc(1) log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())) } } diff --git a/swarm/network/hive.go b/swarm/network/hive.go index 2504a4610..8404ffcc2 100644 --- a/swarm/network/hive.go +++ b/swarm/network/hive.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/netutil" "github.com/ethereum/go-ethereum/swarm/network/kademlia" @@ -39,6 +40,12 @@ import ( // connections and disconnections are reported and relayed // to keep the nodetable uptodate +var ( + peersNumGauge = metrics.NewRegisteredGauge("network.peers.num", nil) + addPeerCounter = metrics.NewRegisteredCounter("network.addpeer.count", nil) + removePeerCounter = metrics.NewRegisteredCounter("network.removepeer.count", nil) +) + type Hive struct { listenAddr func() string callInterval uint64 @@ -192,6 +199,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee func (self *Hive) keepAlive() { alarm := time.NewTicker(time.Duration(self.callInterval)).C for { + peersNumGauge.Update(int64(self.kad.Count())) select { case <-alarm: if self.kad.DBCount() > 0 { @@ -223,6 +231,7 @@ func (self *Hive) Stop() error { // called at the end of a successful protocol handshake func (self *Hive) addPeer(p *peer) error { + addPeerCounter.Inc(1) defer func() { select { case self.more <- true: @@ -247,6 +256,7 @@ func (self *Hive) addPeer(p *peer) error { // called after peer disconnected func (self *Hive) removePeer(p *peer) { + removePeerCounter.Inc(1) log.Debug(fmt.Sprintf("bee %v removed", p)) self.kad.Off(p, saveSync) select { diff --git a/swarm/network/kademlia/kademlia.go b/swarm/network/kademlia/kademlia.go index 0abc42a19..b5999b52d 100644 --- a/swarm/network/kademlia/kademlia.go +++ b/swarm/network/kademlia/kademlia.go @@ -24,6 +24,16 @@ import ( "time" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +//metrics variables +//For metrics, we want to count how many times peers are added/removed +//at a certain index. Thus we do that with an array of counters with +//entry for each index +var ( + bucketAddIndexCount []metrics.Counter + bucketRmIndexCount []metrics.Counter ) const ( @@ -88,12 +98,14 @@ type Node interface { // params is KadParams configuration func New(addr Address, params *KadParams) *Kademlia { buckets := make([][]Node, params.MaxProx+1) - return &Kademlia{ + kad := &Kademlia{ addr: addr, KadParams: params, buckets: buckets, db: newKadDb(addr, params), } + kad.initMetricsVariables() + return kad } // accessor for KAD base address @@ -138,6 +150,7 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error // TODO: give priority to peers with active traffic if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation self.buckets[index] = append(bucket, node) + bucketAddIndexCount[index].Inc(1) log.Debug(fmt.Sprintf("add node %v to table", node)) self.setProxLimit(index, true) record.node = node @@ -178,6 +191,7 @@ func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) { defer self.lock.Unlock() index := self.proximityBin(node.Addr()) + bucketRmIndexCount[index].Inc(1) bucket := self.buckets[index] for i := 0; i < len(bucket); i++ { if node.Addr() == bucket[i].Addr() { @@ -426,3 +440,15 @@ func (self *Kademlia) String() string { rows = append(rows, "=========================================================================") return strings.Join(rows, "\n") } + +//We have to build up the array of counters for each index +func (self *Kademlia) initMetricsVariables() { + //create the arrays + bucketAddIndexCount = make([]metrics.Counter, self.MaxProx+1) + bucketRmIndexCount = make([]metrics.Counter, self.MaxProx+1) + //at each index create a metrics counter + for i := 0; i < (self.KadParams.MaxProx + 1); i++ { + bucketAddIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.add.%d.index", i), nil) + bucketRmIndexCount[i] = metrics.NewRegisteredCounter(fmt.Sprintf("network.kademlia.bucket.rm.%d.index", i), nil) + } +} diff --git a/swarm/network/protocol.go b/swarm/network/protocol.go index a418c1dbb..1cbe00a97 100644 --- a/swarm/network/protocol.go +++ b/swarm/network/protocol.go @@ -39,12 +39,26 @@ import ( "github.com/ethereum/go-ethereum/contracts/chequebook" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap" "github.com/ethereum/go-ethereum/swarm/services/swap/swap" "github.com/ethereum/go-ethereum/swarm/storage" ) +//metrics variables +var ( + storeRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.storerequest.count", nil) + retrieveRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.retrieverequest.count", nil) + peersMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.peers.count", nil) + syncRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.syncrequest.count", nil) + unsyncedKeysMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.unsyncedkeys.count", nil) + deliverRequestMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.deliverrequest.count", nil) + paymentMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.payment.count", nil) + invalidMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.invalid.count", nil) + handleStatusMsgCounter = metrics.NewRegisteredCounter("network.protocol.msg.handlestatus.count", nil) +) + const ( Version = 0 ProtocolLength = uint64(8) @@ -206,6 +220,7 @@ func (self *bzz) handle() error { case storeRequestMsg: // store requests are dispatched to netStore + storeRequestMsgCounter.Inc(1) var req storeRequestMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<- %v: %v", msg, err) @@ -221,6 +236,7 @@ func (self *bzz) handle() error { case retrieveRequestMsg: // retrieve Requests are dispatched to netStore + retrieveRequestMsgCounter.Inc(1) var req retrieveRequestMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<- %v: %v", msg, err) @@ -241,6 +257,7 @@ func (self *bzz) handle() error { case peersMsg: // response to lookups and immediate response to retrieve requests // dispatches new peer data to the hive that adds them to KADDB + peersMsgCounter.Inc(1) var req peersMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<- %v: %v", msg, err) @@ -250,6 +267,7 @@ func (self *bzz) handle() error { self.hive.HandlePeersMsg(&req, &peer{bzz: self}) case syncRequestMsg: + syncRequestMsgCounter.Inc(1) var req syncRequestMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<- %v: %v", msg, err) @@ -260,6 +278,7 @@ func (self *bzz) handle() error { case unsyncedKeysMsg: // coming from parent node offering + unsyncedKeysMsgCounter.Inc(1) var req unsyncedKeysMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<- %v: %v", msg, err) @@ -274,6 +293,7 @@ func (self *bzz) handle() error { case deliveryRequestMsg: // response to syncKeysMsg hashes filtered not existing in db // also relays the last synced state to the source + deliverRequestMsgCounter.Inc(1) var req deliveryRequestMsgData if err := msg.Decode(&req); err != nil { return fmt.Errorf("<-msg %v: %v", msg, err) @@ -287,6 +307,7 @@ func (self *bzz) handle() error { case paymentMsg: // swap protocol message for payment, Units paid for, Cheque paid with + paymentMsgCounter.Inc(1) if self.swapEnabled { var req paymentMsgData if err := msg.Decode(&req); err != nil { @@ -298,6 +319,7 @@ func (self *bzz) handle() error { default: // no other message is allowed + invalidMsgCounter.Inc(1) return fmt.Errorf("invalid message code: %v", msg.Code) } return nil @@ -332,6 +354,8 @@ func (self *bzz) handleStatus() (err error) { return fmt.Errorf("first msg has code %x (!= %x)", msg.Code, statusMsg) } + handleStatusMsgCounter.Inc(1) + if msg.Size > ProtocolMaxMsgSize { return fmt.Errorf("message too long: %v > %v", msg.Size, ProtocolMaxMsgSize) } diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 98cd6e75e..2b397f801 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -23,6 +23,8 @@ import ( "io" "sync" "time" + + "github.com/ethereum/go-ethereum/metrics" ) /* @@ -63,6 +65,11 @@ var ( errOperationTimedOut = errors.New("operation timed out") ) +//metrics variables +var ( + newChunkCounter = metrics.NewRegisteredCounter("storage.chunks.new", nil) +) + type TreeChunker struct { branches int64 hashFunc SwarmHasher @@ -298,6 +305,13 @@ func (self *TreeChunker) hashChunk(hasher SwarmHash, job *hashJob, chunkC chan * job.parentWg.Done() if chunkC != nil { + //NOTE: this increases the chunk count even if the local node already has this chunk; + //on file upload the node will increase this counter even if the same file has already been uploaded + //So it should be evaluated whether it is worth keeping this counter + //and/or actually better track when the chunk is Put to the local database + //(which may question the need for disambiguation when a completely new chunk has been created + //and/or a chunk is being put to the local DB; for chunk tracking it may be worth distinguishing + newChunkCounter.Inc(1) chunkC <- newChunk } } diff --git a/swarm/storage/dbstore.go b/swarm/storage/dbstore.go index 46a5c16cc..421bb061d 100644 --- a/swarm/storage/dbstore.go +++ b/swarm/storage/dbstore.go @@ -33,11 +33,18 @@ import ( "sync" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" ) +//metrics variables +var ( + gcCounter = metrics.NewRegisteredCounter("storage.db.dbstore.gc.count", nil) + dbStoreDeleteCounter = metrics.NewRegisteredCounter("storage.db.dbstore.rm.count", nil) +) + const ( defaultDbCapacity = 5000000 defaultRadius = 0 // not yet used @@ -255,6 +262,7 @@ func (s *DbStore) collectGarbage(ratio float32) { // actual gc for i := 0; i < gcnt; i++ { if s.gcArray[i].value <= cutval { + gcCounter.Inc(1) s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey) } } @@ -383,6 +391,7 @@ func (s *DbStore) delete(idx uint64, idxKey []byte) { batch := new(leveldb.Batch) batch.Delete(idxKey) batch.Delete(getDataKey(idx)) + dbStoreDeleteCounter.Inc(1) s.entryCnt-- batch.Put(keyEntryCnt, U64ToBytes(s.entryCnt)) s.db.Write(batch) diff --git a/swarm/storage/localstore.go b/swarm/storage/localstore.go index b442e6cc5..ece0c8615 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -18,6 +18,13 @@ package storage import ( "encoding/binary" + + "github.com/ethereum/go-ethereum/metrics" +) + +//metrics variables +var ( + dbStorePutCounter = metrics.NewRegisteredCounter("storage.db.dbstore.put.count", nil) ) // LocalStore is a combination of inmemory db over a disk persisted db @@ -39,6 +46,14 @@ func NewLocalStore(hash SwarmHasher, params *StoreParams) (*LocalStore, error) { }, nil } +func (self *LocalStore) CacheCounter() uint64 { + return uint64(self.memStore.(*MemStore).Counter()) +} + +func (self *LocalStore) DbCounter() uint64 { + return self.DbStore.(*DbStore).Counter() +} + // LocalStore is itself a chunk store // unsafe, in that the data is not integrity checked func (self *LocalStore) Put(chunk *Chunk) { @@ -48,6 +63,7 @@ func (self *LocalStore) Put(chunk *Chunk) { chunk.wg.Add(1) } go func() { + dbStorePutCounter.Inc(1) self.DbStore.Put(chunk) if chunk.wg != nil { chunk.wg.Done() diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 3cb25ac62..d6be54220 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -23,6 +23,13 @@ import ( "sync" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +//metrics variables +var ( + memstorePutCounter = metrics.NewRegisteredCounter("storage.db.memstore.put.count", nil) + memstoreRemoveCounter = metrics.NewRegisteredCounter("storage.db.memstore.rm.count", nil) ) const ( @@ -130,6 +137,10 @@ func (s *MemStore) setCapacity(c uint) { s.capacity = c } +func (s *MemStore) Counter() uint { + return s.entryCnt +} + // entry (not its copy) is going to be in MemStore func (s *MemStore) Put(entry *Chunk) { if s.capacity == 0 { @@ -145,6 +156,8 @@ func (s *MemStore) Put(entry *Chunk) { s.accessCnt++ + memstorePutCounter.Inc(1) + node := s.memtree bitpos := uint(0) for node.entry == nil { @@ -289,6 +302,7 @@ func (s *MemStore) removeOldest() { } if node.entry.SData != nil { + memstoreRemoveCounter.Inc(1) node.entry = nil s.entryCnt-- } diff --git a/swarm/swarm.go b/swarm/swarm.go index 3c77d6eab..0a120db1f 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -34,6 +34,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -46,6 +47,16 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage" ) +var ( + startTime time.Time + updateGaugesPeriod = 5 * time.Second + startCounter = metrics.NewRegisteredCounter("stack,start", nil) + stopCounter = metrics.NewRegisteredCounter("stack,stop", nil) + uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil) + dbSizeGauge = metrics.NewRegisteredGauge("storage.db.chunks.size", nil) + cacheSizeGauge = metrics.NewRegisteredGauge("storage.db.cache.size", nil) +) + // the swarm stack type Swarm struct { config *api.Config // swarm configuration @@ -262,6 +273,7 @@ Start is called when the stack is started */ // implements the node.Service interface func (self *Swarm) Start(srv *p2p.Server) error { + startTime = time.Now() connectPeer := func(url string) error { node, err := discover.ParseNode(url) if err != nil { @@ -307,9 +319,28 @@ func (self *Swarm) Start(srv *p2p.Server) error { } } + self.periodicallyUpdateGauges() + + startCounter.Inc(1) return nil } +func (self *Swarm) periodicallyUpdateGauges() { + ticker := time.NewTicker(updateGaugesPeriod) + + go func() { + for range ticker.C { + self.updateGauges() + } + }() +} + +func (self *Swarm) updateGauges() { + dbSizeGauge.Update(int64(self.lstore.DbCounter())) + cacheSizeGauge.Update(int64(self.lstore.CacheCounter())) + uptimeGauge.Update(time.Since(startTime).Nanoseconds()) +} + // implements the node.Service interface // stops all component services. func (self *Swarm) Stop() error { @@ -324,6 +355,7 @@ func (self *Swarm) Stop() error { self.lstore.DbStore.Close() } self.sfs.Stop() + stopCounter.Inc(1) return err } -- cgit