Skip to content

Commit

Permalink
s3: make client name configurable in s3 dependent implementations;
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdan Finn authored and bogdanfinn committed May 15, 2023
1 parent 667a0af commit 424d0e8
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/fixtures/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
or for example when using the `BlobFixtureWriterFactory`:

```yaml
blobstore:
blob:
blobconfig:
bucket: s3-fixtures-bucket
```
Expand Down
31 changes: 16 additions & 15 deletions pkg/blob/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,26 @@ import (

const (
metricName = "BlobBatchRunner"
operationRead = "Read"
operationWrite = "Write"
operationCopy = "Copy"
operationDelete = "Delete"
operationRead = "Read"
operationWrite = "Write"
)

type BatchRunnerSettings struct {
ReaderRunnerCount int `cfg:"reader_runner_count" default:"10"`
WriterRunnerCount int `cfg:"writer_runner_count" default:"10"`
CopyRunnerCount int `cfg:"copy_runner_count" default:"10"`
DeleteRunnerCount int `cfg:"delete_runner_count" default:"10"`
ClientName string `cfg:"client_name" default:"default"`
CopyRunnerCount int `cfg:"copy_runner_count" default:"10"`
DeleteRunnerCount int `cfg:"delete_runner_count" default:"10"`
ReaderRunnerCount int `cfg:"reader_runner_count" default:"10"`
WriterRunnerCount int `cfg:"writer_runner_count" default:"10"`
}

var br = struct {
sync.Mutex
instance *batchRunner
instance BatchRunner
}{}

func ProvideBatchRunner() kernel.ModuleFactory {
func ProvideBatchRunner(name string) kernel.ModuleFactory {
br.Lock()
defer br.Unlock()

Expand All @@ -47,7 +48,7 @@ func ProvideBatchRunner() kernel.ModuleFactory {
}

var err error
br.instance, err = NewBatchRunner(ctx, config, logger)
br.instance, err = NewBatchRunner(ctx, config, logger, name)

return br.instance, err
}
Expand All @@ -63,22 +64,22 @@ type batchRunner struct {
kernel.ServiceStage

logger log.Logger
metric metric.Writer
client gosoS3.Client
channels *BatchRunnerChannels
client gosoS3.Client
metric metric.Writer
settings *BatchRunnerSettings
}

func NewBatchRunner(ctx context.Context, config cfg.Config, logger log.Logger) (*batchRunner, error) {
func NewBatchRunner(ctx context.Context, config cfg.Config, logger log.Logger, name string) (BatchRunner, error) {
settings := &BatchRunnerSettings{}
config.UnmarshalKey("blob", settings)
config.UnmarshalKey(fmt.Sprintf("blob.%s", name), settings)

defaultMetrics := getDefaultRunnerMetrics()
metricWriter := metric.NewWriter(defaultMetrics...)

s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default")
s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName)
if err != nil {
return nil, fmt.Errorf("can not create s3 client default: %w", err)
return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err)
}

runner := &batchRunner{
Expand Down
10 changes: 7 additions & 3 deletions pkg/blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ type Service struct {
client gosoS3.Client
}

func NewService(ctx context.Context, config cfg.Config, logger log.Logger) (*Service, error) {
s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default")
func NewService(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*Service, error) {
if name == "" {
name = "default"
}

s3Client, err := gosoS3.ProvideClient(ctx, config, logger, name)
if err != nil {
return nil, fmt.Errorf("can not create s3 client default: %w", err)
return nil, fmt.Errorf("can not create s3 client with name %s: %w", name, err)
}

return &Service{
Expand Down
59 changes: 34 additions & 25 deletions pkg/blob/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,37 @@ const (
)

type Object struct {
Key *string
Body Stream
ACL types.ObjectCannedACL
Exists bool
Error error
ACL types.ObjectCannedACL
Body Stream

bucket *string

ContentEncoding *string
ContentType *string

bucket *string
Error error

Exists bool
Key *string
prefix *string
wg *sync.WaitGroup
}

type CopyObject struct {
Key *string
SourceKey *string
SourceBucket *string
ACL types.ObjectCannedACL
Error error
ACL types.ObjectCannedACL

bucket *string

ContentEncoding *string
ContentType *string

bucket *string
prefix *string
wg *sync.WaitGroup
Error error

Key *string
prefix *string
SourceBucket *string
SourceKey *string
wg *sync.WaitGroup
}

type (
Expand All @@ -61,8 +65,9 @@ type (

type Settings struct {
cfg.AppId
Bucket string `cfg:"bucket"`
Prefix string `cfg:"prefix"`
Bucket string `cfg:"bucket"`
ClientName string `cfg:"client_name" default:"default"`
Prefix string `cfg:"prefix"`
}

//go:generate mockery --name Store
Expand All @@ -83,7 +88,8 @@ type Store interface {
var _ Store = &s3Store{}

type s3Store struct {
logger log.Logger
logger log.Logger

channels *BatchRunnerChannels
client gosoS3.Client

Expand Down Expand Up @@ -114,23 +120,23 @@ func CreateKey() string {
return namingStrategy()
}

func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*s3Store, error) {
func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Store, error) {
channels := ProvideBatchRunnerChannels(config)

s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default")
if err != nil {
return nil, fmt.Errorf("can not create s3 client default: %w", err)
}

var settings Settings
key := fmt.Sprintf("blobstore.%s", name)
key := fmt.Sprintf("blob.%s", name)
config.UnmarshalKey(key, &settings)
settings.AppId.PadFromConfig(config)

if settings.Bucket == "" {
settings.Bucket = fmt.Sprintf("%s-%s-%s", settings.Project, settings.Environment, settings.Family)
}

s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName)
if err != nil {
return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err)
}

store := NewStoreWithInterfaces(logger, channels, s3Client, settings)

autoCreate := dx.ShouldAutoCreate(config)
Expand All @@ -143,7 +149,7 @@ func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name st
return store, nil
}

func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings Settings) *s3Store {
func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings Settings) Store {
return &s3Store{
logger: logger,
channels: channels,
Expand All @@ -164,6 +170,7 @@ func (s *s3Store) CreateBucket(ctx context.Context) error {

if isBucketAlreadyExistsError(err) {
s.logger.Info("s3 bucket %s did already exist", *s.bucket)

return nil
}

Expand All @@ -172,6 +179,7 @@ func (s *s3Store) CreateBucket(ctx context.Context) error {
}

s.logger.Info("created s3 bucket %s", *s.bucket)

return nil
}

Expand Down Expand Up @@ -322,6 +330,7 @@ func getFullKey(prefixPtr, keyPtr *string) string {

fullKey := fmt.Sprintf("%s/%s", prefix, key)
fullKey = strings.TrimLeft(fullKey, "/")

return fullKey
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/fixtures/purger_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func newBlobPurger(ctx context.Context, config cfg.Config, logger log.Logger, se
return nil, fmt.Errorf("can not create blob store: %w", err)
}

br, err := blob.NewBatchRunner(ctx, config, logger)
br, err := blob.NewBatchRunner(ctx, config, logger, settings.ConfigName)
if err != nil {
return nil, fmt.Errorf("can not create blob batch runner: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fixtures/writer_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func BlobFixtureWriterFactory(settings *BlobFixturesSettings) FixtureWriterFacto
return nil, fmt.Errorf("can not create blob store: %w", err)
}

br, err := blob.NewBatchRunner(ctx, config, logger)
br, err := blob.NewBatchRunner(ctx, config, logger, settings.ConfigName)
if err != nil {
return nil, fmt.Errorf("can not create blob batch runner: %w", err)
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/parquet/file_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

//go:generate mockery --name FileRecorder
type FileRecorder interface {
DeleteRecordedFiles(ctx context.Context) error
Files() []File
RecordFile(bucket string, key string)
RenameRecordedFiles(ctx context.Context, newPrefix string) error
DeleteRecordedFiles(ctx context.Context) error
}

type File struct {
Expand All @@ -27,9 +27,9 @@ type File struct {

type s3FileRecorder struct {
logger log.Logger
s3Client gosoS3.Client
lck sync.Mutex
files []File
lck sync.Mutex
s3Client gosoS3.Client
}

type nopRecorder struct{}
Expand All @@ -53,10 +53,14 @@ func NewNopRecorder() FileRecorder {
return nopRecorder{}
}

func NewS3FileRecorder(ctx context.Context, config cfg.Config, logger log.Logger) (FileRecorder, error) {
s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default")
func NewS3FileRecorder(ctx context.Context, config cfg.Config, logger log.Logger, name string) (FileRecorder, error) {
if name == "" {
name = "default"
}

s3Client, err := gosoS3.ProvideClient(ctx, config, logger, name)
if err != nil {
return nil, fmt.Errorf("can not create s3 client default: %w", err)
return nil, fmt.Errorf("can not create s3 client with name %s: %w", name, err)
}

return NewS3FileRecorderWithInterfaces(logger, s3Client), nil
Expand Down
1 change: 1 addition & 0 deletions pkg/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func RegisterS3PrefixNamingStrategy(name string, strategy S3PrefixNamingStrategy
}

type ReaderSettings struct {
ClientName string `cfg:"client_name" default:"default"`
ModelId mdl.ModelId
NamingStrategy string
Recorder FileRecorder
Expand Down
15 changes: 8 additions & 7 deletions pkg/parquet/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
)

type Progress struct {
FileCount int
Current int
FileCount int
}

type (
Expand All @@ -42,18 +42,18 @@ type Reader interface {
}

type s3Reader struct {
logger log.Logger
s3Client gosoS3.Client
logger log.Logger

modelId mdl.ModelId
prefixNamingStrategy S3PrefixNamingStrategy
recorder FileRecorder
s3Client gosoS3.Client
}

func NewReader(ctx context.Context, config cfg.Config, logger log.Logger, settings *ReaderSettings) (*s3Reader, error) {
s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default")
func NewReader(ctx context.Context, config cfg.Config, logger log.Logger, settings *ReaderSettings) (Reader, error) {
s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName)
if err != nil {
return nil, fmt.Errorf("can not create s3 client default: %w", err)
return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err)
}

prefixNaming, exists := s3PrefixNamingStrategies[settings.NamingStrategy]
Expand All @@ -76,7 +76,7 @@ func NewReaderWithInterfaces(
modelId mdl.ModelId,
prefixNaming S3PrefixNamingStrategy,
recorder FileRecorder,
) *s3Reader {
) Reader {
return &s3Reader{
logger: logger,
s3Client: s3Client,
Expand Down Expand Up @@ -166,6 +166,7 @@ func (r *s3Reader) ReadDateAsync(ctx context.Context, datetime time.Time, target

if !ok {
stop = true

return nil
}

Expand Down
Loading

0 comments on commit 424d0e8

Please sign in to comment.