Skip to content

Commit

Permalink
FMWK-559 Added read after parameter to cloud readers (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
filkeith authored Oct 1, 2024
1 parent 46159e0 commit 4affbb7
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 200 deletions.
2 changes: 2 additions & 0 deletions cmd/internal/app/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func mapBackupConfig(
c.NoIndexes = commonParams.NoIndexes
c.RecordsPerSecond = commonParams.RecordsPerSecond
c.FileLimit = backupParams.FileLimit
c.NoUDFs = commonParams.NoUDFs
// The original backup tools have a single parallelism configuration property.
// We may consider splitting the configuration in the future.
c.ParallelWrite = commonParams.Parallel
Expand Down Expand Up @@ -128,6 +129,7 @@ func mapRestoreConfig(
c.BinList = splitByComma(commonParams.BinList)
c.NoRecords = commonParams.NoRecords
c.NoIndexes = commonParams.NoIndexes
c.NoUDFs = commonParams.NoUDFs
c.RecordsPerSecond = commonParams.RecordsPerSecond
c.Parallel = commonParams.Parallel
c.WritePolicy = mapWritePolicy(restoreParams, commonParams)
Expand Down
82 changes: 82 additions & 0 deletions io/aws/s3/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package s3

// options collects the settings that Opt functions apply when configuring
// S3 readers and writers.
type options struct {
// path is the path to a file or a directory.
path string
// isDir reports whether path refers to a directory (true) or a single file (false).
isDir bool
// isRemovingFiles indicates whether everything should be removed from the backup folder.
isRemovingFiles bool
// validator is the files validator that is applied to files when isDir is true.
validator validator
// withNestedDir describes whether nested directories take part in read/write operations.
// When files in a folder are streamed or deleted, directories are skipped;
// setting this flag disables that skipping.
// Default: false
withNestedDir bool
// startAfter is the key after which Amazon S3 starts listing.
// It can be any key in the bucket.
startAfter string
}

// Opt is a functional option: a function that modifies the options value it is given.
type Opt func(*options)

// WithDir selects a directory at path as the location files are read from
// or written to, and marks the configured path as a directory.
func WithDir(path string) Opt {
	return func(o *options) {
		o.isDir = true
		o.path = path
	}
}

// WithFile selects a single file at path as the read/write target
// and marks the configured path as not being a directory.
func WithFile(path string) Opt {
	return func(o *options) {
		o.isDir = false
		o.path = path
	}
}

// WithValidator installs v as the files validator, so files are validated before reading.
// Used only by Reader.
func WithValidator(v validator) Opt {
	setValidator := func(o *options) { o.validator = v }
	return setValidator
}

// WithNestedDir switches withNestedDir on, which means nested folders are not skipped.
func WithNestedDir() Opt {
	return func(o *options) { o.withNestedDir = true }
}

// WithRemoveFiles raises the remove-files flag, so that all files are removed
// from the backup folder before a backup.
// Used only by Writer.
func WithRemoveFiles() Opt {
	return func(o *options) { o.isRemovingFiles = true }
}

// WithStartAfter stores v as the start-after parameter of the list request.
// Used only by Reader.
func WithStartAfter(v string) Opt {
	setStartAfter := func(o *options) { o.startAfter = v }
	return setStartAfter
}
49 changes: 1 addition & 48 deletions io/aws/s3/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,54 +42,6 @@ type Reader struct {
prefix string
}

// options collects the settings that Opt functions apply when configuring
// S3 readers and writers.
type options struct {
// path is the path to a file or a directory.
path string
// isDir reports whether path refers to a directory (true) or a single file (false).
isDir bool
// isRemovingFiles indicates whether everything should be removed from the backup folder.
isRemovingFiles bool
// validator is the files validator that is applied to files when isDir is true.
validator validator
// withNestedDir describes whether nested directories take part in read/write operations.
// When files in a folder are streamed or deleted, directories are skipped;
// setting this flag disables that skipping.
// Default: false
withNestedDir bool
}

// Opt is a functional option: a function that modifies the options value it is given.
type Opt func(*options)

// WithDir selects a directory at path as the location files are read from
// or written to, and marks the configured path as a directory.
func WithDir(path string) Opt {
	return func(o *options) {
		o.isDir = true
		o.path = path
	}
}

// WithFile selects a single file at path as the read/write target
// and marks the configured path as not being a directory.
func WithFile(path string) Opt {
	return func(o *options) {
		o.isDir = false
		o.path = path
	}
}

// WithValidator installs v as the files validator, so files are validated before reading.
// Used only by Reader.
func WithValidator(v validator) Opt {
	setValidator := func(o *options) { o.validator = v }
	return setValidator
}

// WithNestedDir switches withNestedDir on, which means nested folders are not skipped.
func WithNestedDir() Opt {
	return func(o *options) { o.withNestedDir = true }
}

// NewReader returns new S3 storage reader.
// Must be called with WithDir(path string) or WithFile(path string) - mandatory.
// Can be called with WithValidator(v validator) - optional.
Expand Down Expand Up @@ -166,6 +118,7 @@ func (r *Reader) streamDirectory(
Bucket: &r.bucketName,
Prefix: &r.prefix,
ContinuationToken: continuationToken,
StartAfter: &r.startAfter,
})

if err != nil {
Expand Down
8 changes: 0 additions & 8 deletions io/aws/s3/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,6 @@ type Writer struct {
called atomic.Bool
}

// WithRemoveFiles raises the remove-files flag, so that all files are removed
// from the backup folder before a backup.
// Used only by Writer.
func WithRemoveFiles() Opt {
	return func(o *options) { o.isRemovingFiles = true }
}

// NewWriter creates a new writer for S3 storage directory/file writes.
// Must be called with WithDir(path string) or WithFile(path string) - mandatory.
// Can be called with WithRemoveFiles() - optional.
Expand Down
100 changes: 100 additions & 0 deletions io/azure/blob/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2024 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package blob

// options collects the settings that Opt functions apply when configuring
// Azure blob readers and writers.
type options struct {
// path is the path to a file or a directory.
path string
// isDir reports whether path refers to a directory (true) or a single file (false).
isDir bool
// isRemovingFiles indicates whether everything should be removed from the backup folder.
isRemovingFiles bool
// validator is the files validator that is applied to files when isDir is true.
validator validator
// uploadConcurrency defines the max number of concurrent uploads to be performed to upload the file.
// Each concurrent upload will create a buffer of size BlockSize.
uploadConcurrency int
// withNestedDir describes whether nested directories take part in read/write operations.
// When files in a folder are streamed or deleted, directories are skipped;
// setting this flag disables that skipping.
// Default: false
withNestedDir bool
// marker is a string value that identifies the portion of the list to be
// returned with the next listing operation.
// The operation returns the NextMarker value within the response body if the listing
// operation did not return all items remaining to be listed with the current page.
// The NextMarker value can be used as the value for the marker parameter in a
// subsequent call to request the next page of list items.
// The marker value is opaque to the client.
marker string
}

// Opt is a functional option: a function that modifies the options value it is given.
type Opt func(*options)

// WithDir selects a directory at path as the location files are read from
// or written to, and marks the configured path as a directory.
func WithDir(path string) Opt {
	return func(o *options) {
		o.isDir = true
		o.path = path
	}
}

// WithFile selects a single file at path as the read/write target
// and marks the configured path as not being a directory.
func WithFile(path string) Opt {
	return func(o *options) {
		o.isDir = false
		o.path = path
	}
}

// WithValidator installs v as the files validator, so files are validated before reading.
// Used only by Reader.
func WithValidator(v validator) Opt {
	setValidator := func(o *options) { o.validator = v }
	return setValidator
}

// WithNestedDir switches withNestedDir on, which means nested folders are not skipped.
func WithNestedDir() Opt {
	return func(o *options) { o.withNestedDir = true }
}

// WithRemoveFiles raises the remove-files flag, so that all files are removed
// from the backup folder before a backup.
// Used only by Writer.
func WithRemoveFiles() Opt {
	return func(o *options) { o.isRemovingFiles = true }
}

// WithUploadConcurrency sets the max number of concurrent uploads
// to be performed to upload the file.
// Used only by Writer.
func WithUploadConcurrency(v int) Opt {
	setConcurrency := func(o *options) { o.uploadConcurrency = v }
	return setConcurrency
}

// WithMarker stores v as the marker parameter of the list request.
// The marker value itself will not be included in the result;
// the objects after the marker are returned.
// Used only by Reader.
func WithMarker(v string) Opt {
	setMarker := func(o *options) { o.marker = v }
	return setMarker
}
52 changes: 1 addition & 51 deletions io/azure/blob/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,57 +41,6 @@ type Reader struct {
prefix string
}

// options collects the settings that Opt functions apply when configuring
// Azure blob readers and writers.
type options struct {
// path is the path to a file or a directory.
path string
// isDir reports whether path refers to a directory (true) or a single file (false).
isDir bool
// isRemovingFiles indicates whether everything should be removed from the backup folder.
isRemovingFiles bool
// validator is the files validator that is applied to files when isDir is true.
validator validator
// uploadConcurrency defines the max number of concurrent uploads to be performed to upload the file.
// Each concurrent upload will create a buffer of size BlockSize.
uploadConcurrency int
// withNestedDir describes whether nested directories take part in read/write operations.
// When files in a folder are streamed or deleted, directories are skipped;
// setting this flag disables that skipping.
// Default: false
withNestedDir bool
}

// Opt is a functional option: a function that modifies the options value it is given.
type Opt func(*options)

// WithDir selects a directory at path as the location files are read from
// or written to, and marks the configured path as a directory.
func WithDir(path string) Opt {
	return func(o *options) {
		o.isDir = true
		o.path = path
	}
}

// WithFile selects a single file at path as the read/write target
// and marks the configured path as not being a directory.
func WithFile(path string) Opt {
	return func(o *options) {
		o.isDir = false
		o.path = path
	}
}

// WithValidator installs v as the files validator, so files are validated before reading.
// Used only by Reader.
func WithValidator(v validator) Opt {
	setValidator := func(o *options) { o.validator = v }
	return setValidator
}

// WithNestedDir switches withNestedDir on, which means nested folders are not skipped.
func WithNestedDir() Opt {
	return func(o *options) { o.withNestedDir = true }
}

// NewReader returns new Azure blob directory/file reader.
// Must be called with WithDir(path string) or WithFile(path string) - mandatory.
// Can be called with WithValidator(v validator) - optional.
Expand Down Expand Up @@ -153,6 +102,7 @@ func (r *Reader) streamDirectory(

pager := r.client.NewListBlobsFlatPager(r.containerName, &azblob.ListBlobsFlatOptions{
Prefix: &r.prefix,
Marker: &r.marker,
})

for pager.More() {
Expand Down
Loading

0 comments on commit 4affbb7

Please sign in to comment.