diff options
Diffstat (limited to 'cmd/swarm/swarm-smoke/upload_and_sync.go')
-rw-r--r-- | cmd/swarm/swarm-smoke/upload_and_sync.go | 245 |
1 files changed, 91 insertions, 154 deletions
diff --git a/cmd/swarm/swarm-smoke/upload_and_sync.go b/cmd/swarm/swarm-smoke/upload_and_sync.go index d605f79a3..90230df25 100644 --- a/cmd/swarm/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm/swarm-smoke/upload_and_sync.go @@ -19,91 +19,122 @@ package main import ( "bytes" "context" - "crypto/md5" - crand "crypto/rand" - "errors" "fmt" - "io" "io/ioutil" "math/rand" - "net/http" - "net/http/httptrace" "os" "sync" "time" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/api" - "github.com/ethereum/go-ethereum/swarm/api/client" - "github.com/ethereum/go-ethereum/swarm/spancontext" + "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/testutil" - opentracing "github.com/opentracing/opentracing-go" "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) -func generateEndpoints(scheme string, cluster string, app string, from int, to int) { - if cluster == "prod" { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%v.swarm-gateways.net", scheme, port)) - } - } else { - for port := from; port < to; port++ { - endpoints = append(endpoints, fmt.Sprintf("%s://%s-%v-%s.stg.swarm-gateways.net", scheme, app, port, cluster)) - } - } - - if includeLocalhost { - endpoints = append(endpoints, "http://localhost:8500") - } -} - -func cliUploadAndSync(c *cli.Context) error { - log.PrintOrigins(true) - log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(verbosity), log.StreamHandler(os.Stdout, log.TerminalFormat(true)))) - - metrics.GetOrRegisterCounter("upload-and-sync", nil).Inc(1) +func uploadAndSyncCmd(ctx *cli.Context, tuid string) error { + randomBytes := testutil.RandomBytes(seed, filesize*1000) errc := make(chan error) + go func() { - errc <- uploadAndSync(c) + errc <- uplaodAndSync(ctx, randomBytes, tuid) }() select { case err := <-errc: if err != nil { - metrics.GetOrRegisterCounter("upload-and-sync.fail", nil).Inc(1) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.fail", commandName), nil).Inc(1) } return err case <-time.After(time.Duration(timeout) * time.Second): - metrics.GetOrRegisterCounter("upload-and-sync.timeout", nil).Inc(1) - return fmt.Errorf("timeout after %v sec", timeout) + metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1) + + e := fmt.Errorf("timeout after %v sec", timeout) + // trigger debug functionality on randomBytes + err := trackChunks(randomBytes[:]) + if err != nil { + e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err) + } + + return e } } -func uploadAndSync(c *cli.Context) error { - defer func(now time.Time) { - totalTime := time.Since(now) +func trackChunks(testData []byte) error { + log.Warn("Test timed out; running chunk debug sequence") - log.Info("total time", "time", totalTime, "kb", filesize) - metrics.GetOrRegisterCounter("upload-and-sync.total-time", nil).Inc(int64(totalTime)) - }(time.Now()) + addrs, err := getAllRefs(testData) + if err != nil { + return err + } + log.Trace("All references retrieved") - generateEndpoints(scheme, cluster, appName, from, to) - seed := int(time.Now().UnixNano() / 1e6) - log.Info("uploading to "+endpoints[0]+" and syncing", "seed", seed) + // has-chunks + for _, host := range hosts { + httpHost := fmt.Sprintf("ws://%s:%d", host, 8546) + log.Trace("Calling `Has` on host", "httpHost", httpHost) + rpcClient, err := rpc.Dial(httpHost) + if err != nil { + log.Trace("Error dialing host", "err", err) + return err + } + log.Trace("rpc dial ok") + var hasInfo []api.HasInfo + err = rpcClient.Call(&hasInfo, "bzz_has", addrs) + if err != nil { + log.Trace("Error calling host", "err", err) + return err + } + log.Trace("rpc call ok") + count := 0 + for _, info := range hasInfo { + if !info.Has { + count++ + log.Error("Host does not have chunk", "host", httpHost, "chunk", info.Addr) + } + } + if count == 0 { + log.Info("Host reported to have all chunks", "host", httpHost) + } + } + return nil +} - randomBytes := testutil.RandomBytes(seed, filesize*1000) +func getAllRefs(testData []byte) (storage.AddressCollection, error) { + log.Trace("Getting all references for given root hash") + datadir, err := ioutil.TempDir("", "chunk-debug") + if err != nil { + return nil, fmt.Errorf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(datadir) + fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32)) + if err != nil { + return nil, err + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second) + defer cancel() + + reader := bytes.NewReader(testData) + return fileStore.GetAllReferences(ctx, reader, false) +} + +func uplaodAndSync(c *cli.Context, randomBytes []byte, tuid string) error { + log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed) t1 := time.Now() - hash, err := upload(&randomBytes, endpoints[0]) + hash, err := upload(randomBytes, httpEndpoint(hosts[0])) if err != nil { log.Error(err.Error()) return err } - metrics.GetOrRegisterCounter("upload-and-sync.upload-time", nil).Inc(int64(time.Since(t1))) + t2 := time.Since(t1) + metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2) fhash, err := digest(bytes.NewReader(randomBytes)) if err != nil { @@ -111,147 +142,53 @@ func uploadAndSync(c *cli.Context) error { return err } - log.Info("uploaded successfully", "hash", hash, "digest", fmt.Sprintf("%x", fhash)) + log.Info("uploaded successfully", "tuid", tuid, "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) time.Sleep(time.Duration(syncDelay) * time.Second) wg := sync.WaitGroup{} if single { - rand.Seed(time.Now().UTC().UnixNano()) - randIndex := 1 + rand.Intn(len(endpoints)-1) + randIndex := 1 + rand.Intn(len(hosts)-1) ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { for { start := time.Now() - err := fetch(hash, endpoint, fhash, ruid) - fetchTime := time.Since(start) + err := fetch(hash, endpoint, fhash, ruid, tuid) if err != nil { continue } + ended := time.Since(start) - metrics.GetOrRegisterMeter("upload-and-sync.single.fetch-time", nil).Mark(int64(fetchTime)) + metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended) + log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) wg.Done() return } - }(endpoints[randIndex], ruid) + }(httpEndpoint(hosts[randIndex]), ruid) } else { - for _, endpoint := range endpoints { + for _, endpoint := range hosts[1:] { ruid := uuid.New()[:8] wg.Add(1) go func(endpoint string, ruid string) { for { start := time.Now() - err := fetch(hash, endpoint, fhash, ruid) - fetchTime := time.Since(start) + err := fetch(hash, endpoint, fhash, ruid, tuid) if err != nil { continue } + ended := time.Since(start) - metrics.GetOrRegisterMeter("upload-and-sync.each.fetch-time", nil).Mark(int64(fetchTime)) + metrics.GetOrRegisterResettingTimer("upload-and-sync.each.fetch-time", nil).Update(ended) + log.Info("fetch successful", "tuid", tuid, "ruid", ruid, "took", ended, "endpoint", endpoint) wg.Done() return } - }(endpoint, ruid) + }(httpEndpoint(endpoint), ruid) } } wg.Wait() - log.Info("all endpoints synced random file successfully") + log.Info("all hosts synced random file successfully") return nil } - -// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file -func fetch(hash string, endpoint string, original []byte, ruid string) error { - ctx, sp := spancontext.StartSpan(context.Background(), "upload-and-sync.fetch") - defer sp.Finish() - - log.Trace("sleeping", "ruid", ruid) - time.Sleep(3 * time.Second) - log.Trace("http get request", "ruid", ruid, "api", endpoint, "hash", hash) - - var tn time.Time - reqUri := endpoint + "/bzz:/" + hash + "/" - req, _ := http.NewRequest("GET", reqUri, nil) - - opentracing.GlobalTracer().Inject( - sp.Context(), - opentracing.HTTPHeaders, - opentracing.HTTPHeadersCarrier(req.Header)) - - trace := client.GetClientTrace("upload-and-sync - http get", "upload-and-sync", ruid, &tn) - - req = req.WithContext(httptrace.WithClientTrace(ctx, trace)) - transport := http.DefaultTransport - - //transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - - tn = time.Now() - res, err := transport.RoundTrip(req) - if err != nil { - log.Error(err.Error(), "ruid", ruid) - return err - } - log.Trace("http get response", "ruid", ruid, "api", endpoint, "hash", hash, "code", res.StatusCode, "len", res.ContentLength) - - if res.StatusCode != 200 { - err := fmt.Errorf("expected status code %d, got %v", 200, res.StatusCode) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - defer res.Body.Close() - - rdigest, err := digest(res.Body) - if err != nil { - log.Warn(err.Error(), "ruid", ruid) - return err - } - - if !bytes.Equal(rdigest, original) { - err := fmt.Errorf("downloaded imported file md5=%x is not the same as the generated one=%x", rdigest, original) - log.Warn(err.Error(), "ruid", ruid) - return err - } - - log.Trace("downloaded file matches random file", "ruid", ruid, "len", res.ContentLength) - - return nil -} - -// upload is uploading a file `f` to `endpoint` via the `swarm up` cmd -func upload(dataBytes *[]byte, endpoint string) (string, error) { - swarm := client.NewClient(endpoint) - f := &client.File{ - ReadCloser: ioutil.NopCloser(bytes.NewReader(*dataBytes)), - ManifestEntry: api.ManifestEntry{ - ContentType: "text/plain", - Mode: 0660, - Size: int64(len(*dataBytes)), - }, - } - - // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - return swarm.Upload(f, "", false) -} - -func digest(r io.Reader) ([]byte, error) { - h := md5.New() - _, err := io.Copy(h, r) - if err != nil { - return nil, err - } - return h.Sum(nil), nil -} - -// generates random data in heap buffer -func generateRandomData(datasize int) ([]byte, error) { - b := make([]byte, datasize) - c, err := crand.Read(b) - if err != nil { - return nil, err - } else if c != datasize { - return nil, errors.New("short read") - } - return b, nil -} |