diff options
Diffstat (limited to 'swarm')
38 files changed, 463 insertions, 275 deletions
diff --git a/swarm/api/act.go b/swarm/api/act.go index 52d909827..e54369f9a 100644 --- a/swarm/api/act.go +++ b/swarm/api/act.go @@ -458,6 +458,9 @@ func DoACT(ctx *cli.Context, privateKey *ecdsa.PrivateKey, salt []byte, grantees return nil, nil, nil, err } sessionKey, err := NewSessionKeyPK(privateKey, granteePub, salt) + if err != nil { + return nil, nil, nil, err + } hasher := sha3.NewKeccak256() hasher.Write(append(sessionKey, 0)) diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go index 03c6cbb28..c30d69911 100644 --- a/swarm/api/client/client_test.go +++ b/swarm/api/client/client_test.go @@ -457,6 +457,9 @@ func TestClientCreateUpdateFeed(t *testing.T) { } feedManifestHash, err := client.CreateFeedWithManifest(createRequest) + if err != nil { + t.Fatal(err) + } correctManifestAddrHex := "0e9b645ebc3da167b1d56399adc3276f7a08229301b72a03336be0e7d4b71882" if feedManifestHash != correctManifestAddrHex { diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go index 43695efc1..266ef71be 100644 --- a/swarm/api/filesystem.go +++ b/swarm/api/filesystem.go @@ -122,6 +122,10 @@ func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error var wait func(context.Context) error ctx := context.TODO() hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt) + if err != nil { + errors[i] = err + return + } if hash != nil { list[i].Hash = hash.Hex() } diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go index 3b2dcc7d5..ccc040c54 100644 --- a/swarm/api/http/middleware.go +++ b/swarm/api/http/middleware.go @@ -50,7 +50,7 @@ func ParseURI(h http.Handler) http.Handler { uri, err := api.Parse(strings.TrimLeft(r.URL.Path, "/")) if err != nil { w.WriteHeader(http.StatusBadRequest) - RespondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest) + respondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest) return } if uri.Addr != "" && strings.HasPrefix(uri.Addr, "0x") { diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go index c9fb9d285..d4e81d7f6 100644 --- a/swarm/api/http/response.go +++ b/swarm/api/http/response.go @@ -53,23 +53,23 @@ func ShowMultipleChoices(w http.ResponseWriter, r *http.Request, list api.Manife log.Debug("ShowMultipleChoices", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) msg := "" if list.Entries == nil { - RespondError(w, r, "Could not resolve", http.StatusInternalServerError) + respondError(w, r, "Could not resolve", http.StatusInternalServerError) return } requestUri := strings.TrimPrefix(r.RequestURI, "/") uri, err := api.Parse(requestUri) if err != nil { - RespondError(w, r, "Bad Request", http.StatusBadRequest) + respondError(w, r, "Bad Request", http.StatusBadRequest) } uri.Scheme = "bzz-list" msg += fmt.Sprintf("Disambiguation:<br/>Your request may refer to multiple choices.<br/>Click <a class=\"orange\" href='"+"/"+uri.String()+"'>here</a> if your browser does not redirect you within 5 seconds.<script>setTimeout(\"location.href='%s';\",5000);</script><br/>", "/"+uri.String()) - RespondTemplate(w, r, "error", msg, http.StatusMultipleChoices) + respondTemplate(w, r, "error", msg, http.StatusMultipleChoices) } -func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) { - log.Debug("RespondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) +func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) { + log.Debug("respondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context())) respond(w, r, &ResponseParams{ Code: code, Msg: template.HTML(msg), @@ -78,13 +78,12 @@ func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s }) } -func RespondError(w http.ResponseWriter, r *http.Request, msg string, code int) { - log.Debug("RespondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) - RespondTemplate(w, r, "error", msg, code) +func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) { + log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code) + respondTemplate(w, r, "error", msg, code) } func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { - w.WriteHeader(params.Code) if params.Code >= 400 { @@ -96,7 +95,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { // this cannot be in a switch since an Accept header can have multiple values: "Accept: */*, text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8" if strings.Contains(acceptHeader, "application/json") { if err := respondJSON(w, r, params); err != nil { - RespondError(w, r, "Internal server error", http.StatusInternalServerError) + respondError(w, r, "Internal server error", http.StatusInternalServerError) } } else if strings.Contains(acceptHeader, "text/html") { respondHTML(w, r, params) @@ -107,7 +106,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) { func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams) { htmlCounter.Inc(1) - log.Debug("respondHTML", "ruid", GetRUID(r.Context())) + log.Info("respondHTML", "ruid", GetRUID(r.Context()), "code", params.Code) err := params.template.Execute(w, params) if err != nil { log.Error(err.Error()) @@ -116,14 +115,14 @@ func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams) func respondJSON(w http.ResponseWriter, r *http.Request, params *ResponseParams) error { jsonCounter.Inc(1) - log.Debug("respondJSON", "ruid", GetRUID(r.Context())) + log.Info("respondJSON", "ruid", GetRUID(r.Context()), "code", params.Code) w.Header().Set("Content-Type", "application/json") return json.NewEncoder(w).Encode(params) } func respondPlaintext(w http.ResponseWriter, r *http.Request, params *ResponseParams) error { plaintextCounter.Inc(1) - log.Debug("respondPlaintext", "ruid", GetRUID(r.Context())) + log.Info("respondPlaintext", "ruid", GetRUID(r.Context()), "code", params.Code) w.Header().Set("Content-Type", "text/plain") strToWrite := "Code: " + fmt.Sprintf("%d", params.Code) + "\n" strToWrite += "Message: " + string(params.Msg) + "\n" diff --git a/swarm/api/http/sctx.go b/swarm/api/http/sctx.go index 431e11735..b8dafab0b 100644 --- a/swarm/api/http/sctx.go +++ b/swarm/api/http/sctx.go @@ -7,14 +7,10 @@ import ( "github.com/ethereum/go-ethereum/swarm/sctx" ) -type contextKey int - -const ( - uriKey contextKey = iota -) +type uriKey struct{} func GetRUID(ctx context.Context) string { - v, ok := ctx.Value(sctx.HTTPRequestIDKey).(string) + v, ok := ctx.Value(sctx.HTTPRequestIDKey{}).(string) if ok { return v } @@ -22,11 +18,11 @@ func GetRUID(ctx context.Context) string { } func SetRUID(ctx context.Context, ruid string) context.Context { - return context.WithValue(ctx, sctx.HTTPRequestIDKey, ruid) + return context.WithValue(ctx, sctx.HTTPRequestIDKey{}, ruid) } func GetURI(ctx context.Context) *api.URI { - v, ok := ctx.Value(uriKey).(*api.URI) + v, ok := ctx.Value(uriKey{}).(*api.URI) if ok { return v } @@ -34,5 +30,5 @@ func GetURI(ctx context.Context) *api.URI { } func SetURI(ctx context.Context, uri *api.URI) context.Context { - return context.WithValue(ctx, uriKey, uri) + return context.WithValue(ctx, uriKey{}, uri) } diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index b4294b058..3c6735a73 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -41,16 +41,9 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" - "github.com/rs/cors" ) -type resourceResponse struct { - Manifest storage.Address `json:"manifest"` - Resource string `json:"resource"` - Update storage.Address `json:"update"` -} - var ( postRawCount = metrics.NewRegisteredCounter("api.http.post.raw.count", nil) postRawFail = metrics.NewRegisteredCounter("api.http.post.raw.fail", nil) @@ -191,10 +184,10 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { if err != nil { if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", uri.Address().String())) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError) return } defer reader.Close() @@ -218,7 +211,7 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) { func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) { switch r.RequestURI { case "/": - RespondTemplate(w, r, "landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200) + respondTemplate(w, r, "landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200) return case "/robots.txt": w.Header().Set("Last-Modified", time.Now().Format(http.TimeFormat)) @@ -227,7 +220,7 @@ func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write(faviconBytes) default: - RespondError(w, r, "Not Found", http.StatusNotFound) + respondError(w, r, "Not Found", http.StatusNotFound) } } @@ -247,26 +240,26 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { if uri.Path != "" { postRawFail.Inc(1) - RespondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest) + respondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest) return } if uri.Addr != "" && uri.Addr != "encrypt" { postRawFail.Inc(1) - RespondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest) + respondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest) return } if r.Header.Get("Content-Length") == "" { postRawFail.Inc(1) - RespondError(w, r, "missing Content-Length header in request", http.StatusBadRequest) + respondError(w, r, "missing Content-Length header in request", http.StatusBadRequest) return } addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -290,7 +283,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusBadRequest) + respondError(w, r, err.Error(), http.StatusBadRequest) return } @@ -305,7 +298,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { addr, err = s.api.Resolve(r.Context(), uri.Addr) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError) return } log.Debug("resolved key", "ruid", ruid, "key", addr) @@ -313,7 +306,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { addr, err = s.api.NewManifest(r.Context(), toEncrypt) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } log.Debug("new manifest", "ruid", ruid, "key", addr) @@ -324,7 +317,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { case "application/x-tar": _, err := s.handleTarUpload(r, mw) if err != nil { - RespondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError) return err } return nil @@ -337,7 +330,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { }) if err != nil { postFilesFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError) return } @@ -373,7 +366,7 @@ func (s *Server) handleMultipartUpload(r *http.Request, boundary string, mw *api } var size int64 - var reader io.Reader = part + var reader io.Reader if contentLength := part.Header.Get("Content-Length"); contentLength != "" { size, err = strconv.ParseInt(contentLength, 10, 64) if err != nil { @@ -446,7 +439,7 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *http.Request) { newKey, err := s.api.Delete(r.Context(), uri.Addr, uri.Path) if err != nil { deleteFail.Inc(1) - RespondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError) return } @@ -467,7 +460,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { // Creation and update must send feed.updateRequestJSON JSON structure body, err := ioutil.ReadAll(r.Body) if err != nil { - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -478,7 +471,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI { httpStatus = http.StatusNotFound } - RespondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus) + respondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus) return } @@ -487,32 +480,32 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() if err := updateRequest.FromValues(query, body); err != nil { // decodes request from query parameters - RespondError(w, r, err.Error(), http.StatusBadRequest) + respondError(w, r, err.Error(), http.StatusBadRequest) return } - if updateRequest.IsUpdate() { + switch { + case updateRequest.IsUpdate(): // Verify that the signature is intact and that the signer is authorized // to update this feed // Check this early, to avoid creating a feed and then not being able to set its first update. if err = updateRequest.Verify(); err != nil { - RespondError(w, r, err.Error(), http.StatusForbidden) + respondError(w, r, err.Error(), http.StatusForbidden) return } _, err = s.api.FeedsUpdate(r.Context(), &updateRequest) if err != nil { - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } - } - - if query.Get("manifest") == "1" { + fallthrough + case query.Get("manifest") == "1": // we create a manifest so we can retrieve feed updates with bzz:// later // this manifest has a special "feed type" manifest, and saves the // feed identification used to retrieve feed updates later m, err := s.api.NewFeedManifest(r.Context(), &updateRequest.Feed) if err != nil { - RespondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError) return } // the key to the manifest will be passed back to the client @@ -520,12 +513,14 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) { // the manifest key can be set as content in the resolver of the ENS name outdata, err := json.Marshal(m) if err != nil { - RespondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError) return } fmt.Fprint(w, string(outdata)) w.Header().Add("Content-type", "application/json") + default: + respondError(w, r, "Missing signature in feed update request", http.StatusBadRequest) } } @@ -557,7 +552,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI { httpStatus = http.StatusNotFound } - RespondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus) + respondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus) return } @@ -566,12 +561,12 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { unsignedUpdateRequest, err := s.api.FeedsNewRequest(r.Context(), fd) if err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound) return } rawResponse, err := unsignedUpdateRequest.MarshalJSON() if err != nil { - RespondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError) + respondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError) return } w.Header().Add("Content-type", "application/json") @@ -582,7 +577,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { lookupParams := &feed.Query{Feed: *fd} if err = lookupParams.FromValues(r.URL.Query()); err != nil { // parse period, version - RespondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest) + respondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest) return } @@ -591,7 +586,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) { // any error from the switch statement will end up here if err != nil { code, err2 := s.translateFeedError(w, r, "feed lookup fail", err) - RespondError(w, r, err2.Error(), code) + respondError(w, r, err2.Error(), code) return } @@ -637,7 +632,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) { addr, err := s.api.ResolveURI(r.Context(), uri, pass) if err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } w.Header().Set("Cache-Control", "max-age=2147483648, immutable") // url was of type bzz://<hex key>/path, so we are sure it is immutable. @@ -661,7 +656,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) { reader, isEncrypted := s.api.Retrieve(r.Context(), addr) if _, err := reader.Size(r.Context(), nil); err != nil { getFail.Inc(1) - RespondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound) return } @@ -701,7 +696,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { addr, err := s.api.Resolve(r.Context(), uri.Addr) if err != nil { getListFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } log.Debug("handle.get.list: resolved", "ruid", ruid, "key", addr) @@ -711,10 +706,10 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) { getListFail.Inc(1) if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", addr.String())) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -762,7 +757,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { manifestAddr, err = s.api.Resolve(r.Context(), uri.Addr) if err != nil { getFileFail.Inc(1) - RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound) return } } else { @@ -786,17 +781,17 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { if err != nil { if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr)) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } switch status { case http.StatusNotFound: getFileNotFound.Inc(1) - RespondError(w, r, err.Error(), http.StatusNotFound) + respondError(w, r, err.Error(), http.StatusNotFound) default: getFileFail.Inc(1) - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) } return } @@ -809,10 +804,10 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { getFileFail.Inc(1) if isDecryptError(err) { w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr)) - RespondError(w, r, err.Error(), http.StatusUnauthorized) + respondError(w, r, err.Error(), http.StatusUnauthorized) return } - RespondError(w, r, err.Error(), http.StatusInternalServerError) + respondError(w, r, err.Error(), http.StatusInternalServerError) return } @@ -825,7 +820,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { // check the root chunk exists by retrieving the file's size if _, err := reader.Size(r.Context(), nil); err != nil { getFileNotFound.Inc(1) - RespondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound) + respondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound) return } diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go index 1cf7ff577..04d0e045a 100644 --- a/swarm/api/http/server_test.go +++ b/swarm/api/http/server_test.go @@ -263,7 +263,7 @@ func TestBzzFeed(t *testing.T) { if resp.StatusCode == http.StatusOK { t.Fatal("Expected error status since feed update does not contain multihash. Received 200 OK") } - b, err = ioutil.ReadAll(resp.Body) + _, err = ioutil.ReadAll(resp.Body) if err != nil { t.Fatal(err) } @@ -333,15 +333,45 @@ func TestBzzFeed(t *testing.T) { } urlQuery = testUrl.Query() body = updateRequest.AppendValues(urlQuery) // this adds all query parameters + goodQueryParameters := urlQuery.Encode() // save the query parameters for a second attempt + + // create bad query parameters in which the signature is missing + urlQuery.Del("signature") testUrl.RawQuery = urlQuery.Encode() + // 1st attempt with bad query parameters in which the signature is missing resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) if err != nil { t.Fatal(err) } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - t.Fatalf("Update returned %s", resp.Status) + expectedCode := http.StatusBadRequest + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode) + } + + // 2nd attempt with bad query parameters in which the signature is of incorrect length + urlQuery.Set("signature", "0xabcd") // should be 130 hex chars + resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + expectedCode = http.StatusBadRequest + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode) + } + + // 3rd attempt, with good query parameters: + testUrl.RawQuery = goodQueryParameters + resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + expectedCode = http.StatusOK + if resp.StatusCode != expectedCode { + t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode) } // get latest update through bzz-feed directly @@ -461,6 +491,9 @@ func testBzzGetPath(encrypted bool, t *testing.T) { } defer resp.Body.Close() respbody, err = ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error while reading response body: %v", err) + } if string(respbody) != testmanifest[v] { isexpectedfailrequest := false diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go index 7c4cc88e4..890ed88bd 100644 --- a/swarm/api/manifest.go +++ b/swarm/api/manifest.go @@ -557,7 +557,6 @@ func (mt *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manif if path != entry.Path { return nil, 0 } - pos = epl } } return nil, 0 diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go index 059c3dc96..56adc5a8e 100644 --- a/swarm/network/hive_test.go +++ b/swarm/network/hive_test.go @@ -70,6 +70,9 @@ func TestHiveStatePersistance(t *testing.T) { defer os.RemoveAll(dir) store, err := state.NewDBStore(dir) //start the hive with an empty dbstore + if err != nil { + t.Fatal(err) + } params := NewHiveParams() s, pp := newHiveTester(t, params, 5, store) @@ -90,6 +93,9 @@ func TestHiveStatePersistance(t *testing.T) { store.Close() persistedStore, err := state.NewDBStore(dir) //start the hive with an empty dbstore + if err != nil { + t.Fatal(err) + } s1, pp := newHiveTester(t, params, 1, persistedStore) diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go index 55a0c6f13..cd94741be 100644 --- a/swarm/network/kademlia.go +++ b/swarm/network/kademlia.go @@ -261,7 +261,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) { // found among live peers, do nothing return v }) - if ins { + if ins && !p.BzzPeer.LightNode { a := newEntry(p.BzzAddr) a.conn = p // insert new online peer into addrs @@ -329,14 +329,18 @@ func (k *Kademlia) Off(p *Peer) { k.lock.Lock() defer k.lock.Unlock() var del bool - k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { - // v cannot be nil, must check otherwise we overwrite entry - if v == nil { - panic(fmt.Sprintf("connected peer not found %v", p)) - } + if !p.BzzPeer.LightNode { + k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val { + // v cannot be nil, must check otherwise we overwrite entry + if v == nil { + panic(fmt.Sprintf("connected peer not found %v", p)) + } + del = true + return newEntry(p.BzzAddr) + }) + } else { del = true - return newEntry(p.BzzAddr) - }) + } if del { k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(_ pot.Val) pot.Val { diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go index 903c8dbda..d2e051f45 100644 --- a/swarm/network/kademlia_test.go +++ b/swarm/network/kademlia_test.go @@ -46,19 +46,19 @@ func newTestKademlia(b string) *Kademlia { return NewKademlia(base, params) } -func newTestKadPeer(k *Kademlia, s string) *Peer { - return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k) +func newTestKadPeer(k *Kademlia, s string, lightNode bool) *Peer { + return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, k) } func On(k *Kademlia, ons ...string) { for _, s := range ons { - k.On(newTestKadPeer(k, s)) + k.On(newTestKadPeer(k, s, false)) } } func Off(k *Kademlia, offs ...string) { for _, s := range offs { - k.Off(newTestKadPeer(k, s)) + k.Off(newTestKadPeer(k, s, false)) } } @@ -254,6 +254,56 @@ func TestSuggestPeerFindPeers(t *testing.T) { } +// a node should stay in the address book if it's removed from the kademlia +func TestOffEffectingAddressBookNormalNode(t *testing.T) { + k := newTestKademlia("00000000") + // peer added to kademlia + k.On(newTestKadPeer(k, "01000000", false)) + // peer should be in the address book + if k.addrs.Size() != 1 { + t.Fatal("known peer addresses should contain 1 entry") + } + // peer should be among live connections + if k.conns.Size() != 1 { + t.Fatal("live peers should contain 1 entry") + } + // remove peer from kademlia + k.Off(newTestKadPeer(k, "01000000", false)) + // peer should be in the address book + if k.addrs.Size() != 1 { + t.Fatal("known peer addresses should contain 1 entry") + } + // peer should not be among live connections + if k.conns.Size() != 0 { + t.Fatal("live peers should contain 0 entry") + } +} + +// a light node should not be in the address book +func TestOffEffectingAddressBookLightNode(t *testing.T) { + k := newTestKademlia("00000000") + // light node peer added to kademlia + k.On(newTestKadPeer(k, "01000000", true)) + // peer should not be in the address book + if k.addrs.Size() != 0 { + t.Fatal("known peer addresses should contain 0 entry") + } + // peer should be among live connections + if k.conns.Size() != 1 { + t.Fatal("live peers should contain 1 entry") + } + // remove peer from kademlia + k.Off(newTestKadPeer(k, "01000000", true)) + // peer should not be in the address book + if k.addrs.Size() != 0 { + t.Fatal("known peer addresses should contain 0 entry") + } + // peer should not be among live connections + if k.conns.Size() != 0 { + t.Fatal("live peers should contain 0 entry") + } +} + func TestSuggestPeerRetries(t *testing.T) { k := newTestKademlia("00000000") k.RetryInterval = int64(300 * time.Millisecond) // cycle diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go index 4b83c7a27..f0d266628 100644 --- a/swarm/network/protocol_test.go +++ b/swarm/network/protocol_test.go @@ -50,10 +50,6 @@ type testStore struct { values map[string][]byte } -func newTestStore() *testStore { - return &testStore{values: make(map[string][]byte)} -} - func (t *testStore) Load(key string) ([]byte, error) { t.Lock() defer t.Unlock() @@ -157,17 +153,7 @@ func newBzzHandshakeTester(t *testing.T, n int, addr *BzzAddr, lightNode bool) * // should test handshakes in one exchange? parallelisation func (s *bzzTester) testHandshake(lhs, rhs *HandshakeMsg, disconnects ...*p2ptest.Disconnect) error { - var peers []enode.ID - id := rhs.Addr.ID() - if len(disconnects) > 0 { - for _, d := range disconnects { - peers = append(peers, d.Peer) - } - } else { - peers = []enode.ID{id} - } - - if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, id)...); err != nil { + if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, rhs.Addr.ID())...); err != nil { return err } diff --git a/swarm/network/simulation/bucket_test.go b/swarm/network/simulation/bucket_test.go index 461d99825..69df19bfe 100644 --- a/swarm/network/simulation/bucket_test.go +++ b/swarm/network/simulation/bucket_test.go @@ -94,7 +94,7 @@ func TestServiceBucket(t *testing.T) { t.Fatalf("expected %q, got %q", customValue, s) } - v, ok = sim.NodeItem(id2, customKey) + _, ok = sim.NodeItem(id2, customKey) if ok { t.Fatal("bucket item should not be found") } @@ -119,7 +119,7 @@ func TestServiceBucket(t *testing.T) { t.Fatalf("expected %q, got %q", testValue+id1.String(), s) } - v, ok = items[id2] + _, ok = items[id2] if ok { t.Errorf("node 2 item should not be found") } diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 9092ffe3e..0109fbdef 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -245,7 +245,10 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) ( } else { d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool { id := p.ID() - // TODO: skip light nodes that do not accept retrieve requests + if p.LightNode { + // skip light nodes + return true + } if req.SkipPeer(id.String()) { log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) return true diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index 949645558..c77682e0e 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -29,17 +29,25 @@ import ( "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/protocols" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/network" + pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/state" "github.com/ethereum/go-ethereum/swarm/storage" ) +//Tests initializing a retrieve request func TestStreamerRetrieveRequest(t *testing.T) { - tester, streamer, _, teardown, err := newStreamerTester(t, nil) + regOpts := &RegistryOptions{ + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, + } + tester, streamer, _, teardown, err := newStreamerTester(t, regOpts) defer teardown() if err != nil { t.Fatal(err) @@ -55,10 +63,21 @@ func TestStreamerRetrieveRequest(t *testing.T) { ) streamer.delivery.RequestFromPeers(ctx, req) + stream := NewStream(swarmChunkServerStreamName, "", true) + err = tester.TestExchanges(p2ptest.Exchange{ Label: "RetrieveRequestMsg", Expects: []p2ptest.Expect{ - { + { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly` + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + { //expect a retrieve request message for the given hash Code: 5, Msg: &RetrieveRequestMsg{ Addr: hash0[:], @@ -74,9 +93,12 @@ func TestStreamerRetrieveRequest(t *testing.T) { } } +//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) +//Should time out as the peer does not have the chunk (no syncing happened previously) func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, //do no syncing }) defer teardown() if err != nil { @@ -89,16 +111,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { peer := streamer.getPeer(node.ID()) + stream := NewStream(swarmChunkServerStreamName, "", true) + //simulate pre-subscription to RETRIEVE_REQUEST stream on peer peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{ - Stream: NewStream(swarmChunkServerStreamName, "", true), + Stream: stream, History: nil, Priority: Top, }) + //test the exchange err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { //first expect a subscription to the RETRIEVE_REQUEST stream + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ - { + { //then the actual RETRIEVE_REQUEST.... Code: 5, Msg: &RetrieveRequestMsg{ Addr: chunk.Address()[:], @@ -107,7 +144,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }, Expects: []p2ptest.Expect{ - { + { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: nil, @@ -120,7 +157,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { }, }) - expectedError := `exchange #0 "RetrieveRequestMsg": timed out` + //should fail with a timeout as the peer we are requesting + //the chunk from does not have the chunk + expectedError := `exchange #1 "RetrieveRequestMsg": timed out` if err == nil || err.Error() != expectedError { t.Fatalf("Expected error %v, got %v", expectedError, err) } @@ -130,7 +169,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { // offered hashes or delivery if skipHash is set to true func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -138,6 +178,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } node := tester.Nodes[0] + peer := streamer.getPeer(node.ID()) stream := NewStream(swarmChunkServerStreamName, "", true) @@ -156,6 +197,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } err = tester.TestExchanges(p2ptest.Exchange{ + Expects: []p2ptest.Expect{ + { + Code: 4, + Msg: &SubscribeMsg{ + Stream: stream, + History: nil, + Priority: Top, + }, + Peer: node.ID(), + }, + }, + }, p2ptest.Exchange{ Label: "RetrieveRequestMsg", Triggers: []p2ptest.Trigger{ { @@ -224,9 +277,90 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { } } +// if there is one peer in the Kademlia, RequestFromPeers should return it +func TestRequestFromPeers(t *testing.T) { + dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + addr := network.RandomAddr() + to := network.NewKademlia(addr.OAddr, network.NewKadParams()) + delivery := NewDelivery(to, nil) + protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) + peer := network.NewPeer(&network.BzzPeer{ + BzzAddr: network.RandomAddr(), + LightNode: false, + Peer: protocolsPeer, + }, to) + to.On(peer) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + + // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished + sp := &Peer{ + Peer: protocolsPeer, + pq: pq.New(int(PriorityQueue), PriorityQueueCap), + streamer: r, + } + r.setPeer(sp) + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + ctx := context.Background() + id, _, err := delivery.RequestFromPeers(ctx, req) + + if err != nil { + t.Fatal(err) + } + if *id != dummyPeerID { + t.Fatalf("Expected an id, got %v", id) + } +} + +// RequestFromPeers should not return light nodes +func TestRequestFromPeersWithLightNode(t *testing.T) { + dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + addr := network.RandomAddr() + to := network.NewKademlia(addr.OAddr, network.NewKadParams()) + delivery := NewDelivery(to, nil) + + protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil) + // setting up a lightnode + peer := network.NewPeer(&network.BzzPeer{ + BzzAddr: network.RandomAddr(), + LightNode: true, + Peer: protocolsPeer, + }, to) + to.On(peer) + r := NewRegistry(addr.ID(), delivery, nil, nil, nil) + // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished + sp := &Peer{ + Peer: protocolsPeer, + pq: pq.New(int(PriorityQueue), PriorityQueueCap), + streamer: r, + } + r.setPeer(sp) + + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + + ctx := context.Background() + // making a request which should return with "no peer found" + _, _, err := delivery.RequestFromPeers(ctx, req) + + expectedError := "no peer found" + if err.Error() != expectedError { + t.Fatalf("expected '%v', got %v", expectedError, err) + } +} + func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, }) defer teardown() if err != nil { @@ -241,6 +375,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { node := tester.Nodes[0] + //subscribe to custom stream stream := NewStream("foo", "", true) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { @@ -253,7 +388,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ - { + { //first expect subscription to the custom stream... Code: 4, Msg: &SubscribeMsg{ Stream: stream, @@ -267,7 +402,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { p2ptest.Exchange{ Label: "ChunkDelivery message", Triggers: []p2ptest.Trigger{ - { + { //...then trigger a chunk delivery for the given chunk from peer in order for + //local node to get the chunk delivered Code: 6, Msg: &ChunkDeliveryMsg{ Addr: chunkKey, @@ -342,8 +478,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - SkipCheck: skipCheck, - DoServeRetrieve: true, + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + Retrieval: RetrievalEnabled, }) bucket.Store(bucketKeyRegistry, r) @@ -408,20 +545,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck return err } - //each of the nodes (except pivot node) subscribes to the stream of the next node - for j, node := range nodeIDs[0 : nodes-1] { - sid := nodeIDs[j+1] - item, ok := sim.NodeItem(node, bucketKeyRegistry) - if !ok { - return fmt.Errorf("No registry") - } - registry := item.(*Registry) - err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top) - if err != nil { - return err - } - } - //get the pivot node's filestore item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore) if !ok { @@ -530,7 +653,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: skipCheck, - DoSync: true, + Syncing: SyncingDisabled, + Retrieval: RetrievalDisabled, SyncUpdateDelay: 0, }) diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 3164193b3..0c95fabb7 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, SkipCheck: skipCheck, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go index 0d3bc7f54..65cde2411 100644 --- a/swarm/network/stream/lightnode_test.go +++ b/swarm/network/stream/lightnode_test.go @@ -25,7 +25,8 @@ import ( // when it is serving Retrieve requests. func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: true, + Retrieval: RetrievalClientOnly, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) { // requests are disabled func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { registryOptions := &RegistryOptions{ - DoServeRetrieve: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) { // when syncing is enabled. func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() @@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) { // when syncing is disabled. func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) { registryOptions := &RegistryOptions{ - DoSync: false, + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, } tester, _, _, teardown, err := newStreamerTester(t, registryOptions) defer teardown() diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go index b81cfc0ca..ad1519341 100644 --- a/swarm/network/stream/snapshot_retrieval_test.go +++ b/swarm/network/stream/snapshot_retrieval_test.go @@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, + Retrieval: RetrievalEnabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, - DoRetrieve: true, - DoServeRetrieve: true, }) fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams()) diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go index 8d89f28cb..2ddbed936 100644 --- a/swarm/network/stream/snapshot_sync_test.go +++ b/swarm/network/stream/snapshot_sync_test.go @@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoSync: true, - DoServeRetrieve: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SyncUpdateDelay: 3 * time.Second, }) @@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ - DoServeRetrieve: true, - DoSync: true, + Retrieval: RetrievalDisabled, + Syncing: SyncingRegisterOnly, }) bucket.Store(bucketKeyRegistry, r) diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index 0ac374def..695ff0c50 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -47,6 +47,31 @@ const ( HashSize = 32 ) +//Enumerate options for syncing and retrieval +type SyncingOption int +type RetrievalOption int + +//Syncing options +const ( + //Syncing disabled + SyncingDisabled SyncingOption = iota + //Register the client and the server but not subscribe + SyncingRegisterOnly + //Both client and server funcs are registered, subscribe sent automatically + SyncingAutoSubscribe +) + +const ( + //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) + RetrievalDisabled RetrievalOption = iota + //Only the client side of the retrieve request is registered. + //(light nodes do not serve retrieve requests) + //once the client is registered, subscription to retrieve request stream is always sent + RetrievalClientOnly + //Both client and server funcs are registered, subscribe sent automatically + RetrievalEnabled +) + // Registry registry for outgoing and incoming streamer constructors type Registry struct { addr enode.ID @@ -60,16 +85,15 @@ type Registry struct { peers map[enode.ID]*Peer delivery *Delivery intervalsStore state.Store - doRetrieve bool + autoRetrieval bool //automatically subscribe to retrieve request stream maxPeerServers int } // RegistryOptions holds optional values for NewRegistry constructor. type RegistryOptions struct { SkipCheck bool - DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags. - DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true. - DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag. + Syncing SyncingOption //Defines syncing behavior + Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry } @@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy if options.SyncUpdateDelay <= 0 { options.SyncUpdateDelay = 15 * time.Second } + //check if retriaval has been disabled + retrieval := options.Retrieval != RetrievalDisabled + streamer := &Registry{ addr: localID, skipCheck: options.SkipCheck, @@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy peers: make(map[enode.ID]*Peer), delivery: delivery, intervalsStore: intervalsStore, - doRetrieve: options.DoRetrieve, + autoRetrieval: retrieval, maxPeerServers: options.MaxPeerServers, } streamer.api = NewAPI(streamer) delivery.getPeer = streamer.getPeer - if options.DoServeRetrieve { + //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only) + if options.Retrieval == RetrievalEnabled { streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) { if !live { return nil, errors.New("only live retrieval requests supported") @@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy }) } - streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { - return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) - }) + //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests) + if options.Retrieval != RetrievalDisabled { + streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) { + return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live)) + }) + } - if options.DoSync { + //If syncing is not disabled, the syncing functions are registered (both client and server) + if options.Syncing != SyncingDisabled { RegisterSwarmSyncerServer(streamer, syncChunkStore) RegisterSwarmSyncerClient(streamer, syncChunkStore) } - if options.DoSync { + //if syncing is set to automatically subscribe to the syncing stream, start the subscription process + if options.Syncing == SyncingAutoSubscribe { // latestIntC function ensures that // - receiving from the in chan is not blocked by processing inside the for loop // - the latest int value is delivered to the loop after the processing is done @@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error { defer close(sp.quit) defer sp.close() - if r.doRetrieve { + if r.autoRetrieval && !p.LightNode { err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top) if err != nil { return err diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index e7f79e7a1..16c74d3b3 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingDisabled, MaxPeerServers: maxPeerServers, }) defer teardown() diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go index f2be3bef9..b0e35b0db 100644 --- a/swarm/network/stream/syncer_test.go +++ b/swarm/network/stream/syncer_test.go @@ -62,6 +62,9 @@ func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network params.Init(datadir) params.BaseKey = addr.Over() lstore, err = storage.NewLocalStore(params, mockStore) + if err != nil { + return nil, "", err + } return lstore, datadir, nil } @@ -114,6 +117,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck bucket.Store(bucketKeyDelivery, delivery) r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + Retrieval: RetrievalDisabled, + Syncing: SyncingAutoSubscribe, SkipCheck: skipCheck, }) diff --git a/swarm/pot/address.go b/swarm/pot/address.go index 3974ebcaa..728dac14e 100644 --- a/swarm/pot/address.go +++ b/swarm/pot/address.go @@ -79,46 +79,6 @@ func (a Address) Bytes() []byte { return a[:] } -/* -Proximity(x, y) returns the proximity order of the MSB distance between x and y - -The distance metric MSB(x, y) of two equal length byte sequences x an y is the -value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. -the binary cast is big endian: most significant bit first (=MSB). - -Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. -It is defined as the reverse rank of the integer part of the base 2 -logarithm of the distance. -It is calculated by counting the number of common leading zeros in the (MSB) -binary representation of the x^y. - -(0 farthest, 255 closest, 256 self) -*/ -func proximity(one, other Address) (ret int, eq bool) { - return posProximity(one, other, 0) -} - -// posProximity(a, b, pos) returns proximity order of b wrt a (symmetric) pretending -// the first pos bits match, checking only bits index >= pos -func posProximity(one, other Address, pos int) (ret int, eq bool) { - for i := pos / 8; i < len(one); i++ { - if one[i] == other[i] { - continue - } - oxo := one[i] ^ other[i] - start := 0 - if i == pos/8 { - start = pos % 8 - } - for j := start; j < 8; j++ { - if (oxo>>uint8(7-j))&0x01 != 0 { - return i*8 + j, false - } - } - } - return len(one) * 8, true -} - // ProxCmp compares the distances a->target and b->target. // Returns -1 if a is closer to target, 1 if b is closer to target // and 0 if they are equal. diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go index 48edc6cce..8f2f0e805 100644 --- a/swarm/pss/client/client_test.go +++ b/swarm/pss/client/client_test.go @@ -252,7 +252,13 @@ func newServices() adapters.Services { ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) + if err != nil { + return nil, err + } privkey, err := w.GetPrivateKey(keys) + if err != nil { + return nil, err + } psparams := pss.NewPssParams().WithPrivateKey(privkey) pskad := kademlia(ctx.Config.ID) ps, err := pss.NewPss(pskad, psparams) @@ -288,10 +294,6 @@ type testStore struct { values map[string][]byte } -func newTestStore() *testStore { - return &testStore{values: make(map[string][]byte)} -} - func (t *testStore) Load(key string) ([]byte, error) { return nil, nil } diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go index 675b41ada..d4d383a6b 100644 --- a/swarm/pss/notify/notify_test.go +++ b/swarm/pss/notify/notify_test.go @@ -223,7 +223,13 @@ func newServices(allowRaw bool) adapters.Services { ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) + if err != nil { + return nil, err + } privkey, err := w.GetPrivateKey(keys) + if err != nil { + return nil, err + } pssp := pss.NewPssParams().WithPrivateKey(privkey) pssp.MsgTTL = time.Second * 30 pssp.AllowRaw = allowRaw diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go index f4209fea5..4ef3e90a0 100644 --- a/swarm/pss/protocol_test.go +++ b/swarm/pss/protocol_test.go @@ -93,11 +93,17 @@ func testProtocol(t *testing.T) { lctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic) + if err != nil { + t.Fatal(err) + } defer lsub.Unsubscribe() rmsgC := make(chan APIMsg) rctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic) + if err != nil { + t.Fatal(err) + } defer rsub.Unsubscribe() // set reciprocal public keys diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 574714114..66a90be62 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -976,11 +976,6 @@ func TestNetwork10000(t *testing.T) { } func testNetwork(t *testing.T) { - type msgnotifyC struct { - id enode.ID - msgIdx int - } - paramstring := strings.Split(t.Name(), "/") nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0) msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0) diff --git a/swarm/pss/types.go b/swarm/pss/types.go index 1e33ecdca..56c2c51dc 100644 --- a/swarm/pss/types.go +++ b/swarm/pss/types.go @@ -169,10 +169,6 @@ type stateStore struct { values map[string][]byte } -func newStateStore() *stateStore { - return &stateStore{values: make(map[string][]byte)} -} - func (store *stateStore) Load(key string) ([]byte, error) { return nil, nil } diff --git a/swarm/sctx/sctx.go b/swarm/sctx/sctx.go index bed2b1145..fb7d35b00 100644 --- a/swarm/sctx/sctx.go +++ b/swarm/sctx/sctx.go @@ -2,19 +2,17 @@ package sctx import "context" -type ContextKey int - -const ( - HTTPRequestIDKey ContextKey = iota - requestHostKey +type ( + HTTPRequestIDKey struct{} + requestHostKey struct{} ) func SetHost(ctx context.Context, domain string) context.Context { - return context.WithValue(ctx, requestHostKey, domain) + return context.WithValue(ctx, requestHostKey{}, domain) } func GetHost(ctx context.Context) string { - v, ok := ctx.Value(requestHostKey).(string) + v, ok := ctx.Value(requestHostKey{}).(string) if ok { return v } diff --git a/swarm/state/dbstore.go b/swarm/state/dbstore.go index 5e5c172b2..b0aa92e27 100644 --- a/swarm/state/dbstore.go +++ b/swarm/state/dbstore.go @@ -69,7 +69,7 @@ func (s *DBStore) Get(key string, i interface{}) (err error) { // Put stores an object that implements Binary for a specific key. func (s *DBStore) Put(key string, i interface{}) (err error) { - bytes := []byte{} + var bytes []byte marshaler, ok := i.(encoding.BinaryMarshaler) if !ok { diff --git a/swarm/state/dbstore_test.go b/swarm/state/dbstore_test.go index 6683e788f..f7098956d 100644 --- a/swarm/state/dbstore_test.go +++ b/swarm/state/dbstore_test.go @@ -112,6 +112,9 @@ func testPersistedStore(t *testing.T, store Store) { as := []string{} err = store.Get("key2", &as) + if err != nil { + t.Fatal(err) + } if len(as) != 3 { t.Fatalf("serialized array did not match expectation") diff --git a/swarm/state/inmemorystore.go b/swarm/state/inmemorystore.go index 1ca25404a..3ba48592b 100644 --- a/swarm/state/inmemorystore.go +++ b/swarm/state/inmemorystore.go @@ -59,7 +59,7 @@ func (s *InmemoryStore) Get(key string, i interface{}) (err error) { func (s *InmemoryStore) Put(key string, i interface{}) (err error) { s.mu.Lock() defer s.mu.Unlock() - bytes := []byte{} + var bytes []byte marshaler, ok := i.(encoding.BinaryMarshaler) if !ok { diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index 33133edd7..600be164a 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -88,17 +88,6 @@ func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error) return mput(store, n, GenerateRandomChunk) } -func mputChunks(store ChunkStore, chunks ...Chunk) error { - i := 0 - f := func(n int64) Chunk { - chunk := chunks[i] - i++ - return chunk - } - _, err := mput(store, len(chunks), f) - return err -} - func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) { // put to localstore and wait for stored channel // does not check delivery error state diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go index cf95bc1f5..fb2ef3a6b 100644 --- a/swarm/storage/feed/handler_test.go +++ b/swarm/storage/feed/handler_test.go @@ -27,7 +27,6 @@ import ( "time" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/storage" @@ -506,15 +505,3 @@ func newCharlieSigner() *GenericSigner { privKey, _ := crypto.HexToECDSA("facadefacadefacadefacadefacadefacadefacadefacadefacadefacadefaca") return NewGenericSigner(privKey) } - -func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) { - chunk, err := rh.chunkStore.Get(context.TODO(), addr) - if err != nil { - return nil, err - } - var r Request - if err := r.fromChunk(addr, chunk.Data()); err != nil { - return nil, err - } - return r.data, nil -} diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 49508911f..9feb68741 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -39,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" ) const ( @@ -72,13 +71,6 @@ var ( ErrDBClosed = errors.New("LDBStore closed") ) -type gcItem struct { - idx *dpaDBIndex - value uint64 - idxKey []byte - po uint8 -} - type LDBStoreParams struct { *StoreParams Path string @@ -961,15 +953,3 @@ func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Add } return it.Error() } - -func databaseExists(path string) bool { - o := &opt.Options{ - ErrorIfMissing: true, - } - tdb, err := leveldb.OpenFile(path, o) - if err != nil { - return false - } - defer tdb.Close() - return true -} diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 8c70f4584..092843db0 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -80,6 +80,19 @@ func (a Address) bits(i, j uint) uint { return res } +// Proximity(x, y) returns the proximity order of the MSB distance between x and y +// +// The distance metric MSB(x, y) of two equal length byte sequences x an y is the +// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed. +// the binary cast is big endian: most significant bit first (=MSB). +// +// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance. +// It is defined as the reverse rank of the integer part of the base 2 +// logarithm of the distance. +// It is calculated by counting the number of common leading zeros in the (MSB) +// binary representation of the x^y. +// +// (0 farthest, 255 closest, 256 self) func Proximity(one, other []byte) (ret int) { b := (MaxPO-1)/8 + 1 if b > len(one) { @@ -231,11 +244,8 @@ func GenerateRandomChunk(dataSize int64) Chunk { } func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) { - if dataSize > ch.DefaultSize { - dataSize = ch.DefaultSize - } for i := 0; i < count; i++ { - ch := GenerateRandomChunk(ch.DefaultSize) + ch := GenerateRandomChunk(dataSize) chunks = append(chunks, ch) } return chunks diff --git a/swarm/swarm.go b/swarm/swarm.go index 7214abbda..1fb5443fd 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil { return nil, err } + + syncing := stream.SyncingAutoSubscribe + if !config.SyncEnabled || config.LightNodeEnabled { + syncing = stream.SyncingDisabled + } + + retrieval := stream.RetrievalEnabled + if config.LightNodeEnabled { + retrieval = stream.RetrievalClientOnly + } + registryOptions := &stream.RegistryOptions{ SkipCheck: config.DeliverySkipCheck, - DoSync: config.SyncEnabled, - DoRetrieve: true, - DoServeRetrieve: true, + Syncing: syncing, + Retrieval: retrieval, SyncUpdateDelay: config.SyncUpdateDelay, MaxPeerServers: config.MaxStreamPeerServers, } - if config.LightNodeEnabled { - registryOptions.DoSync = false - registryOptions.DoRetrieve = false - } self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions) // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage |