[FEAT] Increment metric
Signed-off-by: rita.canavarro <[email protected]>
rita.canavarro committed Jul 25, 2023
1 parent b7698b6 commit da9b315
Showing 19 changed files with 136 additions and 109 deletions.
9 changes: 5 additions & 4 deletions inmem.go
@@ -175,19 +175,20 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut
 }
 
 // Upload writes the contents of the reader into memory.
-func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) (int64, error) {
 	b.mtx.Lock()
 	defer b.mtx.Unlock()
 	body, err := io.ReadAll(r)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	b.objects[name] = body
+	size := int64(len(body))
 	b.attrs[name] = ObjectAttributes{
-		Size:         int64(len(body)),
+		Size:         size,
 		LastModified: time.Now(),
 	}
-	return nil
+	return size, nil
 }
 
 // Delete removes all data prefixed with the dir.
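
With the new signature a caller gets the stored byte count directly from Upload instead of needing a follow-up Attributes call. A minimal caller-side sketch against the in-memory bucket (the objstore package qualifier is assumed; inside the package you would drop it, and imports are elided):

	bkt := objstore.NewInMemBucket()
	// Upload now reports how many bytes were written into memory.
	n, err := bkt.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("hello")))
	if err != nil {
		panic(err)
	}
	fmt.Println(n) // 5 — matches the Size recorded in ObjectAttributes.
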
34 changes: 18 additions & 16 deletions objstore.go
@@ -42,7 +42,7 @@ type Bucket interface {
 
 	// Upload the contents of the reader as an object into the bucket.
 	// Upload should be idempotent.
-	Upload(ctx context.Context, name string, r io.Reader) error
+	Upload(ctx context.Context, name string, r io.Reader) (int64, error)
 
 	// Delete removes the object with the given name.
 	// If the object does not exist at the moment of deletion, Delete should return an error.
@@ -285,7 +285,7 @@ func UploadFile(ctx context.Context, logger log.Logger, bkt Bucket, src, dst str
 	}
 	defer logerrcapture.Do(logger, r.Close, "close file %s", src)
 
-	if err := bkt.Upload(ctx, dst, r); err != nil {
+	if _, err := bkt.Upload(ctx, dst, r); err != nil {
 		return errors.Wrapf(err, "upload file %s as %s", src, dst)
 	}
 	level.Debug(logger).Log("msg", "uploaded file", "from", src, "dst", dst, "bucket", bkt.Name())
@@ -436,7 +436,7 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 			Help: "Second timestamp of the last successful upload to the bucket.",
 		}, []string{"bucket"}),
 
-		opsUploadedBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+		opsWrittenBytes: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name:        "objstore_bucket_operation_written_bytes_total",
 			Help:        "Total number of bytes uploaded from TSDB block, per operation.",
 			ConstLabels: prometheus.Labels{"bucket": name},
@@ -455,7 +455,7 @@ func WrapWithMetrics(b Bucket, reg prometheus.Registerer, name string) *metricBu
 		bkt.opsFailures.WithLabelValues(op)
 		bkt.opsDuration.WithLabelValues(op)
 		bkt.opsFetchedBytes.WithLabelValues(op)
-		bkt.opsUploadedBytes.WithLabelValues(op)
+		bkt.opsWrittenBytes.WithLabelValues(op)
 	}
 	bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
 	return bkt
@@ -468,8 +468,8 @@ type metricBucket struct {
 	opsFailures         *prometheus.CounterVec
 	isOpFailureExpected IsOpFailureExpectedFunc
 
-	opsFetchedBytes  *prometheus.CounterVec
-	opsUploadedBytes *prometheus.CounterVec
+	opsFetchedBytes *prometheus.CounterVec
+	opsWrittenBytes *prometheus.CounterVec
 
 	opsDuration              *prometheus.HistogramVec
 	lastSuccessfulUploadTime *prometheus.GaugeVec
@@ -481,7 +481,7 @@ func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket {
 		ops:                      b.ops,
 		opsFailures:              b.opsFailures,
 		opsFetchedBytes:          b.opsFetchedBytes,
-		opsUploadedBytes:         b.opsUploadedBytes,
+		opsWrittenBytes:          b.opsWrittenBytes,
 		isOpFailureExpected:      fn,
 		opsDuration:              b.opsDuration,
 		lastSuccessfulUploadTime: b.lastSuccessfulUploadTime,
@@ -539,7 +539,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
-		b.opsUploadedBytes,
+		b.opsWrittenBytes,
 	), nil
 }
 
@@ -561,7 +561,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
 		b.opsFailures,
 		b.isOpFailureExpected,
 		b.opsFetchedBytes,
-		b.opsUploadedBytes,
+		b.opsWrittenBytes,
 	), nil
 }
 
@@ -581,20 +581,22 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
 	return ok, nil
 }
 
-func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	const op = OpUpload
 	b.ops.WithLabelValues(op).Inc()
 
 	start := time.Now()
-	if err := b.bkt.Upload(ctx, name, r); err != nil {
+	writtenBytes, err := b.bkt.Upload(ctx, name, r)
+	if err != nil {
 		if !b.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
 			b.opsFailures.WithLabelValues(op).Inc()
 		}
-		return err
+		return 0, err
 	}
+	b.opsWrittenBytes.WithLabelValues(op).Add(float64(writtenBytes))
 	b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
 	b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
-	return nil
+	return writtenBytes, nil
 }
 
 func (b *metricBucket) Delete(ctx context.Context, name string) error {
@@ -642,10 +644,10 @@ type timingReadCloser struct {
 	failed            *prometheus.CounterVec
 	isFailureExpected IsOpFailureExpectedFunc
 	fetchedBytes      *prometheus.CounterVec
-	uploadedBytes     *prometheus.CounterVec
+	writtenBytes      *prometheus.CounterVec
 }
 
-func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, uploadedBytes *prometheus.CounterVec) *timingReadCloser {
+func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, writtenBytes *prometheus.CounterVec) *timingReadCloser {
 	// Initialize the metrics with 0.
 	dur.WithLabelValues(op)
 	failed.WithLabelValues(op)
@@ -660,7 +662,7 @@ func newTimingReadCloser(rc io.ReadCloser, op string, dur *prometheus.HistogramV
 		failed:            failed,
 		isFailureExpected: isFailureExpected,
 		fetchedBytes:      fetchedBytes,
-		uploadedBytes:     uploadedBytes,
+		writtenBytes:      writtenBytes,
 	}
 }
 
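
Taken together, the objstore.go changes mean every successful Upload now feeds its byte count into the objstore_bucket_operation_written_bytes_total counter. A rough sketch of observing that through a Prometheus registry — a minimal illustration assuming the objstore package qualifier, with imports and most error handling elided:

	reg := prometheus.NewRegistry()
	bkt := objstore.WrapWithMetrics(objstore.NewInMemBucket(), reg, "test")

	// A successful five-byte upload...
	if _, err := bkt.Upload(context.Background(), "obj", strings.NewReader("12345")); err != nil {
		panic(err)
	}

	// ...shows up under the upload operation label of the new counter.
	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		if mf.GetName() == "objstore_bucket_operation_written_bytes_total" {
			for _, m := range mf.GetMetric() {
				fmt.Println(m.GetLabel(), m.GetCounter().GetValue())
			}
		}
	}
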
22 changes: 14 additions & 8 deletions objstore_test.go
@@ -77,9 +77,12 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
 	m := WrapWithMetrics(NewInMemBucket(), r, "")
 	tempDir := t.TempDir()
 
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
-	testutil.Ok(t, m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
+	_, err := m.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))
+	testutil.Ok(t, err)
+	_, err = m.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))
+	testutil.Ok(t, err)
+	_, err = m.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))
+	testutil.Ok(t, err)
 
 	testutil.Ok(t, promtest.GatherAndCompare(r, strings.NewReader(`
 # HELP objstore_bucket_operations_total Total number of all attempted operations against a bucket.
@@ -143,7 +146,7 @@ func TestTimingTracingReader(t *testing.T) {
 	tr := NopCloserWithSize(r)
 	tr = newTimingReadCloser(tr, "", m.opsDuration, m.opsFailures, func(err error) bool {
 		return false
-	}, m.opsFetchedBytes, m.opsUploadedBytes)
+	}, m.opsFetchedBytes, m.opsWrittenBytes)
 
 	size, err := TryToGetSize(tr)
 
@@ -170,13 +173,16 @@ func TestDownloadDir_CleanUp(t *testing.T) {
 	}
 	tempDir := t.TempDir()
 
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1"))))
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2"))))
-	testutil.Ok(t, b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3"))))
+	_, err := b.Upload(context.Background(), "dir/obj1", bytes.NewReader([]byte("1")))
+	testutil.Ok(t, err)
+	_, err = b.Upload(context.Background(), "dir/obj2", bytes.NewReader([]byte("2")))
+	testutil.Ok(t, err)
+	_, err = b.Upload(context.Background(), "dir/obj3", bytes.NewReader([]byte("3")))
+	testutil.Ok(t, err)
 
 	// We expect the third Get to fail.
 	testutil.NotOk(t, DownloadDir(context.Background(), log.NewNopLogger(), b, "dir/", "dir/", tempDir))
-	_, err := os.Stat(tempDir)
+	_, err = os.Stat(tempDir)
 	testutil.Assert(t, os.IsNotExist(err))
 }
 
2 changes: 1 addition & 1 deletion prefixed_bucket.go
@@ -86,7 +86,7 @@ func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttr
 
 // Upload the contents of the reader as an object into the bucket.
 // Upload should be idempotent.
-func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r)
 }
 
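
Because PrefixedBucket.Upload only rewrites the object name before delegating, the byte count passes through the wrapper unchanged. A quick sketch, assuming the constructors this repo's tests already use (imports elided):

	inner := objstore.NewInMemBucket()
	pBkt := objstore.NewPrefixedBucket(inner, "tenant-1")

	// The wrapper prefixes the name but returns the inner bucket's count as-is.
	n, _ := pBkt.Upload(context.Background(), "file1.jpg", strings.NewReader("test-data1"))
	fmt.Println(n) // 10, the same value attrs.Size reports in the acceptance test.
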
9 changes: 6 additions & 3 deletions prefixed_bucket_test.go
@@ -29,7 +29,8 @@ func TestPrefixedBucket_Acceptance(t *testing.T) {
 }
 
 func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
-	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1")))
+	_, err := bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1"))
+	testutil.Ok(t, err)
 
 	pBkt := NewPrefixedBucket(bkt, prefix)
 	rc1, err := pBkt.Get(context.Background(), "file1.jpg")
@@ -41,7 +42,8 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
 	testutil.Ok(t, err)
 	testutil.Equals(t, "test-data1", string(content))
 
-	testutil.Ok(t, pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2")))
+	_, err = pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2"))
+	testutil.Ok(t, err)
 	rc2, err := bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, rc2.Close()) }()
@@ -69,7 +71,8 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
 	testutil.Ok(t, err)
 	testutil.Assert(t, attrs.Size == 10, "expected size to be equal to 10")
 
-	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
+	_, err = bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1"))
+	testutil.Ok(t, err)
 	seen := []string{}
 	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
 		seen = append(seen, fn)
20 changes: 10 additions & 10 deletions providers/bos/bos.go
@@ -110,29 +110,29 @@ func (b *Bucket) Delete(_ context.Context, name string) error {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (int64, error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "getting size of %s", name)
+		return 0, errors.Wrapf(err, "getting size of %s", name)
 	}
 
 	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
 	if partNums == 0 {
 		body, err := bce.NewBodyFromSizedReader(r, lastSlice)
 		if err != nil {
-			return errors.Wrapf(err, "failed to create SizedReader for %s", name)
+			return 0, errors.Wrapf(err, "failed to create SizedReader for %s", name)
 		}
 
 		if _, err := b.client.PutObject(b.name, name, body, nil); err != nil {
-			return errors.Wrapf(err, "failed to upload %s", name)
+			return 0, errors.Wrapf(err, "failed to upload %s", name)
 		}
 
-		return nil
+		return 0, nil
 	}
 
 	result, err := b.client.BasicInitiateMultipartUpload(b.name, name)
 	if err != nil {
-		return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
+		return 0, errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
 	}
 
 	uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) {
@@ -156,23 +156,23 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
 	for part := 1; part <= partNums; part++ {
 		etag, err := uploadEveryPart(partSize, part, result.UploadId)
 		if err != nil {
-			return errors.Wrapf(err, "failed to upload part %d for %s", part, name)
+			return 0, errors.Wrapf(err, "failed to upload part %d for %s", part, name)
 		}
 		parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag})
 	}
 
 	if lastSlice != 0 {
 		etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId)
 		if err != nil {
-			return errors.Wrapf(err, "failed to upload the last part for %s", name)
+			return 0, errors.Wrapf(err, "failed to upload the last part for %s", name)
 		}
 		parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag})
 	}
 
 	if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil {
-		return errors.Wrapf(err, "failed to set %s upload completed", name)
+		return 0, errors.Wrapf(err, "failed to set %s upload completed", name)
 	}
-	return nil
+	return size, nil
 }
 
 // Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
18 changes: 9 additions & 9 deletions providers/cos/cos.go
@@ -197,24 +197,24 @@ func (r fixedLengthReader) Size() int64 {
 }
 
 // Upload the contents of the reader as an object into the bucket.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (int64, error) {
 	size, err := objstore.TryToGetSize(r)
 	if err != nil {
-		return errors.Wrapf(err, "getting size of %s", name)
+		return 0, errors.Wrapf(err, "getting size of %s", name)
 	}
 	// partSize 128MB.
 	const partSize = 1024 * 1024 * 128
 	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
 	if partNums == 0 {
 		if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
-			return errors.Wrapf(err, "Put object: %s", name)
+			return 0, errors.Wrapf(err, "Put object: %s", name)
 		}
-		return nil
+		return 0, nil
 	}
 	// 1. init.
 	result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
 	if err != nil {
-		return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
+		return 0, errors.Wrapf(err, "InitiateMultipartUpload %s", name)
 	}
 	uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) {
 		r := newFixedLengthReader(r, partSize)
@@ -235,7 +235,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 	for part := 1; part <= partNums; part++ {
 		etag, err := uploadEveryPart(partSize, part, result.UploadID)
 		if err != nil {
-			return errors.Wrapf(err, "uploadPart %d, %s", part, name)
+			return 0, errors.Wrapf(err, "uploadPart %d, %s", part, name)
 		}
 		optcom.Parts = append(optcom.Parts, cos.Object{
 			PartNumber: part, ETag: etag},
@@ -246,17 +246,17 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
 		part := partNums + 1
 		etag, err := uploadEveryPart(lastSlice, part, result.UploadID)
 		if err != nil {
-			return errors.Wrapf(err, "uploadPart %d, %s", part, name)
+			return 0, errors.Wrapf(err, "uploadPart %d, %s", part, name)
 		}
 		optcom.Parts = append(optcom.Parts, cos.Object{
 			PartNumber: part, ETag: etag},
 		)
 	}
 	// 4. complete.
 	if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil {
-		return errors.Wrapf(err, "CompleteMultipartUpload %s", name)
+		return 0, errors.Wrapf(err, "CompleteMultipartUpload %s", name)
 	}
-	return nil
+	return size, nil
 }
 
 // Delete removes the object with the given name.
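
The multipart bookkeeping in cos.go is plain integer arithmetic: partNums full parts of partSize bytes plus a lastSlice remainder, so the size returned on success equals the sum of everything uploaded. A standalone sketch of the same computation, using a hypothetical 300 MiB object:

	const partSize = 1024 * 1024 * 128 // 128 MiB, as in cos.go.

	size := int64(300 * 1024 * 1024) // hypothetical object size
	partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
	// partNums == 2 and lastSlice == 46137344 (44 MiB): parts 1 and 2 carry
	// 128 MiB each, part 3 carries the remainder, and the total equals size.
	fmt.Println(partNums, lastSlice)
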
15 changes: 8 additions & 7 deletions providers/filesystem/filesystem.go
@@ -191,26 +191,27 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
 }
 
 // Upload writes the contents of the reader to a file on disk.
-func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
+func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (writtenBytes int64, err error) {
 	if ctx.Err() != nil {
-		return ctx.Err()
+		return 0, ctx.Err()
 	}
 
 	file := filepath.Join(b.rootDir, name)
 	if err := os.MkdirAll(filepath.Dir(file), os.ModePerm); err != nil {
-		return err
+		return 0, err
 	}
 
 	f, err := os.Create(file)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	defer errcapture.Do(&err, f.Close, "close")
 
-	if _, err := io.Copy(f, r); err != nil {
-		return errors.Wrapf(err, "copy to %s", file)
+	written, err := io.Copy(f, r)
+	if err != nil {
+		return 0, errors.Wrapf(err, "copy to %s", file)
 	}
-	return nil
+	return written, nil
 }
 
 func isDirEmpty(name string) (ok bool, err error) {
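
Here the count comes straight from io.Copy, so it reflects the bytes that actually reached the file rather than a size guessed up front. A minimal usage sketch (NewBucket taking a root directory is an assumption based on this repo's filesystem package; imports elided):

	bkt, err := filesystem.NewBucket("/tmp/objstore-demo") // assumed constructor
	if err != nil {
		panic(err)
	}
	n, err := bkt.Upload(context.Background(), "dir/obj", strings.NewReader("hello"))
	// n is exactly what io.Copy reported: 5 bytes written under the root directory.
	fmt.Println(n, err)
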
(The remaining 11 changed files are not shown here.)
