Skip to content

Commit

Permalink
FMWK-556-restore-folder-validation
Browse files Browse the repository at this point in the history
- added validation of restore folder
  • Loading branch information
filkeith committed Oct 1, 2024
1 parent 4affbb7 commit 0d07702
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 0 deletions.
42 changes: 42 additions & 0 deletions io/aws/s3/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,14 @@ func (r *Reader) StreamFiles(
) {
// If it is a folder, open and return.
if r.isDir {
err := r.checkRestoreDirectory(ctx)
if err != nil {
errorsCh <- err
return
}

r.streamDirectory(ctx, readersCh, errorsCh)

return
}

Expand Down Expand Up @@ -184,3 +191,38 @@ func (r *Reader) streamFile(
func (r *Reader) GetType() string {
return s3type
}

// checkRestoreDirectory checks that the restore directory contains any file.
func (r *Reader) checkRestoreDirectory(ctx context.Context) error {
var continuationToken *string

for {
listResponse, err := r.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: &r.bucketName,
Prefix: &r.prefix,
ContinuationToken: continuationToken,
StartAfter: &r.startAfter,
})

if err != nil {
return fmt.Errorf("failed to list objects: %w", err)
}

for _, p := range listResponse.Contents {
if p.Key == nil || isDirectory(r.prefix, *p.Key) && !r.withNestedDir {
continue
}
// If we found anything, then folder is not empty.
if p.Key != nil {
return nil
}
}

continuationToken = listResponse.NextContinuationToken
if continuationToken == nil {
break
}
}

return fmt.Errorf("%s is empty", r.prefix)
}
35 changes: 35 additions & 0 deletions io/azure/blob/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ func (r *Reader) StreamFiles(
) {
// If it is a folder, open and return.
if r.isDir {
err := r.checkRestoreDirectory(ctx)
if err != nil {
errorsCh <- err
return
}

r.streamDirectory(ctx, readersCh, errorsCh)

return
}

Expand Down Expand Up @@ -158,6 +165,34 @@ func (r *Reader) GetType() string {
return azureBlobType
}

// checkRestoreDirectory checks that the restore directory contains any file.
func (r *Reader) checkRestoreDirectory(ctx context.Context) error {
pager := r.client.NewListBlobsFlatPager(r.containerName, &azblob.ListBlobsFlatOptions{
Prefix: &r.prefix,
Marker: &r.marker,
})

for pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
return fmt.Errorf("failed to get next page: %w", err)
}

for _, blob := range page.Segment.BlobItems {
// Skip files in folders.
if isDirectory(r.prefix, *blob.Name) && !r.withNestedDir {
continue
}
// If we found anything, then folder is not empty.
if blob.Name != nil {
return nil
}
}
}

return fmt.Errorf("%s is empty", r.prefix)
}

func isDirectory(prefix, fileName string) bool {
// If file name ends with / it is 100% dir.
if strings.HasSuffix(fileName, "/") {
Expand Down
41 changes: 41 additions & 0 deletions io/gcp/storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ func (r *Reader) StreamFiles(
) {
// If it is a folder, open and return.
if r.isDir {
err := r.checkRestoreDirectory(ctx)
if err != nil {
errorsCh <- err
return
}

r.streamDirectory(ctx, readersCh, errorsCh)

return
}

Expand Down Expand Up @@ -172,6 +179,40 @@ func (r *Reader) GetType() string {
return gcpStorageType
}

// checkRestoreDirectory checks that the restore directory contains any file.
func (r *Reader) checkRestoreDirectory(ctx context.Context) error {
it := r.bucketHandle.Objects(ctx, &storage.Query{
Prefix: r.prefix,
StartOffset: r.startOffset,
})

for {
// Iterate over bucket until we're done.
objAttrs, err := it.Next()
if err != nil {
if !errors.Is(err, iterator.Done) {
return fmt.Errorf("failed to read object attr from bucket %s: %w",
r.bucketName, err)
}
// If the previous call to Next returned an error other than iterator.Done, all
// subsequent calls will return the same error. To continue iteration, a new
// `ObjectIterator` must be created.
break
}

// Skip files in folders.
if isDirectory(r.prefix, objAttrs.Name) && !r.withNestedDir {
continue
}
// If we found anything, then folder is not empty.
if objAttrs.Name != "" {
return nil
}
}

return fmt.Errorf("%s is empty", r.prefix)
}

func isDirectory(prefix, fileName string) bool {
// If file name ends with / it is 100% dir.
if strings.HasSuffix(fileName, "/") {
Expand Down

0 comments on commit 0d07702

Please sign in to comment.