aboutsummaryrefslogtreecommitdiffstats
path: root/swarm
diff options
context:
space:
mode:
Diffstat (limited to 'swarm')
-rw-r--r--swarm/api/act.go3
-rw-r--r--swarm/api/client/client_test.go3
-rw-r--r--swarm/api/filesystem.go4
-rw-r--r--swarm/api/http/middleware.go2
-rw-r--r--swarm/api/http/response.go25
-rw-r--r--swarm/api/http/sctx.go14
-rw-r--r--swarm/api/http/server.go95
-rw-r--r--swarm/api/http/server_test.go39
-rw-r--r--swarm/api/manifest.go1
-rw-r--r--swarm/network/hive_test.go6
-rw-r--r--swarm/network/kademlia.go20
-rw-r--r--swarm/network/kademlia_test.go58
-rw-r--r--swarm/network/protocol_test.go16
-rw-r--r--swarm/network/simulation/bucket_test.go4
-rw-r--r--swarm/network/stream/delivery.go5
-rw-r--r--swarm/network/stream/delivery_test.go180
-rw-r--r--swarm/network/stream/intervals_test.go2
-rw-r--r--swarm/network/stream/lightnode_test.go12
-rw-r--r--swarm/network/stream/snapshot_retrieval_test.go5
-rw-r--r--swarm/network/stream/snapshot_sync_test.go8
-rw-r--r--swarm/network/stream/stream.go57
-rw-r--r--swarm/network/stream/streamer_test.go2
-rw-r--r--swarm/network/stream/syncer_test.go5
-rw-r--r--swarm/pot/address.go40
-rw-r--r--swarm/pss/client/client_test.go10
-rw-r--r--swarm/pss/notify/notify_test.go6
-rw-r--r--swarm/pss/protocol_test.go6
-rw-r--r--swarm/pss/pss_test.go5
-rw-r--r--swarm/pss/types.go4
-rw-r--r--swarm/sctx/sctx.go12
-rw-r--r--swarm/state/dbstore.go2
-rw-r--r--swarm/state/dbstore_test.go3
-rw-r--r--swarm/state/inmemorystore.go2
-rw-r--r--swarm/storage/common_test.go11
-rw-r--r--swarm/storage/feed/handler_test.go13
-rw-r--r--swarm/storage/ldbstore.go20
-rw-r--r--swarm/storage/types.go18
-rw-r--r--swarm/swarm.go20
38 files changed, 463 insertions, 275 deletions
diff --git a/swarm/api/act.go b/swarm/api/act.go
index 52d909827..e54369f9a 100644
--- a/swarm/api/act.go
+++ b/swarm/api/act.go
@@ -458,6 +458,9 @@ func DoACT(ctx *cli.Context, privateKey *ecdsa.PrivateKey, salt []byte, grantees
return nil, nil, nil, err
}
sessionKey, err := NewSessionKeyPK(privateKey, granteePub, salt)
+ if err != nil {
+ return nil, nil, nil, err
+ }
hasher := sha3.NewKeccak256()
hasher.Write(append(sessionKey, 0))
diff --git a/swarm/api/client/client_test.go b/swarm/api/client/client_test.go
index 03c6cbb28..c30d69911 100644
--- a/swarm/api/client/client_test.go
+++ b/swarm/api/client/client_test.go
@@ -457,6 +457,9 @@ func TestClientCreateUpdateFeed(t *testing.T) {
}
feedManifestHash, err := client.CreateFeedWithManifest(createRequest)
+ if err != nil {
+ t.Fatal(err)
+ }
correctManifestAddrHex := "0e9b645ebc3da167b1d56399adc3276f7a08229301b72a03336be0e7d4b71882"
if feedManifestHash != correctManifestAddrHex {
diff --git a/swarm/api/filesystem.go b/swarm/api/filesystem.go
index 43695efc1..266ef71be 100644
--- a/swarm/api/filesystem.go
+++ b/swarm/api/filesystem.go
@@ -122,6 +122,10 @@ func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error
var wait func(context.Context) error
ctx := context.TODO()
hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
+ if err != nil {
+ errors[i] = err
+ return
+ }
if hash != nil {
list[i].Hash = hash.Hex()
}
diff --git a/swarm/api/http/middleware.go b/swarm/api/http/middleware.go
index 3b2dcc7d5..ccc040c54 100644
--- a/swarm/api/http/middleware.go
+++ b/swarm/api/http/middleware.go
@@ -50,7 +50,7 @@ func ParseURI(h http.Handler) http.Handler {
uri, err := api.Parse(strings.TrimLeft(r.URL.Path, "/"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
- RespondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest)
+ respondError(w, r, fmt.Sprintf("invalid URI %q", r.URL.Path), http.StatusBadRequest)
return
}
if uri.Addr != "" && strings.HasPrefix(uri.Addr, "0x") {
diff --git a/swarm/api/http/response.go b/swarm/api/http/response.go
index c9fb9d285..d4e81d7f6 100644
--- a/swarm/api/http/response.go
+++ b/swarm/api/http/response.go
@@ -53,23 +53,23 @@ func ShowMultipleChoices(w http.ResponseWriter, r *http.Request, list api.Manife
log.Debug("ShowMultipleChoices", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()))
msg := ""
if list.Entries == nil {
- RespondError(w, r, "Could not resolve", http.StatusInternalServerError)
+ respondError(w, r, "Could not resolve", http.StatusInternalServerError)
return
}
requestUri := strings.TrimPrefix(r.RequestURI, "/")
uri, err := api.Parse(requestUri)
if err != nil {
- RespondError(w, r, "Bad Request", http.StatusBadRequest)
+ respondError(w, r, "Bad Request", http.StatusBadRequest)
}
uri.Scheme = "bzz-list"
msg += fmt.Sprintf("Disambiguation:<br/>Your request may refer to multiple choices.<br/>Click <a class=\"orange\" href='"+"/"+uri.String()+"'>here</a> if your browser does not redirect you within 5 seconds.<script>setTimeout(\"location.href='%s';\",5000);</script><br/>", "/"+uri.String())
- RespondTemplate(w, r, "error", msg, http.StatusMultipleChoices)
+ respondTemplate(w, r, "error", msg, http.StatusMultipleChoices)
}
-func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) {
- log.Debug("RespondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()))
+func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg string, code int) {
+ log.Debug("respondTemplate", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()))
respond(w, r, &ResponseParams{
Code: code,
Msg: template.HTML(msg),
@@ -78,13 +78,12 @@ func RespondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s
})
}
-func RespondError(w http.ResponseWriter, r *http.Request, msg string, code int) {
- log.Debug("RespondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code)
- RespondTemplate(w, r, "error", msg, code)
+func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) {
+ log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code)
+ respondTemplate(w, r, "error", msg, code)
}
func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) {
-
w.WriteHeader(params.Code)
if params.Code >= 400 {
@@ -96,7 +95,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) {
// this cannot be in a switch since an Accept header can have multiple values: "Accept: */*, text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8"
if strings.Contains(acceptHeader, "application/json") {
if err := respondJSON(w, r, params); err != nil {
- RespondError(w, r, "Internal server error", http.StatusInternalServerError)
+ respondError(w, r, "Internal server error", http.StatusInternalServerError)
}
} else if strings.Contains(acceptHeader, "text/html") {
respondHTML(w, r, params)
@@ -107,7 +106,7 @@ func respond(w http.ResponseWriter, r *http.Request, params *ResponseParams) {
func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams) {
htmlCounter.Inc(1)
- log.Debug("respondHTML", "ruid", GetRUID(r.Context()))
+ log.Info("respondHTML", "ruid", GetRUID(r.Context()), "code", params.Code)
err := params.template.Execute(w, params)
if err != nil {
log.Error(err.Error())
@@ -116,14 +115,14 @@ func respondHTML(w http.ResponseWriter, r *http.Request, params *ResponseParams)
func respondJSON(w http.ResponseWriter, r *http.Request, params *ResponseParams) error {
jsonCounter.Inc(1)
- log.Debug("respondJSON", "ruid", GetRUID(r.Context()))
+ log.Info("respondJSON", "ruid", GetRUID(r.Context()), "code", params.Code)
w.Header().Set("Content-Type", "application/json")
return json.NewEncoder(w).Encode(params)
}
func respondPlaintext(w http.ResponseWriter, r *http.Request, params *ResponseParams) error {
plaintextCounter.Inc(1)
- log.Debug("respondPlaintext", "ruid", GetRUID(r.Context()))
+ log.Info("respondPlaintext", "ruid", GetRUID(r.Context()), "code", params.Code)
w.Header().Set("Content-Type", "text/plain")
strToWrite := "Code: " + fmt.Sprintf("%d", params.Code) + "\n"
strToWrite += "Message: " + string(params.Msg) + "\n"
diff --git a/swarm/api/http/sctx.go b/swarm/api/http/sctx.go
index 431e11735..b8dafab0b 100644
--- a/swarm/api/http/sctx.go
+++ b/swarm/api/http/sctx.go
@@ -7,14 +7,10 @@ import (
"github.com/ethereum/go-ethereum/swarm/sctx"
)
-type contextKey int
-
-const (
- uriKey contextKey = iota
-)
+type uriKey struct{}
func GetRUID(ctx context.Context) string {
- v, ok := ctx.Value(sctx.HTTPRequestIDKey).(string)
+ v, ok := ctx.Value(sctx.HTTPRequestIDKey{}).(string)
if ok {
return v
}
@@ -22,11 +18,11 @@ func GetRUID(ctx context.Context) string {
}
func SetRUID(ctx context.Context, ruid string) context.Context {
- return context.WithValue(ctx, sctx.HTTPRequestIDKey, ruid)
+ return context.WithValue(ctx, sctx.HTTPRequestIDKey{}, ruid)
}
func GetURI(ctx context.Context) *api.URI {
- v, ok := ctx.Value(uriKey).(*api.URI)
+ v, ok := ctx.Value(uriKey{}).(*api.URI)
if ok {
return v
}
@@ -34,5 +30,5 @@ func GetURI(ctx context.Context) *api.URI {
}
func SetURI(ctx context.Context, uri *api.URI) context.Context {
- return context.WithValue(ctx, uriKey, uri)
+ return context.WithValue(ctx, uriKey{}, uri)
}
diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go
index b4294b058..3c6735a73 100644
--- a/swarm/api/http/server.go
+++ b/swarm/api/http/server.go
@@ -41,16 +41,9 @@ import (
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
-
"github.com/rs/cors"
)
-type resourceResponse struct {
- Manifest storage.Address `json:"manifest"`
- Resource string `json:"resource"`
- Update storage.Address `json:"update"`
-}
-
var (
postRawCount = metrics.NewRegisteredCounter("api.http.post.raw.count", nil)
postRawFail = metrics.NewRegisteredCounter("api.http.post.raw.fail", nil)
@@ -191,10 +184,10 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) {
if err != nil {
if isDecryptError(err) {
w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", uri.Address().String()))
- RespondError(w, r, err.Error(), http.StatusUnauthorized)
+ respondError(w, r, err.Error(), http.StatusUnauthorized)
return
}
- RespondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("Had an error building the tarball: %v", err), http.StatusInternalServerError)
return
}
defer reader.Close()
@@ -218,7 +211,7 @@ func (s *Server) HandleBzzGet(w http.ResponseWriter, r *http.Request) {
func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) {
switch r.RequestURI {
case "/":
- RespondTemplate(w, r, "landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200)
+ respondTemplate(w, r, "landing-page", "Swarm: Please request a valid ENS or swarm hash with the appropriate bzz scheme", 200)
return
case "/robots.txt":
w.Header().Set("Last-Modified", time.Now().Format(http.TimeFormat))
@@ -227,7 +220,7 @@ func (s *Server) HandleRootPaths(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write(faviconBytes)
default:
- RespondError(w, r, "Not Found", http.StatusNotFound)
+ respondError(w, r, "Not Found", http.StatusNotFound)
}
}
@@ -247,26 +240,26 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
if uri.Path != "" {
postRawFail.Inc(1)
- RespondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest)
+ respondError(w, r, "raw POST request cannot contain a path", http.StatusBadRequest)
return
}
if uri.Addr != "" && uri.Addr != "encrypt" {
postRawFail.Inc(1)
- RespondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest)
+ respondError(w, r, "raw POST request addr can only be empty or \"encrypt\"", http.StatusBadRequest)
return
}
if r.Header.Get("Content-Length") == "" {
postRawFail.Inc(1)
- RespondError(w, r, "missing Content-Length header in request", http.StatusBadRequest)
+ respondError(w, r, "missing Content-Length header in request", http.StatusBadRequest)
return
}
addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
if err != nil {
postRawFail.Inc(1)
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
@@ -290,7 +283,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
contentType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type"))
if err != nil {
postFilesFail.Inc(1)
- RespondError(w, r, err.Error(), http.StatusBadRequest)
+ respondError(w, r, err.Error(), http.StatusBadRequest)
return
}
@@ -305,7 +298,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
addr, err = s.api.Resolve(r.Context(), uri.Addr)
if err != nil {
postFilesFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusInternalServerError)
return
}
log.Debug("resolved key", "ruid", ruid, "key", addr)
@@ -313,7 +306,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
addr, err = s.api.NewManifest(r.Context(), toEncrypt)
if err != nil {
postFilesFail.Inc(1)
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
log.Debug("new manifest", "ruid", ruid, "key", addr)
@@ -324,7 +317,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
case "application/x-tar":
_, err := s.handleTarUpload(r, mw)
if err != nil {
- RespondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("error uploading tarball: %v", err), http.StatusInternalServerError)
return err
}
return nil
@@ -337,7 +330,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
})
if err != nil {
postFilesFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("cannot create manifest: %s", err), http.StatusInternalServerError)
return
}
@@ -373,7 +366,7 @@ func (s *Server) handleMultipartUpload(r *http.Request, boundary string, mw *api
}
var size int64
- var reader io.Reader = part
+ var reader io.Reader
if contentLength := part.Header.Get("Content-Length"); contentLength != "" {
size, err = strconv.ParseInt(contentLength, 10, 64)
if err != nil {
@@ -446,7 +439,7 @@ func (s *Server) HandleDelete(w http.ResponseWriter, r *http.Request) {
newKey, err := s.api.Delete(r.Context(), uri.Addr, uri.Path)
if err != nil {
deleteFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("could not delete from manifest: %v", err), http.StatusInternalServerError)
return
}
@@ -467,7 +460,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) {
// Creation and update must send feed.updateRequestJSON JSON structure
body, err := ioutil.ReadAll(r.Body)
if err != nil {
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
@@ -478,7 +471,7 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) {
if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI {
httpStatus = http.StatusNotFound
}
- RespondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus)
+ respondError(w, r, fmt.Sprintf("cannot retrieve feed from manifest: %s", err), httpStatus)
return
}
@@ -487,32 +480,32 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query()
if err := updateRequest.FromValues(query, body); err != nil { // decodes request from query parameters
- RespondError(w, r, err.Error(), http.StatusBadRequest)
+ respondError(w, r, err.Error(), http.StatusBadRequest)
return
}
- if updateRequest.IsUpdate() {
+ switch {
+ case updateRequest.IsUpdate():
// Verify that the signature is intact and that the signer is authorized
// to update this feed
// Check this early, to avoid creating a feed and then not being able to set its first update.
if err = updateRequest.Verify(); err != nil {
- RespondError(w, r, err.Error(), http.StatusForbidden)
+ respondError(w, r, err.Error(), http.StatusForbidden)
return
}
_, err = s.api.FeedsUpdate(r.Context(), &updateRequest)
if err != nil {
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
- }
-
- if query.Get("manifest") == "1" {
+ fallthrough
+ case query.Get("manifest") == "1":
// we create a manifest so we can retrieve feed updates with bzz:// later
// this manifest has a special "feed type" manifest, and saves the
// feed identification used to retrieve feed updates later
m, err := s.api.NewFeedManifest(r.Context(), &updateRequest.Feed)
if err != nil {
- RespondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("failed to create feed manifest: %v", err), http.StatusInternalServerError)
return
}
// the key to the manifest will be passed back to the client
@@ -520,12 +513,14 @@ func (s *Server) HandlePostFeed(w http.ResponseWriter, r *http.Request) {
// the manifest key can be set as content in the resolver of the ENS name
outdata, err := json.Marshal(m)
if err != nil {
- RespondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("failed to create json response: %s", err), http.StatusInternalServerError)
return
}
fmt.Fprint(w, string(outdata))
w.Header().Add("Content-type", "application/json")
+ default:
+ respondError(w, r, "Missing signature in feed update request", http.StatusBadRequest)
}
}
@@ -557,7 +552,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) {
if err == api.ErrCannotLoadFeedManifest || err == api.ErrCannotResolveFeedURI {
httpStatus = http.StatusNotFound
}
- RespondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus)
+ respondError(w, r, fmt.Sprintf("cannot retrieve feed information from manifest: %s", err), httpStatus)
return
}
@@ -566,12 +561,12 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) {
unsignedUpdateRequest, err := s.api.FeedsNewRequest(r.Context(), fd)
if err != nil {
getFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("cannot retrieve feed metadata for feed=%s: %s", fd.Hex(), err), http.StatusNotFound)
return
}
rawResponse, err := unsignedUpdateRequest.MarshalJSON()
if err != nil {
- RespondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError)
+ respondError(w, r, fmt.Sprintf("cannot encode unsigned feed update request: %v", err), http.StatusInternalServerError)
return
}
w.Header().Add("Content-type", "application/json")
@@ -582,7 +577,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) {
lookupParams := &feed.Query{Feed: *fd}
if err = lookupParams.FromValues(r.URL.Query()); err != nil { // parse period, version
- RespondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest)
+ respondError(w, r, fmt.Sprintf("invalid feed update request:%s", err), http.StatusBadRequest)
return
}
@@ -591,7 +586,7 @@ func (s *Server) HandleGetFeed(w http.ResponseWriter, r *http.Request) {
// any error from the switch statement will end up here
if err != nil {
code, err2 := s.translateFeedError(w, r, "feed lookup fail", err)
- RespondError(w, r, err2.Error(), code)
+ respondError(w, r, err2.Error(), code)
return
}
@@ -637,7 +632,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) {
addr, err := s.api.ResolveURI(r.Context(), uri, pass)
if err != nil {
getFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
return
}
w.Header().Set("Cache-Control", "max-age=2147483648, immutable") // url was of type bzz://<hex key>/path, so we are sure it is immutable.
@@ -661,7 +656,7 @@ func (s *Server) HandleGet(w http.ResponseWriter, r *http.Request) {
reader, isEncrypted := s.api.Retrieve(r.Context(), addr)
if _, err := reader.Size(r.Context(), nil); err != nil {
getFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("root chunk not found %s: %s", addr, err), http.StatusNotFound)
return
}
@@ -701,7 +696,7 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) {
addr, err := s.api.Resolve(r.Context(), uri.Addr)
if err != nil {
getListFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
return
}
log.Debug("handle.get.list: resolved", "ruid", ruid, "key", addr)
@@ -711,10 +706,10 @@ func (s *Server) HandleGetList(w http.ResponseWriter, r *http.Request) {
getListFail.Inc(1)
if isDecryptError(err) {
w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", addr.String()))
- RespondError(w, r, err.Error(), http.StatusUnauthorized)
+ respondError(w, r, err.Error(), http.StatusUnauthorized)
return
}
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
@@ -762,7 +757,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
manifestAddr, err = s.api.Resolve(r.Context(), uri.Addr)
if err != nil {
getFileFail.Inc(1)
- RespondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("cannot resolve %s: %s", uri.Addr, err), http.StatusNotFound)
return
}
} else {
@@ -786,17 +781,17 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
if err != nil {
if isDecryptError(err) {
w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr))
- RespondError(w, r, err.Error(), http.StatusUnauthorized)
+ respondError(w, r, err.Error(), http.StatusUnauthorized)
return
}
switch status {
case http.StatusNotFound:
getFileNotFound.Inc(1)
- RespondError(w, r, err.Error(), http.StatusNotFound)
+ respondError(w, r, err.Error(), http.StatusNotFound)
default:
getFileFail.Inc(1)
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
}
return
}
@@ -809,10 +804,10 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
getFileFail.Inc(1)
if isDecryptError(err) {
w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic realm=%q", manifestAddr))
- RespondError(w, r, err.Error(), http.StatusUnauthorized)
+ respondError(w, r, err.Error(), http.StatusUnauthorized)
return
}
- RespondError(w, r, err.Error(), http.StatusInternalServerError)
+ respondError(w, r, err.Error(), http.StatusInternalServerError)
return
}
@@ -825,7 +820,7 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
// check the root chunk exists by retrieving the file's size
if _, err := reader.Size(r.Context(), nil); err != nil {
getFileNotFound.Inc(1)
- RespondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound)
+ respondError(w, r, fmt.Sprintf("file not found %s: %s", uri, err), http.StatusNotFound)
return
}
diff --git a/swarm/api/http/server_test.go b/swarm/api/http/server_test.go
index 1cf7ff577..04d0e045a 100644
--- a/swarm/api/http/server_test.go
+++ b/swarm/api/http/server_test.go
@@ -263,7 +263,7 @@ func TestBzzFeed(t *testing.T) {
if resp.StatusCode == http.StatusOK {
t.Fatal("Expected error status since feed update does not contain multihash. Received 200 OK")
}
- b, err = ioutil.ReadAll(resp.Body)
+ _, err = ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
@@ -333,15 +333,45 @@ func TestBzzFeed(t *testing.T) {
}
urlQuery = testUrl.Query()
body = updateRequest.AppendValues(urlQuery) // this adds all query parameters
+ goodQueryParameters := urlQuery.Encode() // save the query parameters for a second attempt
+
+ // create bad query parameters in which the signature is missing
+ urlQuery.Del("signature")
testUrl.RawQuery = urlQuery.Encode()
+ // 1st attempt with bad query parameters in which the signature is missing
resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- t.Fatalf("Update returned %s", resp.Status)
+ expectedCode := http.StatusBadRequest
+ if resp.StatusCode != expectedCode {
+ t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode)
+ }
+
+ // 2nd attempt with bad query parameters in which the signature is of incorrect length
+ urlQuery.Set("signature", "0xabcd") // should be 130 hex chars
+ resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ expectedCode = http.StatusBadRequest
+ if resp.StatusCode != expectedCode {
+ t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode)
+ }
+
+ // 3rd attempt, with good query parameters:
+ testUrl.RawQuery = goodQueryParameters
+ resp, err = http.Post(testUrl.String(), "application/octet-stream", bytes.NewReader(body))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer resp.Body.Close()
+ expectedCode = http.StatusOK
+ if resp.StatusCode != expectedCode {
+ t.Fatalf("Update returned %s. Expected %d", resp.Status, expectedCode)
}
// get latest update through bzz-feed directly
@@ -461,6 +491,9 @@ func testBzzGetPath(encrypted bool, t *testing.T) {
}
defer resp.Body.Close()
respbody, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ t.Fatalf("Error while reading response body: %v", err)
+ }
if string(respbody) != testmanifest[v] {
isexpectedfailrequest := false
diff --git a/swarm/api/manifest.go b/swarm/api/manifest.go
index 7c4cc88e4..890ed88bd 100644
--- a/swarm/api/manifest.go
+++ b/swarm/api/manifest.go
@@ -557,7 +557,6 @@ func (mt *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manif
if path != entry.Path {
return nil, 0
}
- pos = epl
}
}
return nil, 0
diff --git a/swarm/network/hive_test.go b/swarm/network/hive_test.go
index 059c3dc96..56adc5a8e 100644
--- a/swarm/network/hive_test.go
+++ b/swarm/network/hive_test.go
@@ -70,6 +70,9 @@ func TestHiveStatePersistance(t *testing.T) {
defer os.RemoveAll(dir)
store, err := state.NewDBStore(dir) //start the hive with an empty dbstore
+ if err != nil {
+ t.Fatal(err)
+ }
params := NewHiveParams()
s, pp := newHiveTester(t, params, 5, store)
@@ -90,6 +93,9 @@ func TestHiveStatePersistance(t *testing.T) {
store.Close()
persistedStore, err := state.NewDBStore(dir) //start the hive with an empty dbstore
+ if err != nil {
+ t.Fatal(err)
+ }
s1, pp := newHiveTester(t, params, 1, persistedStore)
diff --git a/swarm/network/kademlia.go b/swarm/network/kademlia.go
index 55a0c6f13..cd94741be 100644
--- a/swarm/network/kademlia.go
+++ b/swarm/network/kademlia.go
@@ -261,7 +261,7 @@ func (k *Kademlia) On(p *Peer) (uint8, bool) {
// found among live peers, do nothing
return v
})
- if ins {
+ if ins && !p.BzzPeer.LightNode {
a := newEntry(p.BzzAddr)
a.conn = p
// insert new online peer into addrs
@@ -329,14 +329,18 @@ func (k *Kademlia) Off(p *Peer) {
k.lock.Lock()
defer k.lock.Unlock()
var del bool
- k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val {
- // v cannot be nil, must check otherwise we overwrite entry
- if v == nil {
- panic(fmt.Sprintf("connected peer not found %v", p))
- }
+ if !p.BzzPeer.LightNode {
+ k.addrs, _, _, _ = pot.Swap(k.addrs, p, pof, func(v pot.Val) pot.Val {
+ // v cannot be nil, must check otherwise we overwrite entry
+ if v == nil {
+ panic(fmt.Sprintf("connected peer not found %v", p))
+ }
+ del = true
+ return newEntry(p.BzzAddr)
+ })
+ } else {
del = true
- return newEntry(p.BzzAddr)
- })
+ }
if del {
k.conns, _, _, _ = pot.Swap(k.conns, p, pof, func(_ pot.Val) pot.Val {
diff --git a/swarm/network/kademlia_test.go b/swarm/network/kademlia_test.go
index 903c8dbda..d2e051f45 100644
--- a/swarm/network/kademlia_test.go
+++ b/swarm/network/kademlia_test.go
@@ -46,19 +46,19 @@ func newTestKademlia(b string) *Kademlia {
return NewKademlia(base, params)
}
-func newTestKadPeer(k *Kademlia, s string) *Peer {
- return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s)}, k)
+func newTestKadPeer(k *Kademlia, s string, lightNode bool) *Peer {
+ return NewPeer(&BzzPeer{BzzAddr: testKadPeerAddr(s), LightNode: lightNode}, k)
}
func On(k *Kademlia, ons ...string) {
for _, s := range ons {
- k.On(newTestKadPeer(k, s))
+ k.On(newTestKadPeer(k, s, false))
}
}
func Off(k *Kademlia, offs ...string) {
for _, s := range offs {
- k.Off(newTestKadPeer(k, s))
+ k.Off(newTestKadPeer(k, s, false))
}
}
@@ -254,6 +254,56 @@ func TestSuggestPeerFindPeers(t *testing.T) {
}
+// a node should stay in the address book if it's removed from the kademlia
+func TestOffEffectingAddressBookNormalNode(t *testing.T) {
+ k := newTestKademlia("00000000")
+ // peer added to kademlia
+ k.On(newTestKadPeer(k, "01000000", false))
+ // peer should be in the address book
+ if k.addrs.Size() != 1 {
+ t.Fatal("known peer addresses should contain 1 entry")
+ }
+ // peer should be among live connections
+ if k.conns.Size() != 1 {
+ t.Fatal("live peers should contain 1 entry")
+ }
+ // remove peer from kademlia
+ k.Off(newTestKadPeer(k, "01000000", false))
+ // peer should be in the address book
+ if k.addrs.Size() != 1 {
+ t.Fatal("known peer addresses should contain 1 entry")
+ }
+ // peer should not be among live connections
+ if k.conns.Size() != 0 {
+ t.Fatal("live peers should contain 0 entry")
+ }
+}
+
+// a light node should not be in the address book
+func TestOffEffectingAddressBookLightNode(t *testing.T) {
+ k := newTestKademlia("00000000")
+ // light node peer added to kademlia
+ k.On(newTestKadPeer(k, "01000000", true))
+ // peer should not be in the address book
+ if k.addrs.Size() != 0 {
+ t.Fatal("known peer addresses should contain 0 entry")
+ }
+ // peer should be among live connections
+ if k.conns.Size() != 1 {
+ t.Fatal("live peers should contain 1 entry")
+ }
+ // remove peer from kademlia
+ k.Off(newTestKadPeer(k, "01000000", true))
+ // peer should not be in the address book
+ if k.addrs.Size() != 0 {
+ t.Fatal("known peer addresses should contain 0 entry")
+ }
+ // peer should not be among live connections
+ if k.conns.Size() != 0 {
+ t.Fatal("live peers should contain 0 entry")
+ }
+}
+
func TestSuggestPeerRetries(t *testing.T) {
k := newTestKademlia("00000000")
k.RetryInterval = int64(300 * time.Millisecond) // cycle
diff --git a/swarm/network/protocol_test.go b/swarm/network/protocol_test.go
index 4b83c7a27..f0d266628 100644
--- a/swarm/network/protocol_test.go
+++ b/swarm/network/protocol_test.go
@@ -50,10 +50,6 @@ type testStore struct {
values map[string][]byte
}
-func newTestStore() *testStore {
- return &testStore{values: make(map[string][]byte)}
-}
-
func (t *testStore) Load(key string) ([]byte, error) {
t.Lock()
defer t.Unlock()
@@ -157,17 +153,7 @@ func newBzzHandshakeTester(t *testing.T, n int, addr *BzzAddr, lightNode bool) *
// should test handshakes in one exchange? parallelisation
func (s *bzzTester) testHandshake(lhs, rhs *HandshakeMsg, disconnects ...*p2ptest.Disconnect) error {
- var peers []enode.ID
- id := rhs.Addr.ID()
- if len(disconnects) > 0 {
- for _, d := range disconnects {
- peers = append(peers, d.Peer)
- }
- } else {
- peers = []enode.ID{id}
- }
-
- if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, id)...); err != nil {
+ if err := s.TestExchanges(HandshakeMsgExchange(lhs, rhs, rhs.Addr.ID())...); err != nil {
return err
}
diff --git a/swarm/network/simulation/bucket_test.go b/swarm/network/simulation/bucket_test.go
index 461d99825..69df19bfe 100644
--- a/swarm/network/simulation/bucket_test.go
+++ b/swarm/network/simulation/bucket_test.go
@@ -94,7 +94,7 @@ func TestServiceBucket(t *testing.T) {
t.Fatalf("expected %q, got %q", customValue, s)
}
- v, ok = sim.NodeItem(id2, customKey)
+ _, ok = sim.NodeItem(id2, customKey)
if ok {
t.Fatal("bucket item should not be found")
}
@@ -119,7 +119,7 @@ func TestServiceBucket(t *testing.T) {
t.Fatalf("expected %q, got %q", testValue+id1.String(), s)
}
- v, ok = items[id2]
+ _, ok = items[id2]
if ok {
t.Errorf("node 2 item should not be found")
}
diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go
index 9092ffe3e..0109fbdef 100644
--- a/swarm/network/stream/delivery.go
+++ b/swarm/network/stream/delivery.go
@@ -245,7 +245,10 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
} else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int, nn bool) bool {
id := p.ID()
- // TODO: skip light nodes that do not accept retrieve requests
+ if p.LightNode {
+ // skip light nodes
+ return true
+ }
if req.SkipPeer(id.String()) {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go
index 949645558..c77682e0e 100644
--- a/swarm/network/stream/delivery_test.go
+++ b/swarm/network/stream/delivery_test.go
@@ -29,17 +29,25 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/protocols"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/network"
+ pq "github.com/ethereum/go-ethereum/swarm/network/priorityqueue"
"github.com/ethereum/go-ethereum/swarm/network/simulation"
"github.com/ethereum/go-ethereum/swarm/state"
"github.com/ethereum/go-ethereum/swarm/storage"
)
+//Tests initializing a retrieve request
func TestStreamerRetrieveRequest(t *testing.T) {
- tester, streamer, _, teardown, err := newStreamerTester(t, nil)
+ regOpts := &RegistryOptions{
+ Retrieval: RetrievalClientOnly,
+ Syncing: SyncingDisabled,
+ }
+ tester, streamer, _, teardown, err := newStreamerTester(t, regOpts)
defer teardown()
if err != nil {
t.Fatal(err)
@@ -55,10 +63,21 @@ func TestStreamerRetrieveRequest(t *testing.T) {
)
streamer.delivery.RequestFromPeers(ctx, req)
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+
err = tester.TestExchanges(p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Expects: []p2ptest.Expect{
- {
+ { //start expecting a subscription for RETRIEVE_REQUEST due to `RetrievalClientOnly`
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ { //expect a retrieve request message for the given hash
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: hash0[:],
@@ -74,9 +93,12 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
}
+//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
+//Should time out as the peer does not have the chunk (no syncing happened previously)
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled, //do no syncing
})
defer teardown()
if err != nil {
@@ -89,16 +111,31 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
peer := streamer.getPeer(node.ID())
+ stream := NewStream(swarmChunkServerStreamName, "", true)
+ //simulate pre-subscription to RETRIEVE_REQUEST stream on peer
peer.handleSubscribeMsg(context.TODO(), &SubscribeMsg{
- Stream: NewStream(swarmChunkServerStreamName, "", true),
+ Stream: stream,
History: nil,
Priority: Top,
})
+ //test the exchange
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ { //first expect a subscription to the RETRIEVE_REQUEST stream
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
- {
+ { //then the actual RETRIEVE_REQUEST....
Code: 5,
Msg: &RetrieveRequestMsg{
Addr: chunk.Address()[:],
@@ -107,7 +144,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
},
Expects: []p2ptest.Expect{
- {
+ { //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
@@ -120,7 +157,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
},
})
- expectedError := `exchange #0 "RetrieveRequestMsg": timed out`
+ //should fail with a timeout as the peer we are requesting
+ //the chunk from does not have the chunk
+ expectedError := `exchange #1 "RetrieveRequestMsg": timed out`
if err == nil || err.Error() != expectedError {
t.Fatalf("Expected error %v, got %v", expectedError, err)
}
@@ -130,7 +169,8 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -138,6 +178,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
node := tester.Nodes[0]
+
peer := streamer.getPeer(node.ID())
stream := NewStream(swarmChunkServerStreamName, "", true)
@@ -156,6 +197,18 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
err = tester.TestExchanges(p2ptest.Exchange{
+ Expects: []p2ptest.Expect{
+ {
+ Code: 4,
+ Msg: &SubscribeMsg{
+ Stream: stream,
+ History: nil,
+ Priority: Top,
+ },
+ Peer: node.ID(),
+ },
+ },
+ }, p2ptest.Exchange{
Label: "RetrieveRequestMsg",
Triggers: []p2ptest.Trigger{
{
@@ -224,9 +277,90 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
}
+// if there is one peer in the Kademlia, RequestFromPeers should return it
+func TestRequestFromPeers(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+ delivery := NewDelivery(to, nil)
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: false,
+ Peer: protocolsPeer,
+ }, to)
+ to.On(peer)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+
+ // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
+ sp := &Peer{
+ Peer: protocolsPeer,
+ pq: pq.New(int(PriorityQueue), PriorityQueueCap),
+ streamer: r,
+ }
+ r.setPeer(sp)
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ ctx := context.Background()
+ id, _, err := delivery.RequestFromPeers(ctx, req)
+
+ if err != nil {
+ t.Fatal(err)
+ }
+ if *id != dummyPeerID {
+ t.Fatalf("Expected an id, got %v", id)
+ }
+}
+
+// RequestFromPeers should not return light nodes
+func TestRequestFromPeersWithLightNode(t *testing.T) {
+ dummyPeerID := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ addr := network.RandomAddr()
+ to := network.NewKademlia(addr.OAddr, network.NewKadParams())
+ delivery := NewDelivery(to, nil)
+
+ protocolsPeer := protocols.NewPeer(p2p.NewPeer(dummyPeerID, "dummy", nil), nil, nil)
+ // setting up a lightnode
+ peer := network.NewPeer(&network.BzzPeer{
+ BzzAddr: network.RandomAddr(),
+ LightNode: true,
+ Peer: protocolsPeer,
+ }, to)
+ to.On(peer)
+ r := NewRegistry(addr.ID(), delivery, nil, nil, nil)
+ // an empty priorityQueue has to be created to prevent a goroutine being called after the test has finished
+ sp := &Peer{
+ Peer: protocolsPeer,
+ pq: pq.New(int(PriorityQueue), PriorityQueueCap),
+ streamer: r,
+ }
+ r.setPeer(sp)
+
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+
+ ctx := context.Background()
+ // making a request which should return with "no peer found"
+ _, _, err := delivery.RequestFromPeers(ctx, req)
+
+ expectedError := "no peer found"
+ if err.Error() != expectedError {
+ t.Fatalf("expected '%v', got %v", expectedError, err)
+ }
+}
+
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t, &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
})
defer teardown()
if err != nil {
@@ -241,6 +375,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
node := tester.Nodes[0]
+ //subscribe to custom stream
stream := NewStream("foo", "", true)
err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top)
if err != nil {
@@ -253,7 +388,7 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Expects: []p2ptest.Expect{
- {
+ { //first expect subscription to the custom stream...
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
@@ -267,7 +402,8 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
p2ptest.Exchange{
Label: "ChunkDelivery message",
Triggers: []p2ptest.Trigger{
- {
+ { //...then trigger a chunk delivery for the given chunk from peer in order for
+ //local node to get the chunk delivered
Code: 6,
Msg: &ChunkDeliveryMsg{
Addr: chunkKey,
@@ -342,8 +478,9 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- SkipCheck: skipCheck,
- DoServeRetrieve: true,
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalEnabled,
})
bucket.Store(bucketKeyRegistry, r)
@@ -408,20 +545,6 @@ func testDeliveryFromNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
return err
}
- //each of the nodes (except pivot node) subscribes to the stream of the next node
- for j, node := range nodeIDs[0 : nodes-1] {
- sid := nodeIDs[j+1]
- item, ok := sim.NodeItem(node, bucketKeyRegistry)
- if !ok {
- return fmt.Errorf("No registry")
- }
- registry := item.(*Registry)
- err = registry.Subscribe(sid, NewStream(swarmChunkServerStreamName, "", true), nil, Top)
- if err != nil {
- return err
- }
- }
-
//get the pivot node's filestore
item, ok := sim.NodeItem(*sim.PivotNodeID(), bucketKeyFileStore)
if !ok {
@@ -530,7 +653,8 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, conns, chunkCount int, skip
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
- DoSync: true,
+ Syncing: SyncingDisabled,
+ Retrieval: RetrievalDisabled,
SyncUpdateDelay: 0,
})
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go
index 3164193b3..0c95fabb7 100644
--- a/swarm/network/stream/intervals_test.go
+++ b/swarm/network/stream/intervals_test.go
@@ -83,6 +83,8 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
SkipCheck: skipCheck,
})
bucket.Store(bucketKeyRegistry, r)
diff --git a/swarm/network/stream/lightnode_test.go b/swarm/network/stream/lightnode_test.go
index 0d3bc7f54..65cde2411 100644
--- a/swarm/network/stream/lightnode_test.go
+++ b/swarm/network/stream/lightnode_test.go
@@ -25,7 +25,8 @@ import (
// when it is serving Retrieve requests.
func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{
- DoServeRetrieve: true,
+ Retrieval: RetrievalClientOnly,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -63,7 +64,8 @@ func TestLigthnodeRetrieveRequestWithRetrieve(t *testing.T) {
// requests are disabled
func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
registryOptions := &RegistryOptions{
- DoServeRetrieve: false,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -106,7 +108,8 @@ func TestLigthnodeRetrieveRequestWithoutRetrieve(t *testing.T) {
// when syncing is enabled.
func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
registryOptions := &RegistryOptions{
- DoSync: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
@@ -150,7 +153,8 @@ func TestLigthnodeRequestSubscriptionWithSync(t *testing.T) {
// when syncing is disabled.
func TestLigthnodeRequestSubscriptionWithoutSync(t *testing.T) {
registryOptions := &RegistryOptions{
- DoSync: false,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
}
tester, _, _, teardown, err := newStreamerTester(t, registryOptions)
defer teardown()
diff --git a/swarm/network/stream/snapshot_retrieval_test.go b/swarm/network/stream/snapshot_retrieval_test.go
index b81cfc0ca..ad1519341 100644
--- a/swarm/network/stream/snapshot_retrieval_test.go
+++ b/swarm/network/stream/snapshot_retrieval_test.go
@@ -127,10 +127,9 @@ func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s no
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoSync: true,
+ Retrieval: RetrievalEnabled,
+ Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
- DoRetrieve: true,
- DoServeRetrieve: true,
})
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
diff --git a/swarm/network/stream/snapshot_sync_test.go b/swarm/network/stream/snapshot_sync_test.go
index 8d89f28cb..2ddbed936 100644
--- a/swarm/network/stream/snapshot_sync_test.go
+++ b/swarm/network/stream/snapshot_sync_test.go
@@ -165,8 +165,8 @@ func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Servic
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoSync: true,
- DoServeRetrieve: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 3 * time.Second,
})
@@ -360,8 +360,8 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
- DoServeRetrieve: true,
- DoSync: true,
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingRegisterOnly,
})
bucket.Store(bucketKeyRegistry, r)
diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go
index 0ac374def..695ff0c50 100644
--- a/swarm/network/stream/stream.go
+++ b/swarm/network/stream/stream.go
@@ -47,6 +47,31 @@ const (
HashSize = 32
)
+//Enumerate options for syncing and retrieval
+type SyncingOption int
+type RetrievalOption int
+
+//Syncing options
+const (
+ //Syncing disabled
+ SyncingDisabled SyncingOption = iota
+ //Register the client and the server but not subscribe
+ SyncingRegisterOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ SyncingAutoSubscribe
+)
+
+const (
+ //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
+ RetrievalDisabled RetrievalOption = iota
+ //Only the client side of the retrieve request is registered.
+ //(light nodes do not serve retrieve requests)
+ //once the client is registered, subscription to retrieve request stream is always sent
+ RetrievalClientOnly
+ //Both client and server funcs are registered, subscribe sent automatically
+ RetrievalEnabled
+)
+
// Registry registry for outgoing and incoming streamer constructors
type Registry struct {
addr enode.ID
@@ -60,16 +85,15 @@ type Registry struct {
peers map[enode.ID]*Peer
delivery *Delivery
intervalsStore state.Store
- doRetrieve bool
+ autoRetrieval bool //automatically subscribe to retrieve request stream
maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
type RegistryOptions struct {
SkipCheck bool
- DoSync bool // Sets if the server syncs with peers. Default is true, set to false by lightnode or nosync flags.
- DoRetrieve bool // Sets if the server issues Retrieve requests. Default is true.
- DoServeRetrieve bool // Sets if the server serves Retrieve requests. Default is true, set to false by lightnode flag.
+ Syncing SyncingOption //Defines syncing behavior
+ Retrieval RetrievalOption //Defines retrieval behavior
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
@@ -82,6 +106,9 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
if options.SyncUpdateDelay <= 0 {
options.SyncUpdateDelay = 15 * time.Second
}
+ //check if retriaval has been disabled
+ retrieval := options.Retrieval != RetrievalDisabled
+
streamer := &Registry{
addr: localID,
skipCheck: options.SkipCheck,
@@ -90,13 +117,14 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
peers: make(map[enode.ID]*Peer),
delivery: delivery,
intervalsStore: intervalsStore,
- doRetrieve: options.DoRetrieve,
+ autoRetrieval: retrieval,
maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer
- if options.DoServeRetrieve {
+ //if retrieval is enabled, register the server func, so that retrieve requests will be served (non-light nodes only)
+ if options.Retrieval == RetrievalEnabled {
streamer.RegisterServerFunc(swarmChunkServerStreamName, func(_ *Peer, _ string, live bool) (Server, error) {
if !live {
return nil, errors.New("only live retrieval requests supported")
@@ -105,16 +133,21 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
})
}
- streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
- return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
- })
+ //if retrieval is not disabled, register the client func (both light nodes and normal nodes can issue retrieve requests)
+ if options.Retrieval != RetrievalDisabled {
+ streamer.RegisterClientFunc(swarmChunkServerStreamName, func(p *Peer, t string, live bool) (Client, error) {
+ return NewSwarmSyncerClient(p, syncChunkStore, NewStream(swarmChunkServerStreamName, t, live))
+ })
+ }
- if options.DoSync {
+ //If syncing is not disabled, the syncing functions are registered (both client and server)
+ if options.Syncing != SyncingDisabled {
RegisterSwarmSyncerServer(streamer, syncChunkStore)
RegisterSwarmSyncerClient(streamer, syncChunkStore)
}
- if options.DoSync {
+ //if syncing is set to automatically subscribe to the syncing stream, start the subscription process
+ if options.Syncing == SyncingAutoSubscribe {
// latestIntC function ensures that
// - receiving from the in chan is not blocked by processing inside the for loop
// - the latest int value is delivered to the loop after the processing is done
@@ -385,7 +418,7 @@ func (r *Registry) Run(p *network.BzzPeer) error {
defer close(sp.quit)
defer sp.close()
- if r.doRetrieve {
+ if r.autoRetrieval && !p.LightNode {
err := r.Subscribe(p.ID(), NewStream(swarmChunkServerStreamName, "", true), nil, Top)
if err != nil {
return err
diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go
index e7f79e7a1..16c74d3b3 100644
--- a/swarm/network/stream/streamer_test.go
+++ b/swarm/network/stream/streamer_test.go
@@ -765,6 +765,8 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingDisabled,
MaxPeerServers: maxPeerServers,
})
defer teardown()
diff --git a/swarm/network/stream/syncer_test.go b/swarm/network/stream/syncer_test.go
index f2be3bef9..b0e35b0db 100644
--- a/swarm/network/stream/syncer_test.go
+++ b/swarm/network/stream/syncer_test.go
@@ -62,6 +62,9 @@ func createMockStore(globalStore *mockdb.GlobalStore, id enode.ID, addr *network
params.Init(datadir)
params.BaseKey = addr.Over()
lstore, err = storage.NewLocalStore(params, mockStore)
+ if err != nil {
+ return nil, "", err
+ }
return lstore, datadir, nil
}
@@ -114,6 +117,8 @@ func testSyncBetweenNodes(t *testing.T, nodes, conns, chunkCount int, skipCheck
bucket.Store(bucketKeyDelivery, delivery)
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ Retrieval: RetrievalDisabled,
+ Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
})
diff --git a/swarm/pot/address.go b/swarm/pot/address.go
index 3974ebcaa..728dac14e 100644
--- a/swarm/pot/address.go
+++ b/swarm/pot/address.go
@@ -79,46 +79,6 @@ func (a Address) Bytes() []byte {
return a[:]
}
-/*
-Proximity(x, y) returns the proximity order of the MSB distance between x and y
-
-The distance metric MSB(x, y) of two equal length byte sequences x an y is the
-value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed.
-the binary cast is big endian: most significant bit first (=MSB).
-
-Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
-It is defined as the reverse rank of the integer part of the base 2
-logarithm of the distance.
-It is calculated by counting the number of common leading zeros in the (MSB)
-binary representation of the x^y.
-
-(0 farthest, 255 closest, 256 self)
-*/
-func proximity(one, other Address) (ret int, eq bool) {
- return posProximity(one, other, 0)
-}
-
-// posProximity(a, b, pos) returns proximity order of b wrt a (symmetric) pretending
-// the first pos bits match, checking only bits index >= pos
-func posProximity(one, other Address, pos int) (ret int, eq bool) {
- for i := pos / 8; i < len(one); i++ {
- if one[i] == other[i] {
- continue
- }
- oxo := one[i] ^ other[i]
- start := 0
- if i == pos/8 {
- start = pos % 8
- }
- for j := start; j < 8; j++ {
- if (oxo>>uint8(7-j))&0x01 != 0 {
- return i*8 + j, false
- }
- }
- }
- return len(one) * 8, true
-}
-
// ProxCmp compares the distances a->target and b->target.
// Returns -1 if a is closer to target, 1 if b is closer to target
// and 0 if they are equal.
diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go
index 48edc6cce..8f2f0e805 100644
--- a/swarm/pss/client/client_test.go
+++ b/swarm/pss/client/client_test.go
@@ -252,7 +252,13 @@ func newServices() adapters.Services {
ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
keys, err := wapi.NewKeyPair(ctxlocal)
+ if err != nil {
+ return nil, err
+ }
privkey, err := w.GetPrivateKey(keys)
+ if err != nil {
+ return nil, err
+ }
psparams := pss.NewPssParams().WithPrivateKey(privkey)
pskad := kademlia(ctx.Config.ID)
ps, err := pss.NewPss(pskad, psparams)
@@ -288,10 +294,6 @@ type testStore struct {
values map[string][]byte
}
-func newTestStore() *testStore {
- return &testStore{values: make(map[string][]byte)}
-}
-
func (t *testStore) Load(key string) ([]byte, error) {
return nil, nil
}
diff --git a/swarm/pss/notify/notify_test.go b/swarm/pss/notify/notify_test.go
index 675b41ada..d4d383a6b 100644
--- a/swarm/pss/notify/notify_test.go
+++ b/swarm/pss/notify/notify_test.go
@@ -223,7 +223,13 @@ func newServices(allowRaw bool) adapters.Services {
ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
keys, err := wapi.NewKeyPair(ctxlocal)
+ if err != nil {
+ return nil, err
+ }
privkey, err := w.GetPrivateKey(keys)
+ if err != nil {
+ return nil, err
+ }
pssp := pss.NewPssParams().WithPrivateKey(privkey)
pssp.MsgTTL = time.Second * 30
pssp.AllowRaw = allowRaw
diff --git a/swarm/pss/protocol_test.go b/swarm/pss/protocol_test.go
index f4209fea5..4ef3e90a0 100644
--- a/swarm/pss/protocol_test.go
+++ b/swarm/pss/protocol_test.go
@@ -93,11 +93,17 @@ func testProtocol(t *testing.T) {
lctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
lsub, err := clients[0].Subscribe(lctx, "pss", lmsgC, "receive", topic)
+ if err != nil {
+ t.Fatal(err)
+ }
defer lsub.Unsubscribe()
rmsgC := make(chan APIMsg)
rctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
rsub, err := clients[1].Subscribe(rctx, "pss", rmsgC, "receive", topic)
+ if err != nil {
+ t.Fatal(err)
+ }
defer rsub.Unsubscribe()
// set reciprocal public keys
diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go
index 574714114..66a90be62 100644
--- a/swarm/pss/pss_test.go
+++ b/swarm/pss/pss_test.go
@@ -976,11 +976,6 @@ func TestNetwork10000(t *testing.T) {
}
func testNetwork(t *testing.T) {
- type msgnotifyC struct {
- id enode.ID
- msgIdx int
- }
-
paramstring := strings.Split(t.Name(), "/")
nodecount, _ := strconv.ParseInt(paramstring[1], 10, 0)
msgcount, _ := strconv.ParseInt(paramstring[2], 10, 0)
diff --git a/swarm/pss/types.go b/swarm/pss/types.go
index 1e33ecdca..56c2c51dc 100644
--- a/swarm/pss/types.go
+++ b/swarm/pss/types.go
@@ -169,10 +169,6 @@ type stateStore struct {
values map[string][]byte
}
-func newStateStore() *stateStore {
- return &stateStore{values: make(map[string][]byte)}
-}
-
func (store *stateStore) Load(key string) ([]byte, error) {
return nil, nil
}
diff --git a/swarm/sctx/sctx.go b/swarm/sctx/sctx.go
index bed2b1145..fb7d35b00 100644
--- a/swarm/sctx/sctx.go
+++ b/swarm/sctx/sctx.go
@@ -2,19 +2,17 @@ package sctx
import "context"
-type ContextKey int
-
-const (
- HTTPRequestIDKey ContextKey = iota
- requestHostKey
+type (
+ HTTPRequestIDKey struct{}
+ requestHostKey struct{}
)
func SetHost(ctx context.Context, domain string) context.Context {
- return context.WithValue(ctx, requestHostKey, domain)
+ return context.WithValue(ctx, requestHostKey{}, domain)
}
func GetHost(ctx context.Context) string {
- v, ok := ctx.Value(requestHostKey).(string)
+ v, ok := ctx.Value(requestHostKey{}).(string)
if ok {
return v
}
diff --git a/swarm/state/dbstore.go b/swarm/state/dbstore.go
index 5e5c172b2..b0aa92e27 100644
--- a/swarm/state/dbstore.go
+++ b/swarm/state/dbstore.go
@@ -69,7 +69,7 @@ func (s *DBStore) Get(key string, i interface{}) (err error) {
// Put stores an object that implements Binary for a specific key.
func (s *DBStore) Put(key string, i interface{}) (err error) {
- bytes := []byte{}
+ var bytes []byte
marshaler, ok := i.(encoding.BinaryMarshaler)
if !ok {
diff --git a/swarm/state/dbstore_test.go b/swarm/state/dbstore_test.go
index 6683e788f..f7098956d 100644
--- a/swarm/state/dbstore_test.go
+++ b/swarm/state/dbstore_test.go
@@ -112,6 +112,9 @@ func testPersistedStore(t *testing.T, store Store) {
as := []string{}
err = store.Get("key2", &as)
+ if err != nil {
+ t.Fatal(err)
+ }
if len(as) != 3 {
t.Fatalf("serialized array did not match expectation")
diff --git a/swarm/state/inmemorystore.go b/swarm/state/inmemorystore.go
index 1ca25404a..3ba48592b 100644
--- a/swarm/state/inmemorystore.go
+++ b/swarm/state/inmemorystore.go
@@ -59,7 +59,7 @@ func (s *InmemoryStore) Get(key string, i interface{}) (err error) {
func (s *InmemoryStore) Put(key string, i interface{}) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
- bytes := []byte{}
+ var bytes []byte
marshaler, ok := i.(encoding.BinaryMarshaler)
if !ok {
diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go
index 33133edd7..600be164a 100644
--- a/swarm/storage/common_test.go
+++ b/swarm/storage/common_test.go
@@ -88,17 +88,6 @@ func mputRandomChunks(store ChunkStore, n int, chunksize int64) ([]Chunk, error)
return mput(store, n, GenerateRandomChunk)
}
-func mputChunks(store ChunkStore, chunks ...Chunk) error {
- i := 0
- f := func(n int64) Chunk {
- chunk := chunks[i]
- i++
- return chunk
- }
- _, err := mput(store, len(chunks), f)
- return err
-}
-
func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error) {
// put to localstore and wait for stored channel
// does not check delivery error state
diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go
index cf95bc1f5..fb2ef3a6b 100644
--- a/swarm/storage/feed/handler_test.go
+++ b/swarm/storage/feed/handler_test.go
@@ -27,7 +27,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/crypto"
-
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/storage"
@@ -506,15 +505,3 @@ func newCharlieSigner() *GenericSigner {
privKey, _ := crypto.HexToECDSA("facadefacadefacadefacadefacadefacadefacadefacadefacadefacadefaca")
return NewGenericSigner(privKey)
}
-
-func getUpdateDirect(rh *Handler, addr storage.Address) ([]byte, error) {
- chunk, err := rh.chunkStore.Get(context.TODO(), addr)
- if err != nil {
- return nil, err
- }
- var r Request
- if err := r.fromChunk(addr, chunk.Data()); err != nil {
- return nil, err
- }
- return r.data, nil
-}
diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go
index 49508911f..9feb68741 100644
--- a/swarm/storage/ldbstore.go
+++ b/swarm/storage/ldbstore.go
@@ -39,7 +39,6 @@ import (
"github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/opt"
)
const (
@@ -72,13 +71,6 @@ var (
ErrDBClosed = errors.New("LDBStore closed")
)
-type gcItem struct {
- idx *dpaDBIndex
- value uint64
- idxKey []byte
- po uint8
-}
-
type LDBStoreParams struct {
*StoreParams
Path string
@@ -961,15 +953,3 @@ func (s *LDBStore) SyncIterator(since uint64, until uint64, po uint8, f func(Add
}
return it.Error()
}
-
-func databaseExists(path string) bool {
- o := &opt.Options{
- ErrorIfMissing: true,
- }
- tdb, err := leveldb.OpenFile(path, o)
- if err != nil {
- return false
- }
- defer tdb.Close()
- return true
-}
diff --git a/swarm/storage/types.go b/swarm/storage/types.go
index 8c70f4584..092843db0 100644
--- a/swarm/storage/types.go
+++ b/swarm/storage/types.go
@@ -80,6 +80,19 @@ func (a Address) bits(i, j uint) uint {
return res
}
+// Proximity(x, y) returns the proximity order of the MSB distance between x and y
+//
+// The distance metric MSB(x, y) of two equal length byte sequences x an y is the
+// value of the binary integer cast of the x^y, ie., x and y bitwise xor-ed.
+// the binary cast is big endian: most significant bit first (=MSB).
+//
+// Proximity(x, y) is a discrete logarithmic scaling of the MSB distance.
+// It is defined as the reverse rank of the integer part of the base 2
+// logarithm of the distance.
+// It is calculated by counting the number of common leading zeros in the (MSB)
+// binary representation of the x^y.
+//
+// (0 farthest, 255 closest, 256 self)
func Proximity(one, other []byte) (ret int) {
b := (MaxPO-1)/8 + 1
if b > len(one) {
@@ -231,11 +244,8 @@ func GenerateRandomChunk(dataSize int64) Chunk {
}
func GenerateRandomChunks(dataSize int64, count int) (chunks []Chunk) {
- if dataSize > ch.DefaultSize {
- dataSize = ch.DefaultSize
- }
for i := 0; i < count; i++ {
- ch := GenerateRandomChunk(ch.DefaultSize)
+ ch := GenerateRandomChunk(dataSize)
chunks = append(chunks, ch)
}
return chunks
diff --git a/swarm/swarm.go b/swarm/swarm.go
index 7214abbda..1fb5443fd 100644
--- a/swarm/swarm.go
+++ b/swarm/swarm.go
@@ -175,18 +175,24 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
if err := nodeID.UnmarshalText([]byte(config.NodeID)); err != nil {
return nil, err
}
+
+ syncing := stream.SyncingAutoSubscribe
+ if !config.SyncEnabled || config.LightNodeEnabled {
+ syncing = stream.SyncingDisabled
+ }
+
+ retrieval := stream.RetrievalEnabled
+ if config.LightNodeEnabled {
+ retrieval = stream.RetrievalClientOnly
+ }
+
registryOptions := &stream.RegistryOptions{
SkipCheck: config.DeliverySkipCheck,
- DoSync: config.SyncEnabled,
- DoRetrieve: true,
- DoServeRetrieve: true,
+ Syncing: syncing,
+ Retrieval: retrieval,
SyncUpdateDelay: config.SyncUpdateDelay,
MaxPeerServers: config.MaxStreamPeerServers,
}
- if config.LightNodeEnabled {
- registryOptions.DoSync = false
- registryOptions.DoRetrieve = false
- }
self.streamer = stream.NewRegistry(nodeID, delivery, self.netStore, stateStore, registryOptions)
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage