Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[copilot][flytedirectory] multipart blob download #5715

Merged
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
43a0b18
add download multipart blob
wayner0628 Sep 1, 2024
6f78352
recursively process subparts
wayner0628 Sep 1, 2024
69514d6
implement GetItems function
wayner0628 Sep 4, 2024
f13022b
add unit testing
wayner0628 Sep 4, 2024
8fae9f9
Handle blob items in parallel
wayner0628 Sep 4, 2024
e82c5de
fix lint error
wayner0628 Sep 4, 2024
d7c4686
implement GetItems function
wayner0628 Sep 4, 2024
ceaa72d
add mutex to avoid racing
wayner0628 Sep 4, 2024
1f0b195
avoid infinite call
wayner0628 Sep 4, 2024
19b0ae8
protect critical variables
wayner0628 Sep 5, 2024
b948aee
avoid infinite call
wayner0628 Sep 5, 2024
c88813f
lint
wayner0628 Sep 5, 2024
df9b8ed
add more unit tests
wayner0628 Sep 5, 2024
8150baa
add more unit tests
wayner0628 Sep 5, 2024
672b711
fix mock
wayner0628 Sep 5, 2024
da3de3c
Merge remote-tracking branch 'origin/master' into feature/download-mu…
wayner0628 Sep 15, 2024
96c4177
Accept incoming changes
wayner0628 Sep 15, 2024
ad12330
Accept incoming changes
wayner0628 Sep 15, 2024
65611c0
multipart blob download based on new api
wayner0628 Sep 15, 2024
38a030b
cache store stop listing at end cursor
wayner0628 Sep 15, 2024
abf7f6a
lint
wayner0628 Sep 15, 2024
2703848
remove old api mock
wayner0628 Sep 15, 2024
99847bd
remove old api mock
wayner0628 Sep 15, 2024
e008444
remove old api mock
wayner0628 Sep 15, 2024
acc16c8
update mem_store List to return global path
wayner0628 Oct 22, 2024
7ca6af1
change mkdir perm
wayner0628 Nov 7, 2024
ac2940a
add comments and handle more errors
wayner0628 Nov 7, 2024
27cdeee
Merge branch 'master' into feature/download-multipart-blob
wayner0628 Nov 8, 2024
bf65836
lint
wayner0628 Nov 8, 2024
db481d0
address race condition and aggregate errors
wayner0628 Nov 8, 2024
03e8221
fix tests
Future-Outlier Nov 8, 2024
dbbd8c3
err msg enhancement
Future-Outlier Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 101 additions & 20 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"sync"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -31,40 +33,120 @@ type Downloader struct {
mode core.IOStrategy_DownloadMode
}

// TODO add support for multipart blobs
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath string) (interface{}, error) {
ref := storage.DataReference(blob.Uri)
scheme, _, _, err := ref.Split()
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) {
blobRef := storage.DataReference(blob.Uri)
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
scheme, _, _, err := blobRef.Split()
if err != nil {
return nil, errors.Wrapf(err, "Blob uri incorrectly formatted")
}
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, ref)
} else {
if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
logger.Warnf(ctx, "Currently only single part blobs are supported, we will force multipart to be 'path/00000'")
ref, err = d.store.ConstructReference(ctx, ref, "000000")
if err != nil {

if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
maxItems := 100
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
cursor := storage.NewCursorAtStart()
var items []storage.DataReference
var absPaths []string
for {
items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor)
if err != nil || len(items) == 0 {
logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", blobRef)
return nil, err
}
for _, item := range items {
absPaths = append(absPaths, item.String())
}
if storage.IsCursorEnd(cursor) {
break
}
}
reader, err = DownloadFileFromStorage(ctx, ref, d.store)

success := 0
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
var mu sync.Mutex
var wg sync.WaitGroup
for _, absPath := range absPaths {
absPath := absPath

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
logger.Errorf(ctx, "recover receives error: %s", err)
}
}()

ref := storage.DataReference(absPath)
reader, err := DownloadFileFromStorage(ctx, ref, d.store)
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
return
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
}
}()

_, _, k, err := ref.Split()
if err != nil {
logger.Errorf(ctx, "Failed to parse ref [%s]", ref)
return
}
newPath := filepath.Join(toPath, k)
dir := filepath.Dir(newPath)

mu.Lock()
// 0777: the directory can be read and written by anyone
os.MkdirAll(dir, 0777)
mu.Unlock()
writer, err := os.Create(newPath)
if err != nil {
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "failed to open file at path %s", newPath)
return
}
defer func() {
err := writer.Close()
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)
}
}()

_, err = io.Copy(writer, reader)
if err != nil {
logger.Errorf(ctx, "failed to write remote data to local filesystem")
return
}
mu.Lock()
success += 1
mu.Unlock()
}()
}
wg.Wait()
logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, blobRef, toPath)
return toPath, nil
}

// reader should be declared here (avoid being shared across all goroutines)
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, blobRef)
} else {
reader, err = DownloadFileFromStorage(ctx, blobRef, d.store)
}
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef)
return nil, err
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err)
}
}()

writer, err := os.Create(toFilePath)
writer, err := os.Create(toPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open file at path %s", toFilePath)
return nil, errors.Wrapf(err, "failed to open file at path %s", toPath)
}
defer func() {
err := writer.Close()
Expand All @@ -76,12 +158,11 @@ func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath
if err != nil {
return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
}
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toFilePath)
return toFilePath, nil
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath)
return toPath, nil
}

func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) {
// TODO Handle schema type
return d.handleBlob(ctx, &core.Blob{Uri: schema.Uri, Metadata: &core.BlobMetadata{Type: &core.BlobType{Dimensionality: core.BlobType_MULTIPART}}}, toFilePath)
}

Expand Down
151 changes: 151 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package data

import (
	"bytes"
	"context"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"testing"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyte/flytestdlib/promutils"
	"github.com/flyteorg/flyte/flytestdlib/storage"

	"github.com/stretchr/testify/assert"
)

// TestHandleBlobMultipart exercises the multipart download path of
// Downloader.handleBlob against an in-memory data store: every object under
// the blob's prefix should be copied below toPath, and an empty prefix should
// surface an error.
func TestHandleBlobMultipart(t *testing.T) {
	t.Run("Successful Query", func(t *testing.T) {
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)
		// Seed two parts under a common prefix; check the writes succeed so
		// the download assertions below are meaningful.
		ref := storage.DataReference("s3://container/folder/file1")
		assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))
		ref = storage.DataReference("s3://container/folder/file2")
		assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))

		d := Downloader{store: s}

		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			err := os.RemoveAll(toPath)
			if err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.NoError(t, err)
		assert.Equal(t, toPath, result)

		// Check if files were created and data written.
		for _, file := range []string{"file1", "file2"} {
			if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) {
				t.Errorf("expected file %s to exist", file)
			}
		}
	})

	t.Run("No Items", func(t *testing.T) {
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)

		d := Downloader{store: s}

		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			err := os.RemoveAll(toPath)
			if err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		// Listing an empty prefix must surface an error and produce no result.
		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.Error(t, err)
		assert.Nil(t, result)
	})
}

// TestHandleBlobSinglePart verifies that a single-part blob stored in the
// data store is downloaded to exactly the requested local path.
func TestHandleBlobSinglePart(t *testing.T) {
	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)
	ref := storage.DataReference("s3://container/file")
	// Check the seed write succeeds; otherwise the assertions below are meaningless.
	assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))

	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: "s3://container/file",
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		err := os.RemoveAll(toPath)
		if err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// Check if the file was created and data written.
	if _, err := os.Stat(toPath); os.IsNotExist(err) {
		t.Errorf("expected file %s to exist", toPath)
	}
}

// TestHandleBlobHTTP verifies that a single-part blob addressed by an HTTP(S)
// URI is downloaded to the requested local path. The payload is served by a
// local httptest server so the test is hermetic and does not depend on
// network access to an external host.
func TestHandleBlobHTTP(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("hello world"))
	}))
	defer server.Close()

	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)
	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: server.URL,
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		err := os.RemoveAll(toPath)
		if err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// Check the downloaded file exists and contains the served payload.
	data, err := os.ReadFile(toPath)
	assert.NoError(t, err)
	assert.Equal(t, "hello world", string(data))
}
16 changes: 15 additions & 1 deletion flytestdlib/storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"os"
"strings"
)

type rawFile = []byte
Expand Down Expand Up @@ -55,7 +56,20 @@ func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Meta
}

// List returns every reference stored under the given prefix. The reference is
// normalized so that both "s3://bucket/dir" and "s3://bucket/dir/" match the
// same children. The in-memory implementation returns all matches in a single
// page, so the returned cursor is always at the end.
// NOTE(review): maxItems and the input cursor are ignored here — confirm
// callers do not rely on real pagination from the memory store.
func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
	prefix := strings.TrimSuffix(string(reference), "/") + "/"

	var items []DataReference
	for ref := range s.cache {
		if strings.HasPrefix(ref.String(), prefix) {
			items = append(items, ref)
		}
	}

	// Mirror object-store semantics: listing a non-existent prefix is an error.
	if len(items) == 0 {
		return nil, NewCursorAtEnd(), os.ErrNotExist
	}

	return items, NewCursorAtEnd(), nil
}

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
Expand Down
6 changes: 5 additions & 1 deletion flytestdlib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func NewCursorFromCustomPosition(customPosition string) Cursor {
}
}

// IsCursorEnd reports whether c has reached the end of a paginated listing.
func IsCursorEnd(c Cursor) bool {
	return c.cursorState == AtEndCursorState
}

// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores.
// Today we rely on Stow for multi-cloud support, but this interface abstracts that part
type DataStore struct {
Expand Down Expand Up @@ -113,7 +117,7 @@ type RawStore interface {
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

// List gets a list of items given a prefix, using a paginated API
// List gets a list of items (relative path to the reference input) given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)

// ReadRaw retrieves a byte array from the Blob store or an error
Expand Down