[copilot][flytedirectory] multipart blob download #5715

Merged

32 commits
43a0b18
add download multipart blob
wayner0628 Sep 1, 2024
6f78352
recursively process subparts
wayner0628 Sep 1, 2024
69514d6
implement GetItems function
wayner0628 Sep 4, 2024
f13022b
add unit testing
wayner0628 Sep 4, 2024
8fae9f9
Parallelly handle blob items
wayner0628 Sep 4, 2024
e82c5de
fix lint error
wayner0628 Sep 4, 2024
d7c4686
implement GetItems function
wayner0628 Sep 4, 2024
ceaa72d
add mutex avoid racing
wayner0628 Sep 4, 2024
1f0b195
avoid infinite call
wayner0628 Sep 4, 2024
19b0ae8
protect critical variables
wayner0628 Sep 5, 2024
b948aee
avoid infinite call
wayner0628 Sep 5, 2024
c88813f
lint
wayner0628 Sep 5, 2024
df9b8ed
add more unit tests
wayner0628 Sep 5, 2024
8150baa
add more unit tests
wayner0628 Sep 5, 2024
672b711
fix mock
wayner0628 Sep 5, 2024
da3de3c
Merge remote-tracking branch 'origin/master' into feature/download-mu…
wayner0628 Sep 15, 2024
96c4177
Accept incoming changes
wayner0628 Sep 15, 2024
ad12330
Accept incoming changes
wayner0628 Sep 15, 2024
65611c0
multipart blob download based on new api
wayner0628 Sep 15, 2024
38a030b
cache store stop listing at end cursor
wayner0628 Sep 15, 2024
abf7f6a
lint
wayner0628 Sep 15, 2024
2703848
remove old api mock
wayner0628 Sep 15, 2024
99847bd
remove old api mock
wayner0628 Sep 15, 2024
e008444
remove old api mock
wayner0628 Sep 15, 2024
acc16c8
update mem_store List to return global path
wayner0628 Oct 22, 2024
7ca6af1
change mkdir perm
wayner0628 Nov 7, 2024
ac2940a
add comments and handle more errors
wayner0628 Nov 7, 2024
27cdeee
Merge branch 'master' into feature/download-multipart-blob
wayner0628 Nov 8, 2024
bf65836
lint
wayner0628 Nov 8, 2024
db481d0
address race condition and aggregate errors
wayner0628 Nov 8, 2024
03e8221
fix tests
Future-Outlier Nov 8, 2024
dbbd8c3
err msg enhancement
Future-Outlier Nov 8, 2024
121 changes: 101 additions & 20 deletions flytecopilot/data/download.go
@@ -8,8 +8,10 @@
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"sync"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/jsonpb"
@@ -31,40 +33,120 @@
mode core.IOStrategy_DownloadMode
}

func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) {
	blobRef := storage.DataReference(blob.Uri)
	scheme, _, _, err := blobRef.Split()
	if err != nil {
		return nil, errors.Wrapf(err, "Blob uri incorrectly formatted")
	}

	if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
		maxItems := 100
		cursor := storage.NewCursorAtStart()
		var items []storage.DataReference
		var absPaths []string
		for {
			items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor)
			if err != nil || len(items) == 0 {
				logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", blobRef)
				return nil, err
			}
			for _, item := range items {
				absPaths = append(absPaths, item.String())
			}
			if storage.IsCursorEnd(cursor) {
				break
			}
		}

		success := 0
		var mu sync.Mutex
		var wg sync.WaitGroup
		for _, absPath := range absPaths {
			absPath := absPath

			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() {
					if err := recover(); err != nil {
						logger.Errorf(ctx, "recover receives error: %s", err)
					}
				}()

				ref := storage.DataReference(absPath)
				reader, err := DownloadFileFromStorage(ctx, ref, d.store)
				if err != nil {
					logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
					return
				}
				defer func() {
					err := reader.Close()
					if err != nil {
						logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
					}
				}()

				_, _, k, err := ref.Split()
				if err != nil {
					logger.Errorf(ctx, "Failed to parse ref [%s]", ref)
					return
				}
				newPath := filepath.Join(toPath, k)
				dir := filepath.Dir(newPath)

				mu.Lock()
				// 0755: the directory can be read by anyone but can only be written by the owner
				os.MkdirAll(dir, 0755)
				mu.Unlock()
				writer, err := os.Create(newPath)
				if err != nil {
					logger.Errorf(ctx, "failed to open file at path %s", newPath)
					return
				}
				defer func() {
					err := writer.Close()
					if err != nil {
						logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)
					}
				}()

				_, err = io.Copy(writer, reader)
				if err != nil {
					logger.Errorf(ctx, "failed to write remote data to local filesystem")
					return
				}

				mu.Lock()
				success += 1
				mu.Unlock()
			}()
		}
		wg.Wait()
		logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, blobRef, toPath)
		return toPath, nil
	}

	// reader should be declared here (avoid being shared across all goroutines)
	var reader io.ReadCloser
	if scheme == "http" || scheme == "https" {
		reader, err = DownloadFileFromHTTP(ctx, blobRef)
	} else {
		reader, err = DownloadFileFromStorage(ctx, blobRef, d.store)
	}
	if err != nil {
		logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef)
		return nil, err
	}
	defer func() {
		err := reader.Close()
		if err != nil {
			logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err)
		}
	}()

	writer, err := os.Create(toPath)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to open file at path %s", toPath)
	}
	defer func() {
		err := writer.Close()
@@ -76,12 +158,11 @@
	if err != nil {
		return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
	}
	logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath)
	return toPath, nil
}

func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) {
	return d.handleBlob(ctx, &core.Blob{Uri: schema.Uri, Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: core.BlobType_MULTIPART}}}, toFilePath)
}

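Note on the resulting local layout: for each listed part, handleBlob takes the object key from ref.Split() and joins it onto toPath, so a part such as s3://container/folder/file1 lands at <toPath>/folder/file1 (this is what the new unit tests assert). A minimal, standalone sketch of that mapping, using only the standard library; objectKey only mimics the key component of storage.DataReference.Split() for s3-style URIs, and the bucket/key names are made up:

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// objectKey mimics the key component returned by storage.DataReference.Split()
// for an s3-style URI (scheme://container/key...). Illustrative only.
func objectKey(uri string) string {
	trimmed := strings.TrimPrefix(uri, "s3://")
	parts := strings.SplitN(trimmed, "/", 2)
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}

func main() {
	toPath := "./inputs"
	items := []string{
		"s3://container/folder/file1",
		"s3://container/folder/nested/file2",
	}
	for _, item := range items {
		// handleBlob creates the intermediate directories, then writes the
		// downloaded part to the joined path.
		fmt.Println(filepath.Join(toPath, objectKey(item)))
	}
	// Output:
	// inputs/folder/file1
	// inputs/folder/nested/file2
}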
151 changes: 151 additions & 0 deletions flytecopilot/data/download_test.go
@@ -0,0 +1,151 @@
package data

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/stretchr/testify/assert"
)

func TestHandleBlobMultipart(t *testing.T) {
t.Run("Successful Query", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
ref := storage.DataReference("s3://container/folder/file1")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))
ref = storage.DataReference("s3://container/folder/file2")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/folder",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_MULTIPART,
},
},
}

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
for _, file := range []string{"file1", "file2"} {
if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", file)
}
}
})

t.Run("No Items", func(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/folder",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_MULTIPART,
},
},
}

toPath := "./inputs"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete directory: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.Error(t, err)
assert.Nil(t, result)
})
}

func TestHandleBlobSinglePart(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
ref := storage.DataReference("s3://container/file")
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{}))

d := Downloader{store: s}

blob := &core.Blob{
Uri: "s3://container/file",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_SINGLE,
},
},
}

toPath := "./input"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete file: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
if _, err := os.Stat(toPath); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", toPath)
}
}

func TestHandleBlobHTTP(t *testing.T) {
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
d := Downloader{store: s}

blob := &core.Blob{
Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md",
Metadata: &core.BlobMetadata{
Type: &core.BlobType{
Dimensionality: core.BlobType_SINGLE,
},
},
}

toPath := "./input"
defer func() {
err := os.RemoveAll(toPath)
if err != nil {
t.Errorf("Failed to delete file: %v", err)
}
}()

result, err := d.handleBlob(context.Background(), blob, toPath)
assert.NoError(t, err)
assert.Equal(t, toPath, result)

// Check if files were created and data written
if _, err := os.Stat(toPath); os.IsNotExist(err) {
t.Errorf("expected file %s to exist", toPath)
}
}
16 changes: 15 additions & 1 deletion flytestdlib/storage/mem_store.go
@@ -9,6 +9,7 @@
"io"
"io/ioutil"
"os"
"strings"
)

type rawFile = []byte
@@ -55,7 +56,20 @@
}

func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
	var items []DataReference
	prefix := strings.TrimSuffix(string(reference), "/") + "/"

	for ref := range s.cache {
		if strings.HasPrefix(ref.String(), prefix) {
			items = append(items, ref)
		}
	}

	if len(items) == 0 {
		return nil, NewCursorAtEnd(), os.ErrNotExist
	}

	return items, NewCursorAtEnd(), nil
}

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
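For context, a minimal sketch of how the new in-memory List can be exercised end to end, mirroring the unit tests above (errors ignored for brevity; the bucket and key names are illustrative):

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/flyteorg/flyte/flytestdlib/promutils"
	"github.com/flyteorg/flyte/flytestdlib/storage"
)

func main() {
	ctx := context.Background()
	store, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())

	// Seed two objects under a common prefix.
	_ = store.WriteRaw(ctx, "s3://container/folder/file1", 0, storage.Options{}, bytes.NewReader([]byte{}))
	_ = store.WriteRaw(ctx, "s3://container/folder/file2", 0, storage.Options{}, bytes.NewReader([]byte{}))

	// The in-memory store returns every matching key in a single page,
	// with a cursor that is already at the end.
	items, cursor, err := store.List(ctx, "s3://container/folder", 100, storage.NewCursorAtStart())
	fmt.Println(items, storage.IsCursorEnd(cursor), err)
}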
6 changes: 5 additions & 1 deletion flytestdlib/storage/storage.go
@@ -75,6 +75,10 @@
}
}

func IsCursorEnd(cursor Cursor) bool {
	return cursor.cursorState == AtEndCursorState
}

// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores.
// Today we rely on Stow for multi-cloud support, but this interface abstracts that part
type DataStore struct {
@@ -113,7 +117,7 @@
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

	// List gets a list of items (relative path to the reference input) given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)

// ReadRaw retrieves a byte array from the Blob store or an error
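IsCursorEnd is what lets callers of the paginated List API know when to stop. A simplified sketch of the consumption pattern, adapted from the listing loop in flytecopilot/data/download.go (error handling is condensed here; collectAll and the package name are illustrative):

package storageutil

import (
	"context"

	"github.com/flyteorg/flyte/flytestdlib/storage"
)

// collectAll pages through every item stored under ref and returns the
// accumulated references.
func collectAll(ctx context.Context, store *storage.DataStore, ref storage.DataReference) ([]storage.DataReference, error) {
	const maxItems = 100
	cursor := storage.NewCursorAtStart()
	var all []storage.DataReference
	for {
		items, next, err := store.List(ctx, ref, maxItems, cursor)
		if err != nil {
			return nil, err
		}
		if len(items) == 0 {
			// Empty page: stop. download.go treats a fully empty multipart
			// listing as a failure, which is omitted here for brevity.
			return all, nil
		}
		all = append(all, items...)
		if storage.IsCursorEnd(next) {
			return all, nil
		}
		cursor = next
	}
}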