Refactor s3client ReadFile interface to return a channel. #52

Merged: 7 commits, Sep 15, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: "1.20"
go-version-file: go.mod

- name: Unit tests
run: |
4 changes: 4 additions & 0 deletions .github/workflows/lint.yaml
@@ -35,6 +35,10 @@ jobs:
name: PR - GO lint
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version-file: go.mod
- run: |
go generate ./...

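The substance of the PR is the client.go diff that follows: the batch ReadFiles method, which fanned a worker pool out over a list of files and pushed Message values into a caller-supplied channel, is replaced by a per-file ReadFile method that owns its own output and error channels. As a rough before/after sketch of the interface change (signatures copied from the diff below):

// Before: the caller supplies the channel and a concurrency level, and errors
// surface only as the method's single return value.
ReadFiles(ctx context.Context, bucket string, files []string, concurrency int, ch chan<- Message) error

// After: one call streams a single file line by line, and errors arrive on a
// dedicated channel while the output channel carries raw lines.
ReadFile(ctx context.Context, bucket string, file string, initialBufferSize int, maxBufferSize int) (<-chan string, <-chan error)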
193 changes: 67 additions & 126 deletions client.go
@@ -3,12 +3,11 @@ package s3client
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
@@ -18,7 +17,6 @@ import (
)

type (

// Logger interface to represent a fluent-bit logging mechanism.
Logger interface {
Error(format string, a ...any)
@@ -30,20 +28,14 @@
// Client is the interface for interacting with an S3 bucket.
Client interface {
ListFiles(ctx context.Context, bucket, pattern string) ([]string, error)
ReadFiles(ctx context.Context, bucket string, files []string, concurrency int, ch chan<- Message) error
ReadFile(ctx context.Context, bucket string, file string, initialBufferSize int, maxBufferSize int) (<-chan string, <-chan error)
}
// DefaultClient is a concrete implementation of the Client interface that uses the AWS SDK for Go to interact with S3.
DefaultClient struct {
Client
Svc ifaces.Client
Logger Logger
}

// Message is the msg format as specified by the plugin library (https://github.com/calyptia/plugin/blob/785e54918feb3efb78f9ddeadf135dc4f75fa5b0/plugin.go#L77C1-L81C2)
Message struct {
Time time.Time
Record any
}
)

// New returns a new DefaultClient configured with the given options and using the provided logger.
@@ -92,7 +84,7 @@ func (c *DefaultClient) ListFiles(ctx context.Context, bucket, pattern string) (
params.Prefix = &prefix
}

c.Logger.Info("listing files on bucket: %q with prefix: %q that follows pattern: %q", bucket, prefix, pattern)
c.Logger.Debug("listing files on bucket: %q with prefix: %q that follows pattern: %q", bucket, prefix, pattern)
p := s3.NewListObjectsV2Paginator(c.Svc, params)

for p.HasMorePages() {
@@ -108,7 +100,7 @@ func (c *DefaultClient) ListFiles(ctx context.Context, bucket, pattern string) (
}
}
}
c.Logger.Info("found: %d file(s) on bucket: %q that follows pattern: %q", len(files), bucket, pattern)
c.Logger.Debug("found: %d file(s) on bucket: %q that follows pattern: %q", len(files), bucket, pattern)
return files, nil
}

@@ -126,137 +118,86 @@ func (c *DefaultClient) ListFiles(ctx context.Context, bucket, pattern string) (
return files, nil
}

// ReadFiles reads a list of files from the specified bucket and sends their contents
// through the provided channel.
//
// The function takes in a context object to cancel long-running operations, the name
// of the bucket, a slice of strings representing the names of the files to read, and
// a channel of plugin.Message to send the contents of the files.
//
// The function returns an error if there is a problem reading the files or sending
// the messages.
func (c *DefaultClient) ReadFiles(ctx context.Context, bucket string, files []string, concurrency int, ch chan<- Message) error {
// Create a done channel to signal when all the files have been processed
done := make(chan bool)
// Create an error channel to handle any errors that occur while processing the files
// ReadFile reads the specified file from the given S3 bucket and sends its contents
// line by line through a channel. It uses an adaptive buffering mechanism to handle
// large lines of text up to a specified maximum size.
func (c *DefaultClient) ReadFile(ctx context.Context, bucket string, file string, initialBufferSize int, maxBufferSize int) (<-chan string, <-chan error) {
// Channels to return the file contents and any potential errors.
out := make(chan string)
errChan := make(chan error)

// isCritical is a helper function to determine if an error is critical and should cause processing to stop
isCritical := func(err error) bool {
// TODO: add logic to determine if an error is critical
return false
}

var wg sync.WaitGroup

if concurrency == 0 {
concurrency = 1
}
// TODO: make this configurable.
poolSize := concurrency

// Create a semaphore to limit the number of files being processed concurrently
semaphore := make(chan struct{}, poolSize)

// Iterate over the files and process them concurrently
for _, file := range files {
// Acquire a slot in the semaphore
semaphore <- struct{}{}
// Add a waiting group for the file being processed
wg.Add(1)
go func() {
// Always close the output channel when done.
defer close(out)

go func(filename string) {
// Release the slot in the semaphore and decrement the waiting group when the function exits
defer func() {
wg.Done()
<-semaphore
}()
// Log start of file processing.
c.Logger.Info("Started processing file: %s from bucket: %s", file, bucket)

// Log that the filename processing has started
c.Logger.Info("started processing filename: %s from bucket: %s", filename, bucket)
// Get the filename from the S3 bucket
resp, err := c.Svc.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &filename,
})
// Get the specified file from the S3 bucket.
resp, err := c.Svc.GetObject(ctx, &s3.GetObjectInput{
Bucket: &bucket,
Key: &file,
})
if err != nil {
// On error, send to error channel and exit.
errChan <- err
return
}
// Ensure the file's body stream is closed when done.
// Close the filename body when the function exits
defer func(Body io.ReadCloser) {
err = Body.Close()
if err != nil {
// Send the error to the error channel
errChan <- err
return
}
// Close the filename body when the function exits
defer func(Body io.ReadCloser) {
err = Body.Close()
if err != nil {
// Send the error to the error channel
errChan <- err
return
}
}(resp.Body)
}(resp.Body)

c.Logger.Info("getting a filename reader for filename: %q and content-type %q", filename, *resp.ContentType)

// Get a reader for the filename based on its content type
reader, err := GetFileReader(filename)(resp.Body)
// Get a reader for the file based on its format/type.
reader, err := GetFileReader(file)(resp.Body)
if err != nil {
// On error, send to error channel and exit.
errChan <- err
return
}
// Ensure the reader is closed when done.
defer func(reader io.ReadCloser) {
err := reader.Close()
if err != nil {
// Send the error to the error channel
errChan <- err
return
}
defer func() {
err := reader.Close()
if err != nil {
c.Logger.Error("error closing reader fp: %w", err)
}
}()
}(reader)

// Use a buffered reader to read the filename line by line
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
// Send each line of the filename as a message on the channel
ch <- Message{
Time: time.Now(),
Record: map[string]string{
"_raw": scanner.Text(),
"bucket": bucket,
"file": filename,
},
}
}
// If there is an error with the scanner, send the error to the errChan
// channel and return from the function.
if err := scanner.Err(); err != nil {
errChan <- err
return
}
c.Logger.Info("completed processing of filename: %s on bucket: %s", filename, bucket)
}(file)
}
// Create a scanner to read the file contents.
scanner := bufio.NewScanner(reader)

// Start a goroutine that waits for all the file processing goroutines to finish.
// When they are all done, send a message on the done channel.
go func() {
wg.Wait()
done <- true
}()
// Initialize a buffer for the scanner, setting its initial and maximum sizes.
buf := make([]byte, 0, initialBufferSize)
scanner.Buffer(buf, maxBufferSize)

// Read the file line by line.
for scanner.Scan() {
out <- scanner.Text()
}

// Loop indefinitely until the context is done or the done channel is closed.
for {
select {
case <-ctx.Done():
// If the context is done, return the context's error.
return ctx.Err()
case <-done:
// If the done channel is closed, close the done channel and return nil.
close(done)
return nil
case err := <-errChan:
// If there is an error on the errChan channel, log it and check if it is critical.
// If it is critical, return the error.
c.Logger.Error("error while processing file from s3 bucket:%w", err)
if isCritical(err) {
return err
// Check for any scanning errors.
if err := scanner.Err(); err != nil {
// If the error is due to a line being too long, log a specific message.
if errors.Is(err, bufio.ErrTooLong) {
c.Logger.Error("Encountered a line that was too long to read in file: %s from bucket: %s, exceeds > %d", file, bucket, maxBufferSize)
}

// Send the error to the error channel and exit.
errChan <- err
return
}
}

// Log completion of file processing.
c.Logger.Info("Completed processing of file: %s on bucket: %s", file, bucket)
}()

// Return channels to the caller.
return out, errChan
}
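For orientation, below is a minimal sketch of how a caller might drain the two channels returned by the new ReadFile. It is written as if it sat in the same package (the module import path is not shown in this diff), it reuses the Client and Logger types from client.go, and the bucket name, object key, and buffer sizes are illustrative assumptions rather than values prescribed by the PR:

// consumeFile reads one S3 object line by line via the new channel-based API.
func consumeFile(ctx context.Context, c Client, logger Logger) error {
	// Hypothetical inputs; nothing in the PR fixes these values.
	const (
		bucket            = "example-bucket"
		key               = "logs/app.log"
		initialBufferSize = 64 * 1024   // starting scanner buffer
		maxBufferSize     = 1024 * 1024 // longest line we are willing to read
	)

	lines, errs := c.ReadFile(ctx, bucket, key, initialBufferSize, maxBufferSize)

	for {
		select {
		case line, ok := <-lines:
			if !ok {
				// The output channel is closed once the whole file has been scanned.
				return nil
			}
			logger.Info("line: %s", line)
		case err := <-errs:
			// Scanner and S3 errors arrive here; a line longer than maxBufferSize
			// shows up as bufio.ErrTooLong (client.go checks this with errors.Is).
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

In this version of client.go the output channel is closed only after any pending error has been sent, so a consumer that selects over both channels, as above, should not miss a late error from closing the response body.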