diff options
Diffstat (limited to 'vendor/cloud.google.com/go/storage/writer.go')
-rw-r--r-- | vendor/cloud.google.com/go/storage/writer.go | 261 |
1 files changed, 261 insertions, 0 deletions
diff --git a/vendor/cloud.google.com/go/storage/writer.go b/vendor/cloud.google.com/go/storage/writer.go new file mode 100644 index 000000000..3a58c404e --- /dev/null +++ b/vendor/cloud.google.com/go/storage/writer.go @@ -0,0 +1,261 @@ +// Copyright 2014 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "encoding/base64" + "errors" + "fmt" + "io" + "sync" + "unicode/utf8" + + "google.golang.org/api/googleapi" + raw "google.golang.org/api/storage/v1" +) + +// A Writer writes a Cloud Storage object. +type Writer struct { + // ObjectAttrs are optional attributes to set on the object. Any attributes + // must be initialized before the first Write call. Nil or zero-valued + // attributes are ignored. + ObjectAttrs + + // SendCRC specifies whether to transmit a CRC32C field. It should be set + // to true in addition to setting the Writer's CRC32C field, because zero + // is a valid CRC and normally a zero would not be transmitted. + // If a CRC32C is sent, and the data written does not match the checksum, + // the write will be rejected. + SendCRC32C bool + + // ChunkSize controls the maximum number of bytes of the object that the + // Writer will attempt to send to the server in a single request. Objects + // smaller than the size will be sent in a single request, while larger + // objects will be split over multiple requests. The size will be rounded up + // to the nearest multiple of 256K. If zero, chunking will be disabled and + // the object will be uploaded in a single request. + // + // ChunkSize will default to a reasonable value. If you perform many concurrent + // writes of small objects, you may wish set ChunkSize to a value that matches + // your objects' sizes to avoid consuming large amounts of memory. + // + // ChunkSize must be set before the first Write call. + ChunkSize int + + // ProgressFunc can be used to monitor the progress of a large write. + // operation. If ProgressFunc is not nil and writing requires multiple + // calls to the underlying service (see + // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload), + // then ProgressFunc will be invoked after each call with the number of bytes of + // content copied so far. + // + // ProgressFunc should return quickly without blocking. + ProgressFunc func(int64) + + ctx context.Context + o *ObjectHandle + + opened bool + pw *io.PipeWriter + + donec chan struct{} // closed after err and obj are set. + obj *ObjectAttrs + + mu sync.Mutex + err error +} + +func (w *Writer) open() error { + attrs := w.ObjectAttrs + // Check the developer didn't change the object Name (this is unfortunate, but + // we don't want to store an object under the wrong name). + if attrs.Name != w.o.object { + return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object) + } + if !utf8.ValidString(attrs.Name) { + return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name) + } + if attrs.KMSKeyName != "" && w.o.encryptionKey != nil { + return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key") + } + pr, pw := io.Pipe() + w.pw = pw + w.opened = true + + go w.monitorCancel() + + if w.ChunkSize < 0 { + return errors.New("storage: Writer.ChunkSize must be non-negative") + } + mediaOpts := []googleapi.MediaOption{ + googleapi.ChunkSize(w.ChunkSize), + } + if c := attrs.ContentType; c != "" { + mediaOpts = append(mediaOpts, googleapi.ContentType(c)) + } + + go func() { + defer close(w.donec) + + rawObj := attrs.toRawObject(w.o.bucket) + if w.SendCRC32C { + rawObj.Crc32c = encodeUint32(attrs.CRC32C) + } + if w.MD5 != nil { + rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5) + } + call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj). + Media(pr, mediaOpts...). + Projection("full"). + Context(w.ctx) + if w.ProgressFunc != nil { + call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) }) + } + if attrs.KMSKeyName != "" { + call.KmsKeyName(attrs.KMSKeyName) + } + if attrs.PredefinedACL != "" { + call.PredefinedAcl(attrs.PredefinedACL) + } + if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil { + w.mu.Lock() + w.err = err + w.mu.Unlock() + pr.CloseWithError(err) + return + } + var resp *raw.Object + err := applyConds("NewWriter", w.o.gen, w.o.conds, call) + if err == nil { + if w.o.userProject != "" { + call.UserProject(w.o.userProject) + } + setClientHeader(call.Header()) + // If the chunk size is zero, then no chunking is done on the Reader, + // which means we cannot retry: the first call will read the data, and if + // it fails, there is no way to re-read. + if w.ChunkSize == 0 { + resp, err = call.Do() + } else { + // We will only retry here if the initial POST, which obtains a URI for + // the resumable upload, fails with a retryable error. The upload itself + // has its own retry logic. + err = runWithRetry(w.ctx, func() error { + var err2 error + resp, err2 = call.Do() + return err2 + }) + } + } + if err != nil { + w.mu.Lock() + w.err = err + w.mu.Unlock() + pr.CloseWithError(err) + return + } + w.obj = newObject(resp) + }() + return nil +} + +// Write appends to w. It implements the io.Writer interface. +// +// Since writes happen asynchronously, Write may return a nil +// error even though the write failed (or will fail). Always +// use the error returned from Writer.Close to determine if +// the upload was successful. +func (w *Writer) Write(p []byte) (n int, err error) { + w.mu.Lock() + werr := w.err + w.mu.Unlock() + if werr != nil { + return 0, werr + } + if !w.opened { + if err := w.open(); err != nil { + return 0, err + } + } + n, err = w.pw.Write(p) + if err != nil { + w.mu.Lock() + werr := w.err + w.mu.Unlock() + // Preserve existing functionality that when context is canceled, Write will return + // context.Canceled instead of "io: read/write on closed pipe". This hides the + // pipe implementation detail from users and makes Write seem as though it's an RPC. + if werr == context.Canceled || werr == context.DeadlineExceeded { + return n, werr + } + } + return n, err +} + +// Close completes the write operation and flushes any buffered data. +// If Close doesn't return an error, metadata about the written object +// can be retrieved by calling Attrs. +func (w *Writer) Close() error { + if !w.opened { + if err := w.open(); err != nil { + return err + } + } + + // Closing either the read or write causes the entire pipe to close. + if err := w.pw.Close(); err != nil { + return err + } + + <-w.donec + w.mu.Lock() + defer w.mu.Unlock() + return w.err +} + +// monitorCancel is intended to be used as a background goroutine. It monitors the +// the context, and when it observes that the context has been canceled, it manually +// closes things that do not take a context. +func (w *Writer) monitorCancel() { + select { + case <-w.ctx.Done(): + w.mu.Lock() + werr := w.ctx.Err() + w.err = werr + w.mu.Unlock() + + // Closing either the read or write causes the entire pipe to close. + w.CloseWithError(werr) + case <-w.donec: + } +} + +// CloseWithError aborts the write operation with the provided error. +// CloseWithError always returns nil. +// +// Deprecated: cancel the context passed to NewWriter instead. +func (w *Writer) CloseWithError(err error) error { + if !w.opened { + return nil + } + return w.pw.CloseWithError(err) +} + +// Attrs returns metadata about a successfully-written object. +// It's only valid to call it after Close returns nil. +func (w *Writer) Attrs() *ObjectAttrs { + return w.obj +} |