Skip to content

Commit

Permalink
Merge pull request #43 from charleskorn/respect-context-cancellation-…
Browse files Browse the repository at this point in the history
…in-filesystem

filesystem: abort filesystem bucket operations if the context has been cancelled
  • Loading branch information
yeya24 authored Feb 1, 2023
2 parents ae1c52a + 345d3b9 commit 11ffbc4
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class.
- [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials.
- [#41](https://github.com/thanos-io/objstore/pull/41) S3: Support S3 session token.
- [#43](https://github.com/thanos-io/objstore/pull/43) filesystem: abort filesystem bucket operations if the context has been cancelled

### Changed
- [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`.
Expand Down
34 changes: 29 additions & 5 deletions providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func NewBucket(rootDir string) (*Bucket, error) {
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
if ctx.Err() != nil {
return ctx.Err()
}

params := objstore.ApplyIterOptions(options...)
absDir := filepath.Join(b.rootDir, dir)
info, err := os.Stat(absDir)
Expand Down Expand Up @@ -119,7 +123,11 @@ func (r *rangeReaderCloser) Close() error {
}

// Attributes returns information about the specified object.
func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) {
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
if ctx.Err() != nil {
return objstore.ObjectAttributes{}, ctx.Err()
}

file := filepath.Join(b.rootDir, name)
stat, err := os.Stat(file)
if err != nil {
Expand All @@ -133,7 +141,11 @@ func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttr
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) {
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}

if name == "" {
return nil, errors.New("object name is empty")
}
Expand Down Expand Up @@ -163,7 +175,11 @@ func (b *Bucket) GetRange(_ context.Context, name string, off, length int64) (io
}

// Exists checks if the given directory exists in memory.
func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
if ctx.Err() != nil {
return false, ctx.Err()
}

info, err := os.Stat(filepath.Join(b.rootDir, name))
if err != nil {
if os.IsNotExist(err) {
Expand All @@ -175,7 +191,11 @@ func (b *Bucket) Exists(_ context.Context, name string) (bool, error) {
}

// Upload writes the file specified in src to into the memory.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) (err error) {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
if ctx.Err() != nil {
return ctx.Err()
}

file := filepath.Join(b.rootDir, name)
if err := os.MkdirAll(filepath.Dir(file), os.ModePerm); err != nil {
return err
Expand Down Expand Up @@ -211,7 +231,11 @@ func isDirEmpty(name string) (ok bool, err error) {
}

// Delete removes all data prefixed with the dir.
func (b *Bucket) Delete(_ context.Context, name string) error {
func (b *Bucket) Delete(ctx context.Context, name string) error {
if ctx.Err() != nil {
return ctx.Err()
}

file := filepath.Join(b.rootDir, name)
for file != b.rootDir {
if err := os.RemoveAll(file); err != nil {
Expand Down
88 changes: 88 additions & 0 deletions providers/filesystem/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package filesystem

import (
"bytes"
"context"
"strings"
"sync"
Expand Down Expand Up @@ -44,3 +45,90 @@ func TestDelete_EmptyDirDeletionRaceCondition(t *testing.T) {
group.Wait()
}
}

func TestIter_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = b.Iter(ctx, "", func(s string) error {
return nil
})

testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestGet_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err = b.Get(ctx, "some-file")
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestAttributes_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err = b.Attributes(ctx, "some-file")
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestGetRange_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err = b.GetRange(ctx, "some-file", 0, 100)
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestExists_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

_, err = b.Exists(ctx, "some-file")
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestUpload_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = b.Upload(ctx, "some-file", bytes.NewReader([]byte("file content")))
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

func TestDelete_CancelledContext(t *testing.T) {
b, err := NewBucket(t.TempDir())
testutil.Ok(t, err)

ctx, cancel := context.WithCancel(context.Background())
cancel()

err = b.Delete(ctx, "some-file")
testutil.NotOk(t, err)
testutil.Equals(t, context.Canceled, err)
}

0 comments on commit 11ffbc4

Please sign in to comment.