diff options
Diffstat (limited to 'swarm/api/http/server.go')
-rw-r--r-- | swarm/api/http/server.go | 60 |
1 files changed, 55 insertions, 5 deletions
diff --git a/swarm/api/http/server.go b/swarm/api/http/server.go index 3c6735a73..a336bd82f 100644 --- a/swarm/api/http/server.go +++ b/swarm/api/http/server.go @@ -26,6 +26,7 @@ import ( "fmt" "io" "io/ioutil" + "math" "mime" "mime/multipart" "net/http" @@ -38,7 +39,9 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/swarm/api" + "github.com/ethereum/go-ethereum/swarm/chunk" "github.com/ethereum/go-ethereum/swarm/log" + "github.com/ethereum/go-ethereum/swarm/sctx" "github.com/ethereum/go-ethereum/swarm/storage" "github.com/ethereum/go-ethereum/swarm/storage/feed" "github.com/rs/cors" @@ -60,6 +63,8 @@ var ( getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil) ) +const SwarmTagHeaderName = "x-swarm-tag" + type methodHandler map[string]http.Handler func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { @@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server { InstrumentOpenTracing, } + tagAdapter := Adapter(func(h http.Handler) http.Handler { + return InitUploadTag(h, api.Tags) + }) + + defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter) + mux := http.NewServeMux() mux.Handle("/bzz:/", methodHandler{ "GET": Adapt( @@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostFiles), - defaultMiddlewares..., + defaultPostMiddlewares..., ), "DELETE": Adapt( http.HandlerFunc(server.HandleDelete), @@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server { ), "POST": Adapt( http.HandlerFunc(server.HandlePostRaw), - defaultMiddlewares..., + defaultPostMiddlewares..., ), }) mux.Handle("/bzz-immutable:/", methodHandler{ @@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { ruid := GetRUID(r.Context()) log.Debug("handle.post.raw", "ruid", ruid) + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + postRawCount.Inc(1) toEncrypt := false @@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) { return } - addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) + addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt) if err != nil { postRawFail.Inc(1) respondError(w, r, err.Error(), http.StatusInternalServerError) return } + wait(r.Context()) + tag.DoneSplit(addr) + log.Debug("stored content", "ruid", ruid, "key", addr) w.Header().Set("Content-Type", "text/plain") @@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } log.Debug("new manifest", "ruid", ruid, "key", addr) } - newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error { switch contentType { case "application/x-tar": @@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { return } + tagUid := sctx.GetTag(r.Context()) + tag, err := s.api.Tags.Get(tagUid) + if err != nil { + log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err) + } + + log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total()) + tag.DoneSplit(newAddr) + log.Debug("stored content", "ruid", ruid, "key", newAddr) w.Header().Set("Content-Type", "text/plain") @@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) { - log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context())) + log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context())) defaultPath := r.URL.Query().Get("defaultpath") @@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize)) } +// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length +func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 { + if contentLength < 4096 { + return 1 + } + branchingFactor := 128 + if isEncrypted { + branchingFactor = 64 + } + + dataChunks := math.Ceil(float64(contentLength) / float64(4096)) + totalChunks := dataChunks + intermediate := dataChunks / float64(branchingFactor) + + for intermediate > 1 { + totalChunks += math.Ceil(intermediate) + intermediate = intermediate / float64(branchingFactor) + } + + return int64(totalChunks) + 1 +} + // The size of buffer used for bufio.Reader on LazyChunkReader passed to // http.ServeContent in HandleGetFile. // Warning: This value influences the number of chunk requests and chunker join goroutines |