diff --git a/inmem.go b/inmem.go
index aee4aec6..52f325d6 100644
--- a/inmem.go
+++ b/inmem.go
@@ -175,19 +175,20 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut
 }
 
 // Upload writes the file specified in src to into the memory.
-func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) (int64, error) {
 	b.mtx.Lock()
 	defer b.mtx.Unlock()
 	body, err := io.ReadAll(r)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	b.objects[name] = body
+	size := int64(len(body))
 	b.attrs[name] = ObjectAttributes{
-		Size:         int64(len(body)),
+		Size:         size,
 		LastModified: time.Now(),
 	}
-	return nil
+	return size, nil
 }
 
 // Delete removes all data prefixed with the dir.
diff --git a/objstore.go b/objstore.go
index 164c8ae0..e049da8e 100644
--- a/objstore.go
+++ b/objstore.go
@@ -42,7 +42,7 @@ type Bucket interface {
 
 	// Upload the contents of the reader as an object into the bucket.
 	// Upload should be idempotent.
-	Upload(ctx context.Context, name string, r io.Reader) error
+	Upload(ctx context.Context, name string, r io.Reader) (int64, error)
 
 	// Delete removes the object with the given name.
 	// If object does not exist in the moment of deletion, Delete should throw error.
@@ -285,7 +285,7 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str
 	}
 	defer logerrcapture.Do(logger, r.Close, "close file %s", src)
 
-	if err := bkt.Upload(ctx, dst, r); err != nil {
+	if _, err := bkt.Upload(ctx, dst, r); err != nil {
 		return errors.Wrapf(err, "upload file %s as %s", src, dst)
 	}
 	level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
@@ -436,7 +436,7 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 			Help: "Second timestamp of the last successful upload to the bucket.",
 		}, []string{"bucket"}),
 
-		opsUploadedBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		opsWrittenBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name:        "objstore_bucket_operation_written_bytes_total",
			Help:        "Total number of bytes uploaded from TSDB block, per operation.",
			ConstLabels: prometheus.Labels{"bucket": name},
@@ -455,7 +455,7 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 		bkt.opsFailures.WithLabelValues(op)
 		bkt.opsDuration.WithLabelValues(op)
 		bkt.opsFetchedBytes.WithLabelValues(op)
-		bkt.opsUploadedBytes.WithLabelValues(op)
+		bkt.opsWrittenBytes.WithLabelValues(op)
 	}
 	bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
 	return bkt
@@ -468,8 +468,8 @@ type metricBucket struct {
 	opsFailures         *prometheus.CounterVec
 	isOpFailureExpected IsOpFailureExpectedFunc
 
-	opsFetchedBytes  *prometheus.CounterVec
-	opsUploadedBytes *prometheus.CounterVec
+	opsFetchedBytes *prometheus.CounterVec
+	opsWrittenBytes *prometheus.CounterVec
 
 	opsDuration *prometheus.HistogramVec
 	lastSuccessfulUploadTime *prometheus.GaugeVec
@@ -481,7 +481,7 @@ func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
 		ops:                      b.ops,
 		opsFailures:              b.opsFailures,
 		opsFetchedBytes:          b.opsFetchedBytes,
-		opsUploadedBytes:         b.opsUploadedBytes,
+		opsWrittenBytes:          b.opsWrittenBytes,
 		isOpFailureExpected:      fn,
 		opsDuration:              b.opsDuration,
 		lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
@@ -539,7 +539,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
-		b.opsUploadedBytes,
+		b.opsWrittenBytes,
 	), nil
 }
 
@@ -561,7 +561,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
-		b.opsUploadedBytes,
+		b.opsWrittenBytes,
 	), nil
 }
 
@@ -581,20 +581,22 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
 	return ok, nil
 }
 
-func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	const op = OpUpload
 	b.ops.WithLabelValues(op).Inc()
 
 	start := time.Now()
-	if err := b.bkt.Upload(ctx, name, r); err != nil {
+	writtenBytes, err := b.bkt.Upload(ctx, name, r)
+	if err != nil {
 		if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
 			b.opsFailures.WithLabelValues(op).Inc()
 		}
-		return err
+		return 0, err
 	}
+	b.opsWrittenBytes.WithLabelValues(op).Add(float64(writtenBytes))
 	b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
 	b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
-	return nil
+	return writtenBytes, nil
 }
 
 func (b *metricBucket) Delete(ctx context.Context, name string) error {
@@ -642,10 +644,10 @@ type timingReadCloser struct {
 	failed            *prometheus.CounterVec
 	isFailureExpected IsOpFailureExpectedFunc
 	fetchedBytes      *prometheus.CounterVec
-	uploadedBytes     *prometheus.CounterVec
+	writtenBytes      *prometheus.CounterVec
 }
 
-func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, uploadedBytes *prometheus.CounterVec) *timingReadCloser {
+func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, writtenBytes *prometheus.CounterVec) *timingReadCloser {
 	// Initialize the metrics with 0.
 	dur.WithLabelValues(op)
 	failed.WithLabelValues(op)
@@ -660,7 +662,7 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
 		failed:            failed,
 		isFailureExpected: isFailureExpected,
 		fetchedBytes:      fetchedBytes,
-		uploadedBytes:     uploadedBytes,
+		writtenBytes:      writtenBytes,
 	}
 }
 
diff --git a/objstore_test.go b/objstore_test.go
index 87e76810..86b28585 100644
--- a/objstore_test.go
+++ b/objstore_test.go
@@ -77,9 +77,12 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
 	m := WrapWithMetrics(NewInMemBucket(), r, "")
 	tempDir := t.TempDir()
 
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
+	_, err := m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))
+	testutil.Ok(t, err)
+	_, err = m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))
+	testutil.Ok(t, err)
+	_, err = m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))
+	testutil.Ok(t, err)
 
 	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
 		# HELP objstore_bucket_operations_total Total number of all attempted operations against a bucket.
@@ -143,7 +146,7 @@ func TestTimingTracingReader(t *testing.T) {
 	tr := NopCloserWithSize(r)
 	tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
 		return false
-	}, m.opsFetchedBytes, m.opsUploadedBytes)
+	}, m.opsFetchedBytes, m.opsWrittenBytes)
 
 	size, err := TryToGetSize(tr)
 
@@ -170,13 +173,16 @@ func TestDownloadDir_CleanUp(t *testing.T) {
 	}
 	tempDir := t.TempDir()
 
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
+	_, err := b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))
+	testutil.Ok(t, err)
+	_, err = b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))
+	testutil.Ok(t, err)
+	_, err = b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))
+	testutil.Ok(t, err)
 
 	// We exapect the third Get to fail
 	testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir))
 
-	_, err := os.Stat(tempDir)
+	_, err = os.Stat(tempDir)
 	testutil.Assert(t, os.IsNotExist(err))
 }
 
diff --git a/prefixed_bucket.go b/prefixed_bucket.go
index 41448011..a646cd22 100644
--- a/prefixed_bucket.go
+++ b/prefixed_bucket.go
@@ -86,7 +86,7 @@ func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttr
 
 // Upload the contents of the reader as an object into the bucket.
 // Upload should be idempotent.
-func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r)
 }
 
diff --git a/prefixed_bucket_test.go b/prefixed_bucket_test.go
index f93c8580..e7f2d9bc 100644
--- a/prefixed_bucket_test.go
+++ b/prefixed_bucket_test.go
@@ -29,7 +29,8 @@ func TestPrefixedBucket_Acceptance(t *testing.T) {
 }
 
 func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
-	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1")))
+	_, err := bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1"))
+	testutil.Ok(t, err)
 
 	pBkt := NewPrefixedBucket(bkt, prefix)
 	rc1, err := pBkt.Get(context.Background(), "file1.jpg")
@@ -41,7 +42,8 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
 	testutil.Ok(t, err)
 	testutil.Equals(t, "test-data1", string(content))
 
-	testutil.Ok(t, pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2")))
+	_, err = pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2"))
+	testutil.Ok(t, err)
 	rc2, err := bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, rc2.Close()) }()
@@ -69,7 +71,8 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
 	testutil.Ok(t, err)
 	testutil.Assert(t, attrs.Size == 10, "expected size to be equal to 10")
 
-	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
+	_, err = bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1"))
+	testutil.Ok(t, err)
 	seen := []string{}
 	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
 		seen = append(seen, fn)
diff --git a/providers/bos/bos.go b/providers/bos/bos.go
index 2a9b5cae..58af1209 100644
--- a/providers/bos/bos.go
+++ b/providers/bos/bos.go
@@ -110,29 +110,29 @@ func (b *Bucket) Delete(_ context.Context, name string) error {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (int64, error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "getting size of %s", name)
+		return 0, errors.Wrapf(err, "getting size of %s", name)
 	}
 
 	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
 	if partNums == 0 {
 		body, err := bce.NewBodyFromSizedReader(r, lastSlice)
 		if err != nil {
-			return errors.Wrapf(err, "failed to create SizedReader for %s", name)
+			return 0, errors.Wrapf(err, "failed to create SizedReader for %s", name)
 		}
 
 		if _, err := b.client.PutObject(b.name, name, body, nil); err != nil {
-			return errors.Wrapf(err, "failed to upload %s", name)
+			return 0, errors.Wrapf(err, "failed to upload %s", name)
 		}
 
-		return nil
+		// The whole object was written in a single part, so report its full size
+		// rather than 0, keeping this path consistent with the multipart path.
+		return size, nil
 	}
 
 	result, err := b.client.BasicInitiateMultipartUpload(b.name, name)
 	if err != nil {
-		return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
+		return 0, errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
 	}
 
 	uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) {
@@ -156,7 +156,7 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 	for part := 1; part <= partNums; part++ {
 		etag, err := uploadEveryPart(partSize, part, result.UploadId)
 		if err != nil {
-			return errors.Wrapf(err, "failed to upload part %d for %s", part, name)
+			return 0, errors.Wrapf(err, "failed to upload part %d for %s", part, name)
 		}
 		parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag})
 	}
@@ -164,15 +164,15 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 	if lastSlice != 0 {
 		etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId)
 		if err != nil {
-			return errors.Wrapf(err, "failed to upload the last part for %s", name)
+			return 0, errors.Wrapf(err, "failed to upload the last part for %s", name)
 		}
 		parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag})
 	}
 
 	if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil {
-		return errors.Wrapf(err, "failed to set %s upload completed", name)
+		return 0, errors.Wrapf(err, "failed to set %s upload completed", name)
 	}
-	return nil
+	return size, nil
 }
 
 // Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
diff --git a/providers/cos/cos.go b/providers/cos/cos.go
index 36ecead5..b52ae181 100644
--- a/providers/cos/cos.go
+++ b/providers/cos/cos.go
@@ -197,24 +197,24 @@ func (r fixedLengthReader) Size() int64 {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "getting size of %s", name)
+		return 0, errors.Wrapf(err, "getting size of %s", name)
 	}
 	// partSize 128MB.
 	const partSize = 1024 * 1024 * 128
 	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
 	if partNums == 0 {
 		if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
-			return errors.Wrapf(err, "Put object: %s", name)
+			return 0, errors.Wrapf(err, "Put object: %s", name)
 		}
-		return nil
+		// Single-part success: the full object was written, so report its size.
+		return size, nil
 	}
 	// 1. init.
 	result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
 	if err != nil {
-		return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
+		return 0, errors.Wrapf(err, "InitiateMultipartUpload %s", name)
 	}
 	uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) {
 		r := newFixedLengthReader(r, partSize)
@@ -235,7 +235,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 	for part := 1; part <= partNums; part++ {
 		etag, err := uploadEveryPart(partSize, part, result.UploadID)
 		if err != nil {
-			return errors.Wrapf(err, "uploadPart %d, %s", part, name)
+			return 0, errors.Wrapf(err, "uploadPart %d, %s", part, name)
 		}
 		optcom.Parts = append(optcom.Parts, cos.Object{
 			PartNumber: part, ETag: etag},
@@ -246,7 +246,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 		part := partNums + 1
 		etag, err := uploadEveryPart(lastSlice, part, result.UploadID)
 		if err != nil {
-			return errors.Wrapf(err, "uploadPart %d, %s", part, name)
+			return 0, errors.Wrapf(err, "uploadPart %d, %s", part, name)
 		}
 		optcom.Parts = append(optcom.Parts, cos.Object{
 			PartNumber: part, ETag: etag},
@@ -254,9 +254,9 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 	}
 	// 4. complete.
 	if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil {
-		return errors.Wrapf(err, "CompleteMultipartUpload %s", name)
+		return 0, errors.Wrapf(err, "CompleteMultipartUpload %s", name)
 	}
-	return nil
+	return size, nil
 }
 
 // Delete removes the object with the given name.
diff --git a/providers/filesystem/filesystem.go b/providers/filesystem/filesystem.go
index 8ccd33b1..72fc2d80 100644
--- a/providers/filesystem/filesystem.go
+++ b/providers/filesystem/filesystem.go
@@ -191,26 +191,27 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
 }
 
 // Upload writes the file specified in src to into the memory.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (writtenBytes int64, err error) {
 	if ctx.Err() != nil {
-		return ctx.Err()
+		return 0, ctx.Err()
 	}
 
 	file := filepath.Join(b.rootDir, name)
 	if err := os.MkdirAll(filepath.Dir(file), os.ModePerm); err != nil {
-		return err
+		return 0, err
 	}
 
 	f, err := os.Create(file)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	defer errcapture.Do(&err, f.Close, "close")
 
-	if _, err := io.Copy(f, r); err != nil {
-		return errors.Wrapf(err, "copy to %s", file)
+	written, err := io.Copy(f, r)
+	if err != nil {
+		return 0, errors.Wrapf(err, "copy to %s", file)
 	}
-	return nil
+	return written, nil
 }
 
 func isDirEmpty(name string) (ok bool, err error) {
diff --git a/providers/filesystem/filesystem_test.go b/providers/filesystem/filesystem_test.go
index c3621fe0..d8a0a91d 100644
--- a/providers/filesystem/filesystem_test.go
+++ b/providers/filesystem/filesystem_test.go
@@ -23,8 +23,10 @@ func TestDelete_EmptyDirDeletionRaceCondition(t *testing.T) {
 	testutil.Ok(t, err)
 
 	// Upload 2 objects in a subfolder.
-	testutil.Ok(t, b.Upload(ctx, "subfolder/first", strings.NewReader("first")))
-	testutil.Ok(t, b.Upload(ctx, "subfolder/second", strings.NewReader("second")))
+	_, err = b.Upload(ctx, "subfolder/first", strings.NewReader("first"))
+	testutil.Ok(t, err)
+	_, err = b.Upload(ctx, "subfolder/second", strings.NewReader("second"))
+	testutil.Ok(t, err)
 
 	// Prepare goroutines to concurrently delete the 2 objects (each one deletes a different object)
 	start := make(chan struct{})
@@ -116,7 +118,7 @@ func TestUpload_CancelledContext(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel()
 
-	err = b.Upload(ctx, "some-file", bytes.NewReader([]byte("file content")))
+	_, err = b.Upload(ctx, "some-file", bytes.NewReader([]byte("file content")))
 	testutil.NotOk(t, err)
 	testutil.Equals(t, context.Canceled, err)
 }
diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go
index 8b107c83..078296f0 100644
--- a/providers/gcs/gcs.go
+++ b/providers/gcs/gcs.go
@@ -169,13 +169,14 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
 }
 
 // Upload writes the file specified in src to remote GCS location specified as target.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	w := b.bkt.Object(name).NewWriter(ctx)
 
-	if _, err := io.Copy(w, r); err != nil {
-		return err
+	written, err := io.Copy(w, r)
+	if err != nil {
+		return 0, err
 	}
-	return w.Close()
+	return written, w.Close()
 }
 
 // Delete removes the object with the given name.
diff --git a/providers/obs/obs.go b/providers/obs/obs.go
index 1bc58aef..bb9c73e4 100644
--- a/providers/obs/obs.go
+++ b/providers/obs/obs.go
@@ -128,26 +128,26 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "failed to get size apriori to upload %s", name)
+		return 0, errors.Wrapf(err, "failed to get size apriori to upload %s", name)
 	}
 
 	if size <= 0 {
-		return errors.New("object size must be provided")
+		return 0, errors.New("object size must be provided")
 	}
 
 	if size <= MinMultipartUploadSize {
 		err = b.putObjectSingle(name, r)
 		if err != nil {
-			return err
+			return 0, err
 		}
 	} else {
 		var initOutput *obs.InitiateMultipartUploadOutput
 		initOutput, err = b.initiateMultipartUpload(name)
 		if err != nil {
-			return err
+			return 0, err
 		}
 
 		uploadId := initOutput.UploadId
@@ -165,7 +165,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 		}()
 
 		parts, err := b.multipartUpload(size, name, uploadId, r)
 		if err != nil {
-			return err
+			return 0, err
 		}
 
 		_, err = b.client.CompleteMultipartUpload(&obs.CompleteMultipartUploadInput{
@@ -175,10 +175,10 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 			Parts: parts,
 		})
 		if err != nil {
-			return errors.Wrap(err, "failed to complete multipart upload")
+			return 0, errors.Wrap(err, "failed to complete multipart upload")
 		}
 	}
-	return nil
+	return size, nil
 }
 
 func (b *Bucket) putObjectSingle(key string, body io.Reader) error {
diff --git a/providers/oci/oci.go b/providers/oci/oci.go
index 2be3210a..24187e99 100644
--- a/providers/oci/oci.go
+++ b/providers/oci/oci.go
@@ -166,7 +166,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64
 
 // Upload the contents of the reader as an object into the bucket.
 // Upload should be idempotent.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (written int64, err error) {
 	req := transfer.UploadStreamRequest{
 		UploadRequest: transfer.UploadRequest{
 			NamespaceName: common.String(b.namespace),
@@ -183,9 +183,12 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err erro
 	}
 
 	uploadManager := transfer.NewUploadManager()
-	_, err = uploadManager.UploadStream(ctx, req)
+	response, err := uploadManager.UploadStream(ctx, req)
+	if err != nil {
+		// Return early: response must not be dereferenced on a failed upload.
+		return 0, err
+	}
 
-	return err
+	return response.PutObjectResponse.RawResponse.ContentLength, nil
 }
 
 // Exists checks if the given object exists in the bucket.
diff --git a/providers/oss/oss.go b/providers/oss/oss.go
index 08fcf1e3..82c73771 100644
--- a/providers/oss/oss.go
+++ b/providers/oss/oss.go
@@ -68,11 +68,11 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (int64, error) {
 	// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "failed to get size apriori to upload %s", name)
+		return 0, errors.Wrapf(err, "failed to get size apriori to upload %s", name)
 	}
 
 	chunksnum, lastslice := int(math.Floor(float64(size)/PartSize)), size%PartSize
@@ -81,13 +81,13 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 	switch chunksnum {
 	case 0:
 		if err := b.bucket.PutObject(name, ncloser); err != nil {
-			return errors.Wrap(err, "failed to upload oss object")
+			return 0, errors.Wrap(err, "failed to upload oss object")
 		}
 	default:
 		{
 			init, err := b.bucket.InitiateMultipartUpload(name)
 			if err != nil {
-				return errors.Wrap(err, "failed to initiate multi-part upload")
+				return 0, errors.Wrap(err, "failed to initiate multi-part upload")
 			}
 			chunk := 0
 			uploadEveryPart := func(everypartsize int64, cnk int) (alioss.UploadPart, error) {
@@ -105,23 +105,23 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 			for ; chunk < chunksnum; chunk++ {
 				part, err := uploadEveryPart(PartSize, chunk+1)
 				if err != nil {
-					return errors.Wrap(err, "failed to upload every part")
+					return 0, errors.Wrap(err, "failed to upload every part")
 				}
 				parts = append(parts, part)
 			}
 			if lastslice != 0 {
 				part, err := uploadEveryPart(lastslice, chunksnum+1)
 				if err != nil {
-					return errors.Wrap(err, "failed to upload the last chunk")
+					return 0, errors.Wrap(err, "failed to upload the last chunk")
 				}
 				parts = append(parts, part)
 			}
 			if _, err := b.bucket.CompleteMultipartUpload(init, parts); err != nil {
-				return errors.Wrap(err, "failed to set multi-part upload completive")
+				return 0, errors.Wrap(err, "failed to set multi-part upload completive")
 			}
 		}
 	}
-	return nil
+	return size, nil
 }
 
 // Delete removes the object with the given name.
diff --git a/providers/s3/s3.go b/providers/s3/s3.go
index 337bd0d8..5498bb22 100644
--- a/providers/s3/s3.go
+++ b/providers/s3/s3.go
@@ -478,10 +478,10 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	sse, err := b.getServerSideEncryption(ctx)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
 	// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
@@ -512,10 +512,10 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 			NumThreads: 4,
 		},
 	); err != nil {
-		return errors.Wrap(err, "upload s3 object")
+		return 0, errors.Wrap(err, "upload s3 object")
 	}
 
-	return nil
+	return size, nil
 }
 
 // Attributes returns information about the specified object.
diff --git a/providers/s3/s3_e2e_test.go b/providers/s3/s3_e2e_test.go
index 4b75a014..2bec75d9 100644
--- a/providers/s3/s3_e2e_test.go
+++ b/providers/s3/s3_e2e_test.go
@@ -50,6 +50,7 @@ func BenchmarkUpload(b *testing.B) {
 
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		testutil.Ok(b, bkt.Upload(ctx, "test", strings.NewReader(str)))
+		_, err = bkt.Upload(ctx, "test", strings.NewReader(str))
+		testutil.Ok(b, err)
 	}
 }
diff --git a/providers/swift/swift.go b/providers/swift/swift.go
index c24d03fd..c8ca0433 100644
--- a/providers/swift/swift.go
+++ b/providers/swift/swift.go
@@ -296,7 +296,7 @@ func (b *Container) IsCustomerManagedKeyError(_ error) bool {
 }
 
 // Upload writes the contents of the reader as an object into the container.
-func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err error) {
+func (c *Container) Upload(_ context.Context, name string, r io.Reader) (written int64, err error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
 		level.Warn(c.logger).Log("msg", "could not guess file size, using large object to avoid issues if the file is larger than limit", "name", name, "err", err)
@@ -314,23 +314,24 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err
 		}
 		if c.useDynamicLargeObjects {
 			if file, err = c.connection.DynamicLargeObjectCreateFile(&opts); err != nil {
-				return errors.Wrap(err, "create DLO file")
+				return 0, errors.Wrap(err, "create DLO file")
 			}
 		} else {
 			if file, err = c.connection.StaticLargeObjectCreateFile(&opts); err != nil {
-				return errors.Wrap(err, "create SLO file")
+				return 0, errors.Wrap(err, "create SLO file")
 			}
 		}
 	} else {
 		if file, err = c.connection.ObjectCreate(c.name, name, true, "", "", swift.Headers{}); err != nil {
-			return errors.Wrap(err, "create file")
+			return 0, errors.Wrap(err, "create file")
 		}
 	}
 	defer errcapture.Do(&err, file.Close, "upload object close")
 
-	if _, err := io.Copy(file, r); err != nil {
-		return errors.Wrap(err, "uploading object")
+	written, err = io.Copy(file, r)
+	if err != nil {
+		return 0, errors.Wrap(err, "uploading object")
 	}
 
-	return nil
+	return written, nil
 }
 
 // Delete removes the object with the given name.
diff --git a/testing.go b/testing.go
index 4e41b278..f784ef3b 100644
--- a/testing.go
+++ b/testing.go
@@ -100,7 +100,8 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
 	testutil.Assert(t, bkt.IsObjNotFoundErr(err), "expected not found error but got %s", err)
 
 	// Upload first object.
-	testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))
+	_, err = bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@"))
+	testutil.Ok(t, err)
 
 	// Double check we can immediately read it.
 	rc1, err := bkt.Get(ctx, "id1/obj_1.some")
@@ -150,14 +151,21 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
 	testutil.Assert(t, ok, "expected exits")
 
 	// Upload other objects.
-	testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
+	_, err = bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))
+	testutil.Ok(t, err)
 	// Upload should be idempotent.
-	testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
-	testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@")))
-	testutil.Ok(t, bkt.Upload(ctx, "id1/sub/subobj_1.some", strings.NewReader("@test-data4@")))
-	testutil.Ok(t, bkt.Upload(ctx, "id1/sub/subobj_2.some", strings.NewReader("@test-data5@")))
-	testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data6@")))
-	testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data7@")))
+	_, err = bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))
+	testutil.Ok(t, err)
+	_, err = bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@"))
+	testutil.Ok(t, err)
+	_, err = bkt.Upload(ctx, "id1/sub/subobj_1.some", strings.NewReader("@test-data4@"))
+	testutil.Ok(t, err)
+	_, err = bkt.Upload(ctx, "id1/sub/subobj_2.some", strings.NewReader("@test-data5@"))
+	testutil.Ok(t, err)
+	_, err = bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data6@"))
+	testutil.Ok(t, err)
+	_, err = bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data7@"))
+	testutil.Ok(t, err)
 
 	// Can we iter over items from top dir?
 	var seen []string
@@ -247,7 +255,8 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
 	sort.Strings(seen)
 	testutil.Equals(t, expected, seen)
 
-	testutil.Ok(t, bkt.Upload(ctx, "obj_6.som", bytes.NewReader(make([]byte, 1024*1024*200))))
+	_, err = bkt.Upload(ctx, "obj_6.som", bytes.NewReader(make([]byte, 1024*1024*200)))
+	testutil.Ok(t, err)
 	testutil.Ok(t, bkt.Delete(ctx, "obj_6.som"))
 }
 
@@ -285,7 +294,7 @@ func (d *delayingBucket) Exists(ctx context.Context, name string) (bool, error)
 	return d.bkt.Exists(ctx, name)
 }
 
-func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	time.Sleep(d.delay)
 	return d.bkt.Upload(ctx, name, r)
 }
diff --git a/tracing/opentelemetry/opentelemetry.go b/tracing/opentelemetry/opentelemetry.go
index 0e7a279a..ca6588d5 100644
--- a/tracing/opentelemetry/opentelemetry.go
+++ b/tracing/opentelemetry/opentelemetry.go
@@ -90,7 +90,7 @@ func (t TracingBucket) Attributes(ctx context.Context, name string) (_ objstore.
 	return t.bkt.Attributes(ctx, name)
 }
 
-func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
+func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (writtenBytes int64, err error) {
 	ctx, span := t.tracer.Start(ctx, "bucket_upload")
 	defer span.End()
 	span.SetAttributes(attribute.String("name", name))
diff --git a/tracing/opentracing/opentracing.go b/tracing/opentracing/opentracing.go
index 8174afb1..0bfa4165 100644
--- a/tracing/opentracing/opentracing.go
+++ b/tracing/opentracing/opentracing.go
@@ -96,10 +96,10 @@ func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs objst
 	return
 }
 
-func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
+func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (writtenBytes int64, err error) {
 	doWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) {
 		span.LogKV("name", name)
-		err = t.bkt.Upload(spanCtx, name, r)
+		writtenBytes, err = t.bkt.Upload(spanCtx, name, r)
 	})
 	return
 }
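
For reviewers, a minimal caller-side sketch of the new `Upload` contract (assumptions: the `github.com/thanos-io/objstore` module path and the in-memory bucket from `inmem.go`; the object name and payload are illustrative and not part of the patch):

```go
package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/thanos-io/objstore" // assumed module path for this repo
)

func main() {
	ctx := context.Background()
	bkt := objstore.NewInMemBucket()

	// Before this patch: err := bkt.Upload(ctx, "dir/obj", r)
	// After: the number of written bytes is returned alongside the error.
	written, err := bkt.Upload(ctx, "dir/obj", bytes.NewReader([]byte("payload")))
	if err != nil {
		panic(err)
	}
	fmt.Printf("uploaded %d bytes\n", written) // "payload" -> 7 bytes

	// Callers that do not need the count discard it, as the updated tests do:
	// _, err = bkt.Upload(ctx, "dir/obj2", bytes.NewReader([]byte("x")))
}
```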