diff --git a/esti/lakectl_test.go b/esti/lakectl_test.go index 4068e171430..7f8198457b5 100644 --- a/esti/lakectl_test.go +++ b/esti/lakectl_test.go @@ -867,6 +867,7 @@ func TestLakectlFsStat(t *testing.T) { func TestLakectlImport(t *testing.T) { // TODO(barak): generalize test to work all supported object stores + const IngestTestBucketPath = "s3://esti-system-testing-data/ingest-test-data/" skipOnSchemaMismatch(t, IngestTestBucketPath) repoName := generateUniqueRepositoryName() diff --git a/esti/store_test.go b/esti/store_test.go deleted file mode 100644 index 98c45708fd5..00000000000 --- a/esti/store_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package esti - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "github.com/treeverse/lakefs/pkg/block" - "github.com/treeverse/lakefs/pkg/ingest/store" -) - -// Currently, this test accesses the following bucket, and so AWS should be configured to allow it -const IngestTestBucketPath = "s3://esti-system-testing-data/ingest-test-data/" - -func TestS3Walk(t *testing.T) { - // Specific S3 test, this test can only run on AWS setup, and therefore is skipped for other store types - skipOnSchemaMismatch(t, IngestTestBucketPath) - - // Test bucket was uploaded with 2100 as the test is written. If the test fails on this number, - // make sure there were no changes made to the bucket, or update this number accordingly - const expectedNumObjs = 2100 - numObjs := 0 - - walker, err := store.NewFactory(nil).GetWalker(context.Background(), store.WalkerOptions{ - S3EndpointURL: "", - StorageURI: IngestTestBucketPath, - }) - require.NoError(t, err) - err = walker.Walk(context.Background(), block.WalkOptions{}, func(e block.ObjectStoreEntry) error { - numObjs++ - return nil - }) - - require.NoError(t, err) - require.Equal(t, expectedNumObjs, numObjs, "Wrong number of objects detected by Walk function") -} diff --git a/pkg/api/serve_test.go b/pkg/api/serve_test.go index 91e01faebfb..5262e1c0819 100644 --- a/pkg/api/serve_test.go +++ b/pkg/api/serve_test.go @@ -27,7 +27,6 @@ import ( "github.com/treeverse/lakefs/pkg/catalog" "github.com/treeverse/lakefs/pkg/config" "github.com/treeverse/lakefs/pkg/graveler/settings" - "github.com/treeverse/lakefs/pkg/ingest/store" "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" "github.com/treeverse/lakefs/pkg/kv/kvtest" @@ -116,7 +115,6 @@ func setupHandler(t testing.TB) (http.Handler, *dependencies) { cfg, err := config.NewConfig("", cfg) testutil.MustDo(t, "config", err) kvStore := kvtest.GetStore(ctx, t) - factory := store.NewFactory(nil) actionsStore := actions.NewActionsKVStore(kvStore) idGen := &actions.DecreasingIDGenerator{} authService := auth.NewBasicAuthService(kvStore, crypt.NewSecretStore([]byte("some secret")), authparams.ServiceCache{ @@ -128,7 +126,6 @@ func setupHandler(t testing.TB) (http.Handler, *dependencies) { c, err := catalog.New(ctx, catalog.Config{ Config: cfg, KVStore: kvStore, - WalkerFactory: factory, SettingsManagerOption: settings.WithCache(cache.NoCache), PathProvider: upload.DefaultPathProvider, }) diff --git a/pkg/block/adapter.go b/pkg/block/adapter.go index 3d34dd77372..9704965b38b 100644 --- a/pkg/block/adapter.go +++ b/pkg/block/adapter.go @@ -185,9 +185,7 @@ type Adapter interface { Put(ctx context.Context, obj ObjectPointer, sizeBytes int64, reader io.Reader, opts PutOpts) (*PutResponse, error) Get(ctx context.Context, obj ObjectPointer) (io.ReadCloser, error) - // GetWalker is never called on the server side. - // TODO(itaiad200): Remove it from this interface. - GetWalker(uri *url.URL) (Walker, error) + GetWalker(storageID string, opts WalkerOptions) (Walker, error) // GetPreSignedURL returns a pre-signed URL for accessing obj with mode, and the // expiry time for this URL. The expiry time IsZero() if reporting @@ -221,3 +219,33 @@ type Adapter interface { RuntimeStats() map[string]string } + +type WalkerOptions struct { + StorageURI *url.URL + // SkipOutOfOrder skips non-lexically ordered entries (Azure only). + SkipOutOfOrder bool +} + +type WalkerWrapper struct { + walker Walker + uri *url.URL +} + +func NewWalkerWrapper(walker Walker, uri *url.URL) *WalkerWrapper { + return &WalkerWrapper{ + walker: walker, + uri: uri, + } +} + +func (ww *WalkerWrapper) Walk(ctx context.Context, opts WalkOptions, walkFn func(e ObjectStoreEntry) error) error { + return ww.walker.Walk(ctx, ww.uri, opts, walkFn) +} + +func (ww *WalkerWrapper) Marker() Mark { + return ww.walker.Marker() +} + +func (ww *WalkerWrapper) GetSkippedEntries() []ObjectStoreEntry { + return ww.walker.GetSkippedEntries() +} diff --git a/pkg/block/azure/adapter.go b/pkg/block/azure/adapter.go index 2f83a27edbb..94c9f3d0744 100644 --- a/pkg/block/azure/adapter.go +++ b/pkg/block/azure/adapter.go @@ -221,12 +221,12 @@ func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadClos return a.Download(ctx, obj, 0, blockblob.CountToEnd) } -func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) { - if err := block.ValidateStorageType(uri, block.StorageTypeAzure); err != nil { +func (a *Adapter) GetWalker(_ string, opts block.WalkerOptions) (block.Walker, error) { + if err := block.ValidateStorageType(opts.StorageURI, block.StorageTypeAzure); err != nil { return nil, err } - storageAccount, domain, err := ParseURL(uri) + storageAccount, domain, err := ParseURL(opts.StorageURI) if err != nil { return nil, err } @@ -239,7 +239,7 @@ func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) { return nil, err } - return NewAzureDataLakeWalker(client, false) + return NewAzureDataLakeWalker(client, opts.SkipOutOfOrder) } func (a *Adapter) GetPreSignedURL(ctx context.Context, obj block.ObjectPointer, mode block.PreSignMode) (string, time.Time, error) { @@ -622,7 +622,7 @@ func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { return &info } -func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { return block.DefaultResolveNamespace(storageNamespace, key, identifierType) } diff --git a/pkg/block/blocktest/adapter.go b/pkg/block/blocktest/adapter.go index 30bb74f9d98..85e6fa854d0 100644 --- a/pkg/block/blocktest/adapter.go +++ b/pkg/block/blocktest/adapter.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/require" "github.com/treeverse/lakefs/pkg/block" - "github.com/treeverse/lakefs/pkg/ingest/store" ) // AdapterTest Test suite of basic adapter functionality @@ -131,7 +130,7 @@ func testAdapterWalker(t *testing.T, adapter block.Adapter, storageNamespace str uri, err := url.Parse(qk.Format()) require.NoError(t, err) t.Run(tt.name, func(t *testing.T) { - reader, err := adapter.GetWalker(uri) + reader, err := adapter.GetWalker("", block.WalkerOptions{StorageURI: uri}) require.NoError(t, err) var results []string @@ -217,15 +216,15 @@ func dumpPathTree(t testing.TB, ctx context.Context, adapter block.Adapter, qk b t.Helper() tree := make([]string, 0) - uri, err := url.Parse(qk.Format()) + p := qk.Format() + uri, err := url.Parse(p) require.NoError(t, err, "URL Parse Error") - w, err := adapter.GetWalker(uri) + walker, err := adapter.GetWalker("", block.WalkerOptions{StorageURI: uri}) require.NoError(t, err, "GetWalker failed") - walker := store.NewWrapper(w, uri) - - err = walker.Walk(ctx, block.WalkOptions{}, func(e block.ObjectStoreEntry) error { + wwalker := block.NewWalkerWrapper(walker, uri) + err = wwalker.Walk(ctx, block.WalkOptions{}, func(e block.ObjectStoreEntry) error { _, p, _ := strings.Cut(e.Address, uri.String()) tree = append(tree, p) return nil diff --git a/pkg/block/gs/adapter.go b/pkg/block/gs/adapter.go index 796098fdc91..5fb6bde8a0a 100644 --- a/pkg/block/gs/adapter.go +++ b/pkg/block/gs/adapter.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net/http" - "net/url" "sort" "strings" "time" @@ -194,8 +193,8 @@ func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadClos return r, nil } -func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) { - if err := block.ValidateStorageType(uri, block.StorageTypeGS); err != nil { +func (a *Adapter) GetWalker(_ string, opts block.WalkerOptions) (block.Walker, error) { + if err := block.ValidateStorageType(opts.StorageURI, block.StorageTypeGS); err != nil { return nil, err } return NewGCSWalker(a.client), nil @@ -682,7 +681,7 @@ func (a *Adapter) extractParamsFromObj(obj block.ObjectPointer) (string, string, return bucket, key, nil } -func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { qualifiedKey, err := block.DefaultResolveNamespace(storageNamespace, key, identifierType) if err != nil { return qualifiedKey, err diff --git a/pkg/block/local/adapter.go b/pkg/block/local/adapter.go index 74ee803ba48..b3d36034619 100644 --- a/pkg/block/local/adapter.go +++ b/pkg/block/local/adapter.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/http" - "net/url" "os" "path" "path/filepath" @@ -301,12 +300,12 @@ func (l *Adapter) Get(_ context.Context, obj block.ObjectPointer) (reader io.Rea return f, nil } -func (l *Adapter) GetWalker(uri *url.URL) (block.Walker, error) { - if err := block.ValidateStorageType(uri, block.StorageTypeLocal); err != nil { +func (l *Adapter) GetWalker(_ string, opts block.WalkerOptions) (block.Walker, error) { + if err := block.ValidateStorageType(opts.StorageURI, block.StorageTypeLocal); err != nil { return nil, err } - - err := VerifyAbsPath(uri.Path, l.path, l.allowedExternalPrefixes) + uriPath := strings.TrimSuffix(opts.StorageURI.Path, string(filepath.Separator)) + err := VerifyAbsPath(uriPath, l.path, l.allowedExternalPrefixes) if err != nil { return nil, err } @@ -553,7 +552,7 @@ func (l *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { return &info } -func (l *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (l *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { qk, err := block.DefaultResolveNamespace(storageNamespace, key, identifierType) if err != nil { return nil, err diff --git a/pkg/block/mem/adapter.go b/pkg/block/mem/adapter.go index d1a8bbb7118..406c8076f3b 100644 --- a/pkg/block/mem/adapter.go +++ b/pkg/block/mem/adapter.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/http" - "net/url" "sort" "strings" "sync" @@ -120,7 +119,7 @@ func verifyObjectPointer(obj block.ObjectPointer) error { return nil } -func (a *Adapter) GetWalker(_ *url.URL) (block.Walker, error) { +func (a *Adapter) GetWalker(_ string, _ block.WalkerOptions) (block.Walker, error) { return nil, fmt.Errorf("mem block adapter: %w", block.ErrOperationNotSupported) } diff --git a/pkg/block/metrics.go b/pkg/block/metrics.go index 9134c164b19..a6ac4e7f8b5 100644 --- a/pkg/block/metrics.go +++ b/pkg/block/metrics.go @@ -4,7 +4,6 @@ import ( "context" "io" "net/http" - "net/url" "time" "github.com/treeverse/lakefs/pkg/httputil" @@ -32,8 +31,8 @@ func (m *MetricsAdapter) Get(ctx context.Context, obj ObjectPointer) (io.ReadClo return m.adapter.Get(ctx, obj) } -func (m *MetricsAdapter) GetWalker(uri *url.URL) (Walker, error) { - return m.adapter.GetWalker(uri) +func (m *MetricsAdapter) GetWalker(storageID string, opts WalkerOptions) (Walker, error) { + return m.adapter.GetWalker(storageID, opts) } func (m *MetricsAdapter) GetPreSignedURL(ctx context.Context, obj ObjectPointer, mode PreSignMode) (string, time.Time, error) { diff --git a/pkg/block/s3/adapter.go b/pkg/block/s3/adapter.go index b5e220bcae4..ab4a44b2274 100644 --- a/pkg/block/s3/adapter.go +++ b/pkg/block/s3/adapter.go @@ -382,8 +382,8 @@ func (a *Adapter) Get(ctx context.Context, obj block.ObjectPointer) (io.ReadClos return objectOutput.Body, nil } -func (a *Adapter) GetWalker(uri *url.URL) (block.Walker, error) { - if err := block.ValidateStorageType(uri, block.StorageTypeS3); err != nil { +func (a *Adapter) GetWalker(_ string, opts block.WalkerOptions) (block.Walker, error) { + if err := block.ValidateStorageType(opts.StorageURI, block.StorageTypeS3); err != nil { return nil, err } return NewS3Walker(a.clients.GetDefault()), nil @@ -926,7 +926,7 @@ func resolveNamespace(obj block.ObjectPointer) (block.CommonQualifiedKey, error) return qualifiedKey, nil } -func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { return block.DefaultResolveNamespace(storageNamespace, key, identifierType) } diff --git a/pkg/block/transient/adapter.go b/pkg/block/transient/adapter.go index c3690c4be6b..9e76630d73c 100644 --- a/pkg/block/transient/adapter.go +++ b/pkg/block/transient/adapter.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "io" "net/http" - "net/url" "time" "github.com/google/uuid" @@ -36,7 +35,7 @@ func (a *Adapter) Get(_ context.Context, _ block.ObjectPointer) (io.ReadCloser, return io.NopCloser(&io.LimitedReader{R: rand.Reader, N: DefaultReaderSize}), nil } -func (a *Adapter) GetWalker(_ *url.URL) (block.Walker, error) { +func (a *Adapter) GetWalker(_ string, _ block.WalkerOptions) (block.Walker, error) { return nil, block.ErrOperationNotSupported } @@ -159,7 +158,7 @@ func (a *Adapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInfo { return &info } -func (a *Adapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *Adapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { return block.DefaultResolveNamespace(storageNamespace, key, identifierType) } diff --git a/pkg/catalog/catalog.go b/pkg/catalog/catalog.go index bf87bb2b797..23795c5bd4f 100644 --- a/pkg/catalog/catalog.go +++ b/pkg/catalog/catalog.go @@ -36,7 +36,6 @@ import ( "github.com/treeverse/lakefs/pkg/graveler/sstable" "github.com/treeverse/lakefs/pkg/graveler/staging" "github.com/treeverse/lakefs/pkg/ident" - "github.com/treeverse/lakefs/pkg/ingest/store" "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/logging" "github.com/treeverse/lakefs/pkg/pyramid" @@ -216,7 +215,6 @@ type WriteRangeRequest struct { type Config struct { Config config.Config KVStore kv.Store - WalkerFactory WalkerFactory SettingsManagerOption settings.ManagerOption PathProvider *upload.PathPartitionProvider } @@ -224,7 +222,6 @@ type Config struct { type Catalog struct { BlockAdapter block.Adapter Store Store - walkerFactory WalkerFactory managers []io.Closer workPool *pond.WorkerPool PathProvider *upload.PathPartitionProvider @@ -312,12 +309,8 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { cancelFn() return nil, fmt.Errorf("build block adapter: %w", err) } - baseCfg := cfg.Config.GetBaseConfig() - // TODO (niro): This part will break using multiple blockstores. We should change the catalog logic to get the walker from the adapter - if cfg.WalkerFactory == nil { - cfg.WalkerFactory = store.NewFactory(&baseCfg.Blockstore) - } + baseCfg := cfg.Config.GetBaseConfig() tierFSParams, err := pyramidparams.NewCommittedTierFSParams(baseCfg, adapter) if err != nil { cancelFn() @@ -348,7 +341,6 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { sstableManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, rangeFS, hashAlg) sstableMetaManager := sstable.NewPebbleSSTableRangeManager(pebbleSSTableCache, metaRangeFS, hashAlg) - committedParams := committed.Params{ MinRangeSizeBytes: baseCfg.Committed.Permanent.MinRangeSizeBytes, MaxRangeSizeBytes: baseCfg.Committed.Permanent.MaxRangeSizeBytes, @@ -418,7 +410,6 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { UGCPrepareInterval: baseCfg.UGC.PrepareInterval, PathProvider: cfg.PathProvider, BackgroundLimiter: limiter, - walkerFactory: cfg.WalkerFactory, workPool: workPool, KVStore: cfg.KVStore, managers: []io.Closer{sstableManager, sstableMetaManager, &ctxCloser{cancelFn}}, @@ -2328,19 +2319,23 @@ func (c *Catalog) importAsync(ctx context.Context, repository *graveler.Reposito wg, wgCtx := c.workPool.GroupContext(ctx) for _, source := range params.Paths { - src := source // Pinning wg.Submit(func() error { - // TODO (niro): Need to handle this at some point (use adapter GetWalker) - walker, err := c.walkerFactory.GetWalker(wgCtx, store.WalkerOptions{StorageURI: src.Path}) + uri, err := url.Parse(source.Path) + if err != nil { + return fmt.Errorf("could not parse storage URI %s: %w", uri, err) + } + + walker, err := c.BlockAdapter.GetWalker(repository.StorageID.String(), block.WalkerOptions{StorageURI: uri}) if err != nil { return fmt.Errorf("creating object-store walker on path %s: %w", source.Path, err) } - it, err := NewWalkEntryIterator(wgCtx, walker, src.Type, src.Destination, "", "") + it, err := NewWalkEntryIterator(wgCtx, block.NewWalkerWrapper(walker, uri), source.Type, source.Destination, "", "") if err != nil { - return fmt.Errorf("creating walk iterator on path %s: %w", src.Path, err) + return fmt.Errorf("creating walk iterator on path %s: %w", source.Path, err) } - logger.WithFields(logging.Fields{"source": src.Path, "itr": it}).Debug("Ingest source") + + logger.WithFields(logging.Fields{"source": source.Path, "itr": it}).Debug("Ingest source") defer it.Close() return importManager.Ingest(it) }) @@ -2504,13 +2499,17 @@ func (c *Catalog) WriteRange(ctx context.Context, repositoryID string, params Wr return nil, nil, err } - // TODO (niro): Need to handle this at some point (use adapter GetWalker) - walker, err := c.walkerFactory.GetWalker(ctx, store.WalkerOptions{StorageURI: params.SourceURI, SkipOutOfOrder: true}) + uri, err := url.Parse(params.SourceURI) + if err != nil { + return nil, nil, fmt.Errorf("could not parse storage URI %s: %w", uri, err) + } + + walker, err := c.BlockAdapter.GetWalker(repository.StorageID.String(), block.WalkerOptions{StorageURI: uri}) if err != nil { - return nil, nil, fmt.Errorf("creating object-store walker: %w", err) + return nil, nil, fmt.Errorf("creating object-store walker on path %s: %w", params.SourceURI, err) } - it, err := NewWalkEntryIterator(ctx, walker, ImportPathTypePrefix, params.Prepend, params.After, params.ContinuationToken) + it, err := NewWalkEntryIterator(ctx, block.NewWalkerWrapper(walker, uri), ImportPathTypePrefix, params.Prepend, params.After, params.ContinuationToken) if err != nil { return nil, nil, fmt.Errorf("creating walk iterator: %w", err) } diff --git a/pkg/catalog/testutils/testutils.go b/pkg/catalog/testutils/testutils.go index fcb796854f5..9e74087f797 100644 --- a/pkg/catalog/testutils/testutils.go +++ b/pkg/catalog/testutils/testutils.go @@ -13,7 +13,6 @@ import ( "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" "github.com/treeverse/lakefs/pkg/graveler" - "github.com/treeverse/lakefs/pkg/ingest/store" "github.com/treeverse/lakefs/pkg/testutil" ) @@ -110,15 +109,6 @@ func (f *FakeEntryIterator) Err() error { func (f *FakeEntryIterator) Close() {} -type FakeFactory struct { - Walker *FakeWalker -} - -func (f FakeFactory) GetWalker(_ context.Context, op store.WalkerOptions) (*store.WalkerWrapper, error) { - u, _ := url.Parse(op.StorageURI) - return store.NewWrapper(f.Walker, u), nil -} - func NewFakeWalker(count, max int, uriPrefix, expectedAfter, expectedContinuationToken, expectedFromSourceURIWithPrefix string, err error) *FakeWalker { w := &FakeWalker{ Max: max, diff --git a/pkg/catalog/walk_entry_iterator.go b/pkg/catalog/walk_entry_iterator.go index 9f42208c668..2ddeb9bf0ed 100644 --- a/pkg/catalog/walk_entry_iterator.go +++ b/pkg/catalog/walk_entry_iterator.go @@ -6,14 +6,13 @@ import ( "strings" "github.com/treeverse/lakefs/pkg/block" - "github.com/treeverse/lakefs/pkg/ingest/store" "go.uber.org/atomic" "google.golang.org/protobuf/types/known/timestamppb" ) type walkEntryIterator struct { entries chan EntryWithMarker - walker *store.WalkerWrapper + walker *block.WalkerWrapper done chan bool closed *atomic.Bool @@ -37,12 +36,7 @@ type EntryWithMarker struct { // bufferSize - buffer size of the buffer between reading entries from the blockstore Walk and passing it on const bufferSize = 100 -// WalkerFactory provides an abstraction for creating Walker -type WalkerFactory interface { - GetWalker(ctx context.Context, opts store.WalkerOptions) (*store.WalkerWrapper, error) -} - -func NewWalkEntryIterator(ctx context.Context, walker *store.WalkerWrapper, sourceType ImportPathType, destination, after, continuationToken string) (*walkEntryIterator, error) { +func NewWalkEntryIterator(ctx context.Context, walker *block.WalkerWrapper, sourceType ImportPathType, destination, after, continuationToken string) (*walkEntryIterator, error) { prepend := destination if prepend != "" && !strings.HasSuffix(prepend, "/") { prepend += "/" diff --git a/pkg/catalog/walk_entry_iterator_test.go b/pkg/catalog/walk_entry_iterator_test.go index df2b263d844..648ecf66e1f 100644 --- a/pkg/catalog/walk_entry_iterator_test.go +++ b/pkg/catalog/walk_entry_iterator_test.go @@ -9,7 +9,6 @@ import ( "github.com/treeverse/lakefs/pkg/block" "github.com/treeverse/lakefs/pkg/catalog" "github.com/treeverse/lakefs/pkg/catalog/testutils" - "github.com/treeverse/lakefs/pkg/ingest/store" ) const ( @@ -41,7 +40,7 @@ func TestWalkEntryIterator(t *testing.T) { t.Run(tt.name, func(t *testing.T) { w := testutils.NewFakeWalker(iteratorTestCount, tt.max, uriPrefix, after, continuationToken, fromSourceURIWithPrefix, nil) parsedURL, _ := url.Parse(fromSourceURIWithPrefix) - sut, err := catalog.NewWalkEntryIterator(context.Background(), store.NewWrapper(w, parsedURL), catalog.ImportPathTypePrefix, prepend, after, continuationToken) + sut, err := catalog.NewWalkEntryIterator(context.Background(), block.NewWalkerWrapper(w, parsedURL), catalog.ImportPathTypePrefix, prepend, after, continuationToken) require.NoError(t, err, "creating walk entry iterator") require.NotNil(t, sut) diff --git a/pkg/ingest/store/factory.go b/pkg/ingest/store/factory.go deleted file mode 100644 index a236714c9ea..00000000000 --- a/pkg/ingest/store/factory.go +++ /dev/null @@ -1,198 +0,0 @@ -package store - -import ( - "context" - "errors" - "fmt" - "net/url" - - "cloud.google.com/go/storage" - "github.com/aws/aws-sdk-go-v2/aws" - awsconfig "github.com/aws/aws-sdk-go-v2/config" - awss3 "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/treeverse/lakefs/pkg/block" - "github.com/treeverse/lakefs/pkg/block/azure" - "github.com/treeverse/lakefs/pkg/block/factory" - "github.com/treeverse/lakefs/pkg/block/gs" - "github.com/treeverse/lakefs/pkg/block/local" - "github.com/treeverse/lakefs/pkg/block/params" - "github.com/treeverse/lakefs/pkg/block/s3" - "github.com/treeverse/lakefs/pkg/config" -) - -var ErrNotSupported = errors.New("no storage adapter found") - -type WalkerOptions struct { - S3EndpointURL string - StorageURI string - SkipOutOfOrder bool -} - -type WalkerWrapper struct { - walker block.Walker - uri *url.URL -} - -func NewWrapper(walker block.Walker, uri *url.URL) *WalkerWrapper { - return &WalkerWrapper{ - walker: walker, - uri: uri, - } -} - -func (ww *WalkerWrapper) Walk(ctx context.Context, opts block.WalkOptions, walkFn func(e block.ObjectStoreEntry) error) error { - return ww.walker.Walk(ctx, ww.uri, opts, walkFn) -} - -func (ww *WalkerWrapper) Marker() block.Mark { - return ww.walker.Marker() -} - -func (ww *WalkerWrapper) GetSkippedEntries() []block.ObjectStoreEntry { - return ww.walker.GetSkippedEntries() -} - -type WalkerFactory struct { - params config.AdapterConfig -} - -func NewFactory(params config.AdapterConfig) *WalkerFactory { - return &WalkerFactory{params: params} -} - -func (f *WalkerFactory) buildS3Walker(opts WalkerOptions) (*s3.Walker, error) { - var client *awss3.Client - if f.params != nil { - s3params, err := f.params.BlockstoreS3Params() - if err != nil { - return nil, err - } - client, err = factory.BuildS3Client(context.Background(), s3params) - if err != nil { - return nil, err - } - } else { - var err error - client, err = getS3Client(opts.S3EndpointURL) - if err != nil { - return nil, err - } - } - return s3.NewS3Walker(client), nil -} - -func (f *WalkerFactory) buildGCSWalker(ctx context.Context) (*gs.GCSWalker, error) { - var svc *storage.Client - if f.params != nil { - gsParams, err := f.params.BlockstoreGSParams() - if err != nil { - return nil, err - } - svc, err = factory.BuildGSClient(ctx, gsParams) - if err != nil { - return nil, err - } - } else { - var err error - svc, err = storage.NewClient(ctx) - if err != nil { - return nil, err - } - } - return gs.NewGCSWalker(svc), nil -} - -func (f *WalkerFactory) buildAzureWalker(importURL *url.URL, skipOutOfOrder bool) (block.Walker, error) { - storageAccount, err := azure.ExtractStorageAccount(importURL) - if err != nil { - return nil, err - } - - var azureParams params.Azure - if f.params != nil { - // server settings - azureParams, err = f.params.BlockstoreAzureParams() - if err != nil { - return nil, err - } - } - - // Use StorageAccessKey to initialize the storage account client only if it was provided for this given storage account - // Otherwise fall back to the default credentials - if azureParams.StorageAccount != storageAccount { - azureParams.StorageAccount = storageAccount - azureParams.StorageAccessKey = "" - } - client, err := azure.BuildAzureServiceClient(azureParams) - if err != nil { - return nil, err - } - - return azure.NewAzureDataLakeWalker(client, skipOutOfOrder) -} - -func (f *WalkerFactory) GetWalker(ctx context.Context, opts WalkerOptions) (*WalkerWrapper, error) { - uri, err := url.Parse(opts.StorageURI) - if err != nil { - return nil, fmt.Errorf("could not parse storage URI %s: %w", uri, err) - } - - var walker block.Walker - switch uri.Scheme { - case "s3": - walker, err = f.buildS3Walker(opts) - if err != nil { - return nil, fmt.Errorf("creating s3 walker: %w", err) - } - case "gs": - walker, err = f.buildGCSWalker(ctx) - if err != nil { - return nil, fmt.Errorf("creating gs walker: %w", err) - } - case "http", "https": - walker, err = f.buildAzureWalker(uri, opts.SkipOutOfOrder) - if err != nil { - return nil, fmt.Errorf("creating Azure walker: %w", err) - } - case "local": - walker, err = f.buildLocalWalker() - if err != nil { - return nil, fmt.Errorf("creating local walker: %w", err) - } - default: - return nil, fmt.Errorf("%w: for scheme: %s", ErrNotSupported, uri.Scheme) - } - return NewWrapper(walker, uri), nil -} - -func (f *WalkerFactory) buildLocalWalker() (*local.Walker, error) { - var ( - localParams params.Local - err error - ) - - if f.params != nil { - localParams, err = f.params.BlockstoreLocalParams() - if err != nil { - return nil, err - } - } - - return local.NewLocalWalker(localParams), nil -} - -func getS3Client(s3EndpointURL string) (*awss3.Client, error) { - cfg, err := awsconfig.LoadDefaultConfig(context.Background()) - if err != nil { - return nil, err - } - client := awss3.NewFromConfig(cfg, func(o *awss3.Options) { - if s3EndpointURL != "" { - o.BaseEndpoint = aws.String(s3EndpointURL) - o.Region = "us-east-1" - o.UsePathStyle = true - } - }) - // TODO(barak): do we require SharedConfigState: session.SharedConfigEnable, - return client, nil -} diff --git a/pkg/testutil/adapter.go b/pkg/testutil/adapter.go index bb6705c2511..826ce3a2794 100644 --- a/pkg/testutil/adapter.go +++ b/pkg/testutil/adapter.go @@ -5,7 +5,6 @@ import ( "errors" "io" "net/http" - "net/url" "time" "github.com/treeverse/lakefs/pkg/block" @@ -81,7 +80,7 @@ func (a *MockAdapter) Get(_ context.Context, _ block.ObjectPointer) (io.ReadClos return nil, nil } -func (a *MockAdapter) GetWalker(_ *url.URL) (block.Walker, error) { +func (a *MockAdapter) GetWalker(_ string, _ block.WalkerOptions) (block.Walker, error) { return nil, nil } @@ -150,7 +149,7 @@ func (a *MockAdapter) GetStorageNamespaceInfo(string) *block.StorageNamespaceInf return &info } -func (a *MockAdapter) ResolveNamespace(storageID, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { +func (a *MockAdapter) ResolveNamespace(_, storageNamespace, key string, identifierType block.IdentifierType) (block.QualifiedKey, error) { return block.DefaultResolveNamespace(storageNamespace, key, identifierType) }