GODRIVER-2520 Remove deadline setters from gridfs
prestonvasquez committed Oct 16, 2023
1 parent fcacd77 commit 0483971
Showing 8 changed files with 155 additions and 253 deletions.
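This commit replaces the bucket-level SetReadDeadline/SetWriteDeadline setters with an explicit context.Context parameter on every GridFS method. As a hedged, caller-side illustration only (not code from this commit), a migration might look like the sketch below; the connection URI, database name, file name, and 5-second timeout are all assumptions made for the example.

package main

import (
	"context"
	"log"
	"os"
	"time"

	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
	"go.mongodb.org/mongo-driver/mongo/options"
)

func main() {
	// The URI and database name are illustrative.
	client, err := mongo.Connect(context.Background(),
		options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = client.Disconnect(context.Background()) }()

	bucket, err := gridfs.NewBucket(client.Database("files"))
	if err != nil {
		log.Fatal(err)
	}

	src, err := os.Open("report.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	// Before this commit, a deadline was attached to the bucket itself:
	//     bucket.SetWriteDeadline(time.Now().Add(5 * time.Second))
	//     id, err := bucket.UploadFromStream("report.txt", src)
	//
	// After this commit, the deadline travels in a context passed to each call.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	id, err := bucket.UploadFromStream(ctx, "report.txt", src)
	if err != nil {
		log.Fatal(err)
	}
	log.Println("uploaded file with ID", id)
}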
186 changes: 58 additions & 128 deletions mongo/gridfs/bucket.go
@@ -12,7 +12,6 @@ import (
"errors"
"fmt"
"io"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -51,9 +50,6 @@ type Bucket struct {
firstWriteDone bool
readBuf []byte
writeBuf []byte

readDeadline time.Time
writeDeadline time.Time
}

// Upload contains options to upload a file to a bucket.
@@ -100,30 +96,22 @@ func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, err
return b, nil
}

// SetWriteDeadline sets the write deadline for this bucket.
func (b *Bucket) SetWriteDeadline(t time.Time) error {
b.writeDeadline = t
return nil
}

// SetReadDeadline sets the read deadline for this bucket.
func (b *Bucket) SetReadDeadline(t time.Time) error {
b.readDeadline = t
return nil
}

// OpenUploadStream creates a new upload stream for a file given the filename.
func (b *Bucket) OpenUploadStream(filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
return b.OpenUploadStreamWithID(primitive.NewObjectID(), filename, opts...)
func (b *Bucket) OpenUploadStream(
ctx context.Context,
filename string,
opts ...*options.UploadOptions,
) (*UploadStream, error) {
return b.OpenUploadStreamWithID(ctx, primitive.NewObjectID(), filename, opts...)
}

// OpenUploadStreamWithID creates a new upload stream for a file given the file ID and filename.
func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

func (b *Bucket) OpenUploadStreamWithID(
ctx context.Context,
fileID interface{},
filename string,
opts ...*options.UploadOptions,
) (*UploadStream, error) {
if err := b.checkFirstWrite(ctx); err != nil {
return nil, err
}
@@ -140,25 +128,30 @@ func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opt
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStream(filename string, source io.Reader, opts ...*options.UploadOptions) (primitive.ObjectID, error) {
func (b *Bucket) UploadFromStream(
ctx context.Context,
filename string,
source io.Reader,
opts ...*options.UploadOptions,
) (primitive.ObjectID, error) {
fileID := primitive.NewObjectID()
err := b.UploadFromStreamWithID(fileID, filename, source, opts...)
err := b.UploadFromStreamWithID(ctx, fileID, filename, source, opts...)
return fileID, err
}

// UploadFromStreamWithID uploads a file given a source stream.
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, source io.Reader, opts ...*options.UploadOptions) error {
us, err := b.OpenUploadStreamWithID(fileID, filename, opts...)
if err != nil {
return err
}

err = us.SetWriteDeadline(b.writeDeadline)
func (b *Bucket) UploadFromStreamWithID(
ctx context.Context,
fileID interface{},
filename string,
source io.Reader,
opts ...*options.UploadOptions,
) error {
us, err := b.OpenUploadStreamWithID(ctx, fileID, filename, opts...)
if err != nil {
_ = us.Close()
return err
}

@@ -185,8 +178,8 @@ func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, sou
}

// OpenDownloadStream creates a stream from which the contents of the file can be read.
func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error) {
return b.openDownloadStream(bson.D{
func (b *Bucket) OpenDownloadStream(ctx context.Context, fileID interface{}) (*DownloadStream, error) {
return b.openDownloadStream(ctx, bson.D{
{"_id", fileID},
})
}
@@ -196,17 +189,21 @@ func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error)
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStream(fileID interface{}, stream io.Writer) (int64, error) {
ds, err := b.OpenDownloadStream(fileID)
func (b *Bucket) DownloadToStream(ctx context.Context, fileID interface{}, stream io.Writer) (int64, error) {
ds, err := b.OpenDownloadStream(ctx, fileID)
if err != nil {
return 0, err
}

return b.downloadToStream(ds, stream)
return b.downloadToStream(ctx, ds, stream)
}

// OpenDownloadStreamByName opens a download stream for the file with the given filename.
func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.NameOptions) (*DownloadStream, error) {
func (b *Bucket) OpenDownloadStreamByName(
ctx context.Context,
filename string,
opts ...*options.NameOptions,
) (*DownloadStream, error) {
var numSkip int32 = -1
var sortOrder int32 = 1

@@ -222,41 +219,32 @@ func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.Name

findOpts := options.Find().SetSkip(int64(numSkip)).SetSort(bson.D{{"uploadDate", sortOrder}})

return b.openDownloadStream(bson.D{{"filename", filename}}, findOpts)
return b.openDownloadStream(ctx, bson.D{{"filename", filename}}, findOpts)
}

// DownloadToStreamByName downloads the file with the given name to the given io.Writer.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStreamByName(filename string, stream io.Writer, opts ...*options.NameOptions) (int64, error) {
ds, err := b.OpenDownloadStreamByName(filename, opts...)
func (b *Bucket) DownloadToStreamByName(
ctx context.Context,
filename string,
stream io.Writer,
opts ...*options.NameOptions,
) (int64, error) {
ds, err := b.OpenDownloadStreamByName(ctx, filename, opts...)
if err != nil {
return 0, err
}

return b.downloadToStream(ds, stream)
return b.downloadToStream(ctx, ds, stream)
}

// Delete deletes all chunks and metadata associated with the file with the given file ID.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the delete operation.
func (b *Bucket) Delete(fileID interface{}) error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}
return b.DeleteContext(ctx, fileID)
}

// DeleteContext deletes all chunks and metadata associated with the file with the given file ID and runs the underlying
// Delete deletes all chunks and metadata associated with the file with the given file ID and runs the underlying
// delete operations with the provided context.
//
// Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
func (b *Bucket) Delete(ctx context.Context, fileID interface{}) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// be shared by both delete operations.
@@ -281,27 +269,12 @@ func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
return b.deleteChunks(ctx, fileID)
}

// Find returns the files collection documents that match the given filter.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
//
// Use SetReadDeadline to set a deadline for the find operation.
func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
ctx, cancel := deadlineContext(b.readDeadline)
if cancel != nil {
defer cancel()
}

return b.FindContext(ctx, filter, opts...)
}

// FindContext returns the files collection documents that match the given filter and runs the underlying
// Find returns the files collection documents that match the given filter and runs the underlying
// find query with the provided context.
//
// Use the context parameter to time-out or cancel the find operation. The deadline set by SetReadDeadline
// is ignored.
func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
func (b *Bucket) Find(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
gfsOpts := options.MergeGridFSFindOptions(opts...)
find := options.Find()
if gfsOpts.AllowDiskUse != nil {
@@ -335,20 +308,7 @@ func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*o
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the rename operation.
func (b *Bucket) Rename(fileID interface{}, newFilename string) error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

return b.RenameContext(ctx, fileID, newFilename)
}

// RenameContext renames the stored file with the specified file ID and runs the underlying update with the provided
// context.
//
// Use the context parameter to time-out or cancel the rename operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilename string) error {
func (b *Bucket) Rename(ctx context.Context, fileID interface{}, newFilename string) error {
res, err := b.filesColl.UpdateOne(ctx,
bson.D{{"_id", fileID}},
bson.D{{"$set", bson.D{{"filename", newFilename}}}},
@@ -364,26 +324,11 @@ func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilen
return nil
}

// Drop drops the files and chunks collections associated with this bucket.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the drop operation.
func (b *Bucket) Drop() error {
ctx, cancel := deadlineContext(b.writeDeadline)
if cancel != nil {
defer cancel()
}

return b.DropContext(ctx)
}

// DropContext drops the files and chunks collections associated with this bucket and runs the drop operations with
// the provided context.
//
// Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DropContext(ctx context.Context) error {
func (b *Bucket) Drop(ctx context.Context) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// be shared by both drop operations.
@@ -413,12 +358,11 @@ func (b *Bucket) GetChunksCollection() *mongo.Collection {
return b.chunksColl
}

func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOptions) (*DownloadStream, error) {
ctx, cancel := deadlineContext(b.readDeadline)
if cancel != nil {
defer cancel()
}

func (b *Bucket) openDownloadStream(
ctx context.Context,
filter interface{},
opts ...*options.FindOptions,
) (*DownloadStream, error) {
cursor, err := b.findFile(ctx, filter, opts...)
if err != nil {
return nil, err
@@ -450,21 +394,7 @@ func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOpt
return newDownloadStream(chunksCursor, foundFile.ChunkSize, &foundFile), nil
}

func deadlineContext(deadline time.Time) (context.Context, context.CancelFunc) {
if deadline.Equal(time.Time{}) {
return context.Background(), nil
}

return context.WithDeadline(context.Background(), deadline)
}

func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64, error) {
err := ds.SetReadDeadline(b.readDeadline)
if err != nil {
_ = ds.Close()
return 0, err
}

func (b *Bucket) downloadToStream(ctx context.Context, ds *DownloadStream, stream io.Writer) (int64, error) {
copied, err := io.Copy(stream, ds)
if err != nil {
_ = ds.Close()
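The bucket.go changes above also collapse the Delete/DeleteContext, Find/FindContext, Rename/RenameContext, and Drop/DropContext pairs into single context-aware methods. A small hedged sketch of the resulting call pattern, reusing the bucket from the sketch above; the helper name cleanupFile, the 30-second timeout, and the new filename are assumptions, not code from this commit.

import (
	"context"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

// cleanupFile renames a stored file, confirms it is visible in the files
// collection, and then deletes it, all under one caller-supplied deadline.
func cleanupFile(bucket *gridfs.Bucket, fileID interface{}) error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Rename the stored file in the files collection.
	if err := bucket.Rename(ctx, fileID, "renamed.txt"); err != nil {
		return err
	}

	// Query the files collection for the renamed document.
	cursor, err := bucket.Find(ctx, bson.D{{"filename", "renamed.txt"}})
	if err != nil {
		return err
	}
	defer cursor.Close(ctx)

	// Remove the file's chunks and metadata.
	return bucket.Delete(ctx, fileID)
}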
28 changes: 7 additions & 21 deletions mongo/gridfs/download_stream.go
@@ -36,8 +36,8 @@ type DownloadStream struct {
bufferStart int
bufferEnd int
expectedChunk int32 // index of next expected chunk
readDeadline time.Time
fileLen int64
ctx context.Context

// The pointer returned by GetFile. This should not be used in the actual DownloadStream code outside of the
// newDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead,
@@ -128,14 +128,10 @@ func (ds *DownloadStream) Close() error {
return nil
}

// SetReadDeadline sets the read deadline for this download stream.
func (ds *DownloadStream) SetReadDeadline(t time.Time) error {
if ds.closed {
return ErrStreamClosed
}

ds.readDeadline = t
return nil
// WithContext sets the context for the DownloadStream, allowing control over
// the execution and behavior of operations associated with the stream.
func (ds *DownloadStream) WithContext(ctx context.Context) {
ds.ctx = ctx
}

// Read reads the file from the server and writes it to a destination byte slice.
@@ -148,17 +144,12 @@ func (ds *DownloadStream) Read(p []byte) (int, error) {
return 0, io.EOF
}

ctx, cancel := deadlineContext(ds.readDeadline)
if cancel != nil {
defer cancel()
}

bytesCopied := 0
var err error
for bytesCopied < len(p) {
if ds.bufferStart >= ds.bufferEnd {
// Buffer is empty and can load in data from new chunk.
err = ds.fillBuffer(ctx)
err = ds.fillBuffer(ds.ctx)
if err != nil {
if err == errNoMoreChunks {
if bytesCopied == 0 {
@@ -190,18 +181,13 @@ func (ds *DownloadStream) Skip(skip int64) (int64, error) {
return 0, nil
}

ctx, cancel := deadlineContext(ds.readDeadline)
if cancel != nil {
defer cancel()
}

var skipped int64
var err error

for skipped < skip {
if ds.bufferStart >= ds.bufferEnd {
// Buffer is empty and can load in data from new chunk.
err = ds.fillBuffer(ctx)
err = ds.fillBuffer(ds.ctx)
if err != nil {
if err == errNoMoreChunks {
return skipped, nil
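The new DownloadStream.WithContext method added above stores the context that Read and Skip consult when fetching chunks. A minimal hedged sketch of how a caller might use it, again reusing the bucket from the first sketch; the helper name readFile and the 10-second timeout are assumptions.

import (
	"bytes"
	"context"
	"io"
	"time"

	"go.mongodb.org/mongo-driver/mongo/gridfs"
)

// readFile opens a download stream, attaches a context to it, and copies the
// file contents into memory.
func readFile(bucket *gridfs.Bucket, fileID interface{}) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	ds, err := bucket.OpenDownloadStream(ctx, fileID)
	if err != nil {
		return nil, err
	}
	defer ds.Close()

	// Subsequent Read (and Skip) calls use this context when loading chunks.
	ds.WithContext(ctx)

	var buf bytes.Buffer
	if _, err := io.Copy(&buf, ds); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}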