diff --git a/docs/fixtures/readme.md b/docs/fixtures/readme.md index 4b768cb46..a19b6fc07 100644 --- a/docs/fixtures/readme.md +++ b/docs/fixtures/readme.md @@ -49,7 +49,7 @@ func main() { or for example when using the `BlobFixtureWriterFactory`: ```yaml -blobstore: +blob: blobconfig: bucket: s3-fixtures-bucket ``` diff --git a/pkg/blob/runner.go b/pkg/blob/runner.go index 1a4462ab8..1b710fe6c 100644 --- a/pkg/blob/runner.go +++ b/pkg/blob/runner.go @@ -19,25 +19,26 @@ import ( const ( metricName = "BlobBatchRunner" - operationRead = "Read" - operationWrite = "Write" operationCopy = "Copy" operationDelete = "Delete" + operationRead = "Read" + operationWrite = "Write" ) type BatchRunnerSettings struct { - ReaderRunnerCount int `cfg:"reader_runner_count" default:"10"` - WriterRunnerCount int `cfg:"writer_runner_count" default:"10"` - CopyRunnerCount int `cfg:"copy_runner_count" default:"10"` - DeleteRunnerCount int `cfg:"delete_runner_count" default:"10"` + ClientName string `cfg:"client_name" default:"default"` + CopyRunnerCount int `cfg:"copy_runner_count" default:"10"` + DeleteRunnerCount int `cfg:"delete_runner_count" default:"10"` + ReaderRunnerCount int `cfg:"reader_runner_count" default:"10"` + WriterRunnerCount int `cfg:"writer_runner_count" default:"10"` } var br = struct { sync.Mutex - instance *batchRunner + instance BatchRunner }{} -func ProvideBatchRunner() kernel.ModuleFactory { +func ProvideBatchRunner(name string) kernel.ModuleFactory { br.Lock() defer br.Unlock() @@ -47,7 +48,7 @@ func ProvideBatchRunner() kernel.ModuleFactory { } var err error - br.instance, err = NewBatchRunner(ctx, config, logger) + br.instance, err = NewBatchRunner(ctx, config, logger, name) return br.instance, err } @@ -63,22 +64,22 @@ type batchRunner struct { kernel.ServiceStage logger log.Logger - metric metric.Writer - client gosoS3.Client channels *BatchRunnerChannels + client gosoS3.Client + metric metric.Writer settings *BatchRunnerSettings } -func NewBatchRunner(ctx context.Context, config cfg.Config, logger log.Logger) (*batchRunner, error) { +func NewBatchRunner(ctx context.Context, config cfg.Config, logger log.Logger, name string) (BatchRunner, error) { settings := &BatchRunnerSettings{} - config.UnmarshalKey("blob", settings) + config.UnmarshalKey(fmt.Sprintf("blob.%s", name), settings) defaultMetrics := getDefaultRunnerMetrics() metricWriter := metric.NewWriter(defaultMetrics...) - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName) if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) + return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err) } runner := &batchRunner{ diff --git a/pkg/blob/service.go b/pkg/blob/service.go index ef56b6987..a5d64a079 100644 --- a/pkg/blob/service.go +++ b/pkg/blob/service.go @@ -19,10 +19,14 @@ type Service struct { client gosoS3.Client } -func NewService(ctx context.Context, config cfg.Config, logger log.Logger) (*Service, error) { - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") +func NewService(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*Service, error) { + if name == "" { + name = "default" + } + + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, name) if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) + return nil, fmt.Errorf("can not create s3 client with name %s: %w", name, err) } return &Service{ diff --git a/pkg/blob/store.go b/pkg/blob/store.go index 2d9f8e826..87d1e5609 100644 --- a/pkg/blob/store.go +++ b/pkg/blob/store.go @@ -25,33 +25,37 @@ const ( ) type Object struct { - Key *string - Body Stream - ACL types.ObjectCannedACL - Exists bool - Error error + ACL types.ObjectCannedACL + Body Stream + + bucket *string ContentEncoding *string ContentType *string - bucket *string + Error error + + Exists bool + Key *string prefix *string wg *sync.WaitGroup } type CopyObject struct { - Key *string - SourceKey *string - SourceBucket *string - ACL types.ObjectCannedACL - Error error + ACL types.ObjectCannedACL + + bucket *string ContentEncoding *string ContentType *string - bucket *string - prefix *string - wg *sync.WaitGroup + Error error + + Key *string + prefix *string + SourceBucket *string + SourceKey *string + wg *sync.WaitGroup } type ( @@ -61,8 +65,9 @@ type ( type Settings struct { cfg.AppId - Bucket string `cfg:"bucket"` - Prefix string `cfg:"prefix"` + Bucket string `cfg:"bucket"` + ClientName string `cfg:"client_name" default:"default"` + Prefix string `cfg:"prefix"` } //go:generate mockery --name Store @@ -83,7 +88,8 @@ type Store interface { var _ Store = &s3Store{} type s3Store struct { - logger log.Logger + logger log.Logger + channels *BatchRunnerChannels client gosoS3.Client @@ -114,16 +120,11 @@ func CreateKey() string { return namingStrategy() } -func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name string) (*s3Store, error) { +func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Store, error) { channels := ProvideBatchRunnerChannels(config) - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") - if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) - } - var settings Settings - key := fmt.Sprintf("blobstore.%s", name) + key := fmt.Sprintf("blob.%s", name) config.UnmarshalKey(key, &settings) settings.AppId.PadFromConfig(config) @@ -131,6 +132,11 @@ func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name st settings.Bucket = fmt.Sprintf("%s-%s-%s", settings.Project, settings.Environment, settings.Family) } + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName) + if err != nil { + return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err) + } + store := NewStoreWithInterfaces(logger, channels, s3Client, settings) autoCreate := dx.ShouldAutoCreate(config) @@ -143,7 +149,7 @@ func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name st return store, nil } -func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings Settings) *s3Store { +func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings Settings) Store { return &s3Store{ logger: logger, channels: channels, @@ -164,6 +170,7 @@ func (s *s3Store) CreateBucket(ctx context.Context) error { if isBucketAlreadyExistsError(err) { s.logger.Info("s3 bucket %s did already exist", *s.bucket) + return nil } @@ -172,6 +179,7 @@ func (s *s3Store) CreateBucket(ctx context.Context) error { } s.logger.Info("created s3 bucket %s", *s.bucket) + return nil } @@ -322,6 +330,7 @@ func getFullKey(prefixPtr, keyPtr *string) string { fullKey := fmt.Sprintf("%s/%s", prefix, key) fullKey = strings.TrimLeft(fullKey, "/") + return fullKey } diff --git a/pkg/fixtures/purger_blob.go b/pkg/fixtures/purger_blob.go index 5a044c150..8b1f3b405 100644 --- a/pkg/fixtures/purger_blob.go +++ b/pkg/fixtures/purger_blob.go @@ -22,7 +22,7 @@ func newBlobPurger(ctx context.Context, config cfg.Config, logger log.Logger, se return nil, fmt.Errorf("can not create blob store: %w", err) } - br, err := blob.NewBatchRunner(ctx, config, logger) + br, err := blob.NewBatchRunner(ctx, config, logger, settings.ConfigName) if err != nil { return nil, fmt.Errorf("can not create blob batch runner: %w", err) } diff --git a/pkg/fixtures/writer_blob.go b/pkg/fixtures/writer_blob.go index a949bd87a..0861abb91 100644 --- a/pkg/fixtures/writer_blob.go +++ b/pkg/fixtures/writer_blob.go @@ -41,7 +41,7 @@ func BlobFixtureWriterFactory(settings *BlobFixturesSettings) FixtureWriterFacto return nil, fmt.Errorf("can not create blob store: %w", err) } - br, err := blob.NewBatchRunner(ctx, config, logger) + br, err := blob.NewBatchRunner(ctx, config, logger, settings.ConfigName) if err != nil { return nil, fmt.Errorf("can not create blob batch runner: %w", err) } diff --git a/pkg/parquet/file_recorder.go b/pkg/parquet/file_recorder.go index 20935fcfa..afe37036d 100644 --- a/pkg/parquet/file_recorder.go +++ b/pkg/parquet/file_recorder.go @@ -14,10 +14,10 @@ import ( //go:generate mockery --name FileRecorder type FileRecorder interface { + DeleteRecordedFiles(ctx context.Context) error Files() []File RecordFile(bucket string, key string) RenameRecordedFiles(ctx context.Context, newPrefix string) error - DeleteRecordedFiles(ctx context.Context) error } type File struct { @@ -27,9 +27,9 @@ type File struct { type s3FileRecorder struct { logger log.Logger - s3Client gosoS3.Client - lck sync.Mutex files []File + lck sync.Mutex + s3Client gosoS3.Client } type nopRecorder struct{} @@ -53,10 +53,14 @@ func NewNopRecorder() FileRecorder { return nopRecorder{} } -func NewS3FileRecorder(ctx context.Context, config cfg.Config, logger log.Logger) (FileRecorder, error) { - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") +func NewS3FileRecorder(ctx context.Context, config cfg.Config, logger log.Logger, name string) (FileRecorder, error) { + if name == "" { + name = "default" + } + + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, name) if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) + return nil, fmt.Errorf("can not create s3 client with name %s: %w", name, err) } return NewS3FileRecorderWithInterfaces(logger, s3Client), nil diff --git a/pkg/parquet/parquet.go b/pkg/parquet/parquet.go index 5cd422f1a..e24310af8 100644 --- a/pkg/parquet/parquet.go +++ b/pkg/parquet/parquet.go @@ -29,6 +29,7 @@ func RegisterS3PrefixNamingStrategy(name string, strategy S3PrefixNamingStrategy } type ReaderSettings struct { + ClientName string `cfg:"client_name" default:"default"` ModelId mdl.ModelId NamingStrategy string Recorder FileRecorder diff --git a/pkg/parquet/reader.go b/pkg/parquet/reader.go index 8cc8d900e..6ad39523a 100644 --- a/pkg/parquet/reader.go +++ b/pkg/parquet/reader.go @@ -24,8 +24,8 @@ import ( ) type Progress struct { - FileCount int Current int + FileCount int } type ( @@ -42,18 +42,18 @@ type Reader interface { } type s3Reader struct { - logger log.Logger - s3Client gosoS3.Client + logger log.Logger modelId mdl.ModelId prefixNamingStrategy S3PrefixNamingStrategy recorder FileRecorder + s3Client gosoS3.Client } -func NewReader(ctx context.Context, config cfg.Config, logger log.Logger, settings *ReaderSettings) (*s3Reader, error) { - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") +func NewReader(ctx context.Context, config cfg.Config, logger log.Logger, settings *ReaderSettings) (Reader, error) { + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName) if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) + return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err) } prefixNaming, exists := s3PrefixNamingStrategies[settings.NamingStrategy] @@ -76,7 +76,7 @@ func NewReaderWithInterfaces( modelId mdl.ModelId, prefixNaming S3PrefixNamingStrategy, recorder FileRecorder, -) *s3Reader { +) Reader { return &s3Reader{ logger: logger, s3Client: s3Client, @@ -166,6 +166,7 @@ func (r *s3Reader) ReadDateAsync(ctx context.Context, datetime time.Time, target if !ok { stop = true + return nil } diff --git a/pkg/parquet/writer.go b/pkg/parquet/writer.go index 1fd8670e5..54c29d629 100644 --- a/pkg/parquet/writer.go +++ b/pkg/parquet/writer.go @@ -19,10 +19,11 @@ import ( ) type WriterSettings struct { + ClientName string `cfg:"client_name" default:"default"` ModelId mdl.ModelId NamingStrategy string - Tags map[string]string Recorder FileRecorder + Tags map[string]string } //go:generate mockery --name Writer @@ -32,21 +33,22 @@ type Writer interface { } type s3Writer struct { - logger log.Logger - s3Client gosoS3.Client + logger log.Logger modelId mdl.ModelId prefixNamingStrategy S3PrefixNamingStrategy - tags map[string]string recorder FileRecorder + s3Client gosoS3.Client + + tags map[string]string } -func NewWriter(ctx context.Context, config cfg.Config, logger log.Logger, settings *WriterSettings) (*s3Writer, error) { +func NewWriter(ctx context.Context, config cfg.Config, logger log.Logger, settings *WriterSettings) (Writer, error) { settings.ModelId.PadFromConfig(config) - s3Client, err := gosoS3.ProvideClient(ctx, config, logger, "default") + s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName) if err != nil { - return nil, fmt.Errorf("can not create s3 client default: %w", err) + return nil, fmt.Errorf("can not create s3 client with name %s: %w", settings.ClientName, err) } prefixNaming, exists := s3PrefixNamingStrategies[settings.NamingStrategy] @@ -70,7 +72,7 @@ func NewWriterWithInterfaces( prefixNaming S3PrefixNamingStrategy, tags map[string]string, recorder FileRecorder, -) *s3Writer { +) Writer { combinedTags := map[string]string{ "Project": modelId.Project, "Environment": modelId.Environment, diff --git a/test/fixtures/s3/config.test.yml b/test/fixtures/s3/config.test.yml index 75b252ffd..88a245762 100644 --- a/test/fixtures/s3/config.test.yml +++ b/test/fixtures/s3/config.test.yml @@ -12,7 +12,7 @@ cloud: default: usePathStyle: true -blobstore: +blob: test: bucket: s3-fixtures-test-bucket diff --git a/test/fixtures/s3/s3_test.go b/test/fixtures/s3/s3_test.go index 9682cf98c..1027bed69 100644 --- a/test/fixtures/s3/s3_test.go +++ b/test/fixtures/s3/s3_test.go @@ -37,7 +37,7 @@ func (s *S3TestSuite) TestS3() { s.NoError(err) s3Client := s.Env().S3("default").Client() - bucketName := s.Env().Config().GetString("blobstore.test.bucket") + bucketName := s.Env().Config().GetString("blob.test.bucket") input := &s3.GetObjectInput{ Bucket: aws.String(bucketName), @@ -81,7 +81,7 @@ func (s *S3TestSuite) TestS3WithPurge() { s.NoError(err) s3Client := s.Env().S3("default").Client() - bucketName := s.Env().Config().GetString("blobstore.test.bucket") + bucketName := s.Env().Config().GetString("blob.test.bucket") input := &s3.GetObjectInput{ Bucket: aws.String(bucketName),