Skip to content

Commit

Permalink
fix linters and test in s3 (gofr-dev#1274)
Browse files Browse the repository at this point in the history
  • Loading branch information
coolwednesday authored Dec 6, 2024
1 parent 4e90b54 commit 90de6c2
Show file tree
Hide file tree
Showing 10 changed files with 957 additions and 844 deletions.
126 changes: 25 additions & 101 deletions pkg/gofr/datasource/file/s3/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)

type s3file struct {
conn *s3.Client
type S3File struct {
conn s3Client
name string
offset int64
logger Logger
Expand All @@ -29,94 +29,12 @@ type s3file struct {
lastModified time.Time
}

// Name returns the base name of the file.
//
// For a file, this method returns the name of the file without any directory components.
// For directories, it returns the name of the directory.
func (f *s3file) Name() string {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "GET NAME",
Location: getLocation(bucketName),
}, time.Now())

return path.Base(f.name)
}

// Mode is not supported for the current implementation of S3 buckets.
// This method is included to adhere to the FileSystem interface in GoFr.
//
// Note: The Mode method does not provide meaningful information for S3 objects
// and should be considered a placeholder in this context.
func (f *s3file) Mode() os.FileMode {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "FILE MODE",
Location: getLocation(bucketName),
Message: aws.String("Not supported for S3"),
}, time.Now())

return 0
}

// Size returns the size of the retrieved object.
//
// For files, it returns the size of the file in bytes.
// For directories, it returns the sum of sizes of all files contained within the directory.
//
// Note:
// - This method should be called on a FileInfo instance obtained from a Stat or ReadDir operation.
func (f *s3file) Size() int64 {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "FILE/DIR SIZE",
Location: getLocation(bucketName),
}, time.Now())

return f.size
}

// ModTime returns the last modification time of the file or directory.
//
// For files, it returns the timestamp of the last modification to the file's contents.
// For directories, it returns the timestamp of the most recent change to the directory's contents, including updates
// to files within the directory.
func (f *s3file) ModTime() time.Time {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "LAST MODIFIED",
Location: getLocation(bucketName),
}, time.Now())

return f.lastModified
}

// IsDir checks if the FileInfo describes a directory.
//
// This method returns true if the FileInfo object represents a directory; otherwise, it returns false.
// It is specifically used to determine the type of the file system object represented by the FileInfo.
//
// Note:
// - This method should be called on a FileInfo instance obtained from a Stat or ReadDir operation.
// - The FileInfo interface is used to describe file system objects, and IsDir is one of its methods
// to query whether the object is a directory.
func (f *s3file) IsDir() bool {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "IS DIR",
Location: getLocation(bucketName),
}, time.Now())

return strings.HasSuffix(f.name, "/")
}
var (
ErrNilResponse = errors.New("response retrieved is nil ")
)

// Close closes the response body returned in Open/Create methods if the response body is not nil.
func (f *s3file) Close() error {
func (f *S3File) Close() error {
bucketName := getBucketName(f.name)

defer f.sendOperationStats(&FileLog{
Expand All @@ -136,7 +54,7 @@ func (f *s3file) Close() error {
// it returns an io.EOF error to indicate that the end of the file has been reached before the slice was completely filled.
//
// Additionally, this method updates the file offset to reflect the next position that will be used for subsequent read or write operations.
func (f *s3file) Read(p []byte) (n int, err error) {
func (f *S3File) Read(p []byte) (n int, err error) {
var fileName, msg string

bucketName := getBucketName(f.name)
Expand Down Expand Up @@ -169,7 +87,7 @@ func (f *s3file) Read(p []byte) (n int, err error) {
f.body = res.Body
if f.body == nil {
msg = fmt.Sprintf("File %q is nil", fileName)
return 0, errors.New("s3 body is nil")
return 0, fmt.Errorf("%w: S3 file is empty", ErrNilResponse)
}

buffer := make([]byte, len(p)+int(f.offset))
Expand Down Expand Up @@ -197,7 +115,7 @@ func (f *s3file) Read(p []byte) (n int, err error) {
//
// This method reads up to len(p) bytes from the file, starting at the given offset. It does not alter the current file offset
// used for other read or write operations. The number of bytes read is returned, along with any error encountered.
func (f *s3file) ReadAt(p []byte, offset int64) (n int, err error) {
func (f *S3File) ReadAt(p []byte, offset int64) (n int, err error) {
bucketName := getBucketName(f.name)

var fileName, msg string
Expand All @@ -217,7 +135,9 @@ func (f *s3file) ReadAt(p []byte, offset int64) (n int, err error) {
fileName = f.name[index+1:]
}

res, err := f.conn.GetObject(context.TODO(), &s3.GetObjectInput{
var res *s3.GetObjectOutput

res, err = f.conn.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(fileName),
})
Expand All @@ -235,7 +155,7 @@ func (f *s3file) ReadAt(p []byte, offset int64) (n int, err error) {

if int64(len(p))+offset+1 > f.size {
msg = fmt.Sprintf("Offset %v out of range", f.offset)
return 0, errors.New("reading out of range, fetching from the offset. Use Seek to reset offset")
return 0, fmt.Errorf("%w: reading out of range, fetching from the offset. Use Seek to reset offset", ErrOutOfRange)
}

buffer := make([]byte, len(p)+int(offset)+1)
Expand All @@ -260,7 +180,7 @@ func (f *s3file) ReadAt(p []byte, offset int64) (n int, err error) {
//
// This method writes up to len(p) bytes from the provided byte slice to the file, starting at the current offset.
// It updates the file offset after the write operation to reflect the new position for subsequent read or write operations.
func (f *s3file) Write(p []byte) (n int, err error) {
func (f *S3File) Write(p []byte) (n int, err error) {
bucketName := getBucketName(f.name)

var fileName, msg string
Expand All @@ -282,9 +202,11 @@ func (f *s3file) Write(p []byte) (n int, err error) {

buffer := p

var res *s3.GetObjectOutput

// if f.offset is not 0, we need to fetch the contents of the file till the offset and then write into the file
if f.offset != 0 {
res, err := f.conn.GetObject(context.TODO(), &s3.GetObjectInput{
res, err = f.conn.GetObject(context.TODO(), &s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(fileName),
})
Expand All @@ -310,7 +232,8 @@ func (f *s3file) Write(p []byte) (n int, err error) {
contentAfterBufferBytes = buffer[f.offset+int64(len(p)):]
}

buffer = append(contentBeforeOffset, p...)
contentBeforeOffset = append(contentBeforeOffset, p...)
buffer = contentBeforeOffset
buffer = append(buffer, contentAfterBufferBytes...)
}

Expand Down Expand Up @@ -344,7 +267,7 @@ func (f *s3file) Write(p []byte) (n int, err error) {
// This method writes up to len(p) bytes from the provided byte slice to the file, starting at the given offset.
// It does not modify the file's current offset used for other read or write operations.
// The number of bytes written and any error encountered during the operation are returned.
func (f *s3file) WriteAt(p []byte, offset int64) (n int, err error) {
func (f *S3File) WriteAt(p []byte, offset int64) (n int, err error) {
bucketName := getBucketName(f.name)

var fileName, msg string
Expand Down Expand Up @@ -380,7 +303,8 @@ func (f *s3file) WriteAt(p []byte, offset int64) (n int, err error) {
contentAfterBufferBytes = buffer[offset+int64(len(p)):]
}

buffer = append(contentBeforeOffset, p...)
contentBeforeOffset = append(contentBeforeOffset, p...)
buffer = contentBeforeOffset
buffer = append(buffer, contentAfterBufferBytes...)

_, err = f.conn.PutObject(context.TODO(), &s3.PutObjectInput{
Expand Down Expand Up @@ -409,7 +333,7 @@ func (f *s3file) WriteAt(p []byte, offset int64) (n int, err error) {
//
// This method performs validation on the arguments provided to the Seek operation. If the arguments are valid, it sets
// the file offset to the specified position. If there are any validation errors, it returns an appropriate error.
func (f *s3file) check(whence int, offset, length int64, msg *string) (int64, error) {
func (f *S3File) check(whence int, offset, length int64, msg *string) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekEnd:
Expand Down Expand Up @@ -443,7 +367,7 @@ func (f *s3file) check(whence int, offset, length int64, msg *string) (int64, er
// - `io.SeekStart` (0): Offset is relative to the start of the file.
// - `io.SeekCurrent` (1): Offset is relative to the current position in the file.
// - `io.SeekEnd` (2): Offset is relative to the end of the file.
func (f *s3file) Seek(offset int64, whence int) (int64, error) {
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
var msg string

status := statusErr
Expand All @@ -467,7 +391,7 @@ func (f *s3file) Seek(offset int64, whence int) (int64, error) {
}

// sendOperationStats logs the FileLog of any file operations performed in S3.
func (f *s3file) sendOperationStats(fl *FileLog, startTime time.Time) {
func (f *S3File) sendOperationStats(fl *FileLog, startTime time.Time) {
duration := time.Since(startTime).Microseconds()

fl.Duration = duration
Expand Down
95 changes: 92 additions & 3 deletions pkg/gofr/datasource/file/s3/file_parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"encoding/json"
"errors"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/aws/aws-sdk-go-v2/aws"

file "gofr.dev/pkg/gofr/datasource/file"
)

Expand Down Expand Up @@ -38,7 +41,7 @@ type jsonReader struct {
}

// ReadAll reads either JSON or text files based on file extension and returns a corresponding RowReader.
func (f *s3file) ReadAll() (file.RowReader, error) {
func (f *S3File) ReadAll() (file.RowReader, error) {
bucketName := strings.Split(f.name, string(filepath.Separator))[0]

var fileName string
Expand All @@ -60,7 +63,7 @@ func (f *s3file) ReadAll() (file.RowReader, error) {
}

// createJSONReader creates a JSON reader for JSON files.
func (f *s3file) createJSONReader(location string) (file.RowReader, error) {
func (f *S3File) createJSONReader(location string) (file.RowReader, error) {
status := statusErr

defer f.sendOperationStats(&FileLog{Operation: "JSON READER", Location: location, Status: &status}, time.Now())
Expand Down Expand Up @@ -97,7 +100,7 @@ func (f *s3file) createJSONReader(location string) (file.RowReader, error) {
}

// createTextCSVReader creates a text reader for reading text files.
func (f *s3file) createTextCSVReader(location string) (file.RowReader, error) {
func (f *S3File) createTextCSVReader(location string) (file.RowReader, error) {
status := statusErr

defer f.sendOperationStats(&FileLog{Operation: "TEXT/CSV READER", Location: location, Status: &status}, time.Now())
Expand Down Expand Up @@ -141,3 +144,89 @@ func (f *textReader) Scan(i interface{}) error {

return errStringNotPointer
}

// Name returns the base name of the file.
//
// For a file, this method returns the name of the file without any directory components.
// For directories, it returns the name of the directory.
func (f *S3File) Name() string {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "GET NAME",
Location: getLocation(bucketName),
}, time.Now())

return path.Base(f.name)
}

// Mode is not supported for the current implementation of S3 buckets.
// This method is included to adhere to the FileSystem interface in GoFr.
//
// Note: The Mode method does not provide meaningful information for S3 objects
// and should be considered a placeholder in this context.
func (f *S3File) Mode() os.FileMode {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "FILE MODE",
Location: getLocation(bucketName),
Message: aws.String("Not supported for S3"),
}, time.Now())

return 0
}

// Size returns the size of the retrieved object.
//
// For files, it returns the size of the file in bytes.
// For directories, it returns the sum of sizes of all files contained within the directory.
//
// Note:
// - This method should be called on a FileInfo instance obtained from a Stat or ReadDir operation.
func (f *S3File) Size() int64 {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "FILE/DIR SIZE",
Location: getLocation(bucketName),
}, time.Now())

return f.size
}

// ModTime returns the last modification time of the file or directory.
//
// For files, it returns the timestamp of the last modification to the file's contents.
// For directories, it returns the timestamp of the most recent change to the directory's contents, including updates
// to files within the directory.
func (f *S3File) ModTime() time.Time {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "LAST MODIFIED",
Location: getLocation(bucketName),
}, time.Now())

return f.lastModified
}

// IsDir checks if the FileInfo describes a directory.
//
// This method returns true if the FileInfo object represents a directory; otherwise, it returns false.
// It is specifically used to determine the type of the file system object represented by the FileInfo.
//
// Note:
// - This method should be called on a FileInfo instance obtained from a Stat or ReadDir operation.
// - The [FileInfo] interface is used to describe file system objects, and IsDir is one of its methods
// to query whether the object is a directory.
func (f *S3File) IsDir() bool {
bucketName := getBucketName(f.name)

f.sendOperationStats(&FileLog{
Operation: "IS DIR",
Location: getLocation(bucketName),
}, time.Now())

return strings.HasSuffix(f.name, "/")
}
Loading

0 comments on commit 90de6c2

Please sign in to comment.