Skip to content

Commit

Permalink
datacatalog
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan committed Dec 27, 2023
1 parent 799576d commit f5b8f46
Show file tree
Hide file tree
Showing 39 changed files with 933 additions and 648 deletions.
20 changes: 8 additions & 12 deletions datacatalog/pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
}

ctx = contextutils.WithProjectDomain(ctx, artifact.Dataset.Project, artifact.Dataset.Domain)
datasetKey := transformers.FromDatasetID(artifact.Dataset)

// The dataset must exist for the artifact, let's verify that first
dataset, err := m.repo.DatasetRepo().Get(ctx, datasetKey)
dataset, err := m.repo.DatasetRepo().Get(ctx, request.GetArtifact().GetDataset())
if err != nil {
logger.Warnf(ctx, "Failed to get dataset for artifact creation %v, err: %v", datasetKey, err)
logger.Warnf(ctx, "Failed to get dataset for artifact creation %v, err: %v", request.GetArtifact().GetDataset(), err)
m.systemMetrics.createFailureCounter.Inc(ctx)
return nil, err
}
Expand Down Expand Up @@ -111,7 +109,7 @@ func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatal
return nil, err
}

err = m.repo.ArtifactRepo().Create(ctx, artifactModel)
err = m.repo.ArtifactRepo().Create(ctx, request.GetArtifact().GetDataset(), artifactModel)
if err != nil {
if errors.IsAlreadyExistsError(err) {
logger.Warnf(ctx, "Artifact already exists key: %+v, err %v", artifact.Id, err)
Expand Down Expand Up @@ -182,9 +180,8 @@ func (m *artifactManager) findArtifact(ctx context.Context, datasetID *datacatal
key := queryHandle.GetArtifactId()
if len(key) > 0 {
logger.Debugf(ctx, "Get artifact by id %v", key)
artifactKey := transformers.ToArtifactKey(datasetID, key)
var err error
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, artifactKey)
artifactModel, err = m.repo.ArtifactRepo().Get(ctx, datasetID, key)

if err != nil {
if errors.IsDoesNotExistError(err) {
Expand Down Expand Up @@ -249,10 +246,9 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
}

// Verify the dataset exists before listing artifacts
datasetKey := transformers.FromDatasetID(request.Dataset)
dataset, err := m.repo.DatasetRepo().Get(ctx, datasetKey)
_, err = m.repo.DatasetRepo().Get(ctx, request.GetDataset())
if err != nil {
logger.Warnf(ctx, "Failed to get dataset for listing artifacts %v, err: %v", datasetKey, err)
logger.Warnf(ctx, "Failed to get dataset for listing artifacts %v, err: %v", request.GetDataset(), err)

Check warning on line 251 in datacatalog/pkg/manager/impl/artifact_manager.go

View check run for this annotation

Codecov / codecov/patch

datacatalog/pkg/manager/impl/artifact_manager.go#L251

Added line #L251 was not covered by tests
m.systemMetrics.listFailureCounter.Inc(ctx)
return nil, err
}
Expand All @@ -273,7 +269,7 @@ func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalo
}

// Perform the list with the dataset and listInput filters
artifactModels, err := m.repo.ArtifactRepo().List(ctx, dataset.DatasetKey, listInput)
artifactModels, err := m.repo.ArtifactRepo().List(ctx, request.GetDataset(), listInput)
if err != nil {
logger.Errorf(ctx, "Unable to list Artifacts err: %v", err)
m.systemMetrics.listFailureCounter.Inc(ctx)
Expand Down Expand Up @@ -381,7 +377,7 @@ func (m *artifactManager) UpdateArtifact(ctx context.Context, request *datacatal
artifactModel.ArtifactData = artifactDataModels
logger.Debugf(ctx, "Updating ArtifactModel with %+v", artifactModel)

err = m.repo.ArtifactRepo().Update(ctx, artifactModel)
err = m.repo.ArtifactRepo().Update(ctx, request.GetDataset(), artifactModel)
if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Artifact does not exist key: %+v, err %v", artifact.Id, err)
Expand Down
102 changes: 47 additions & 55 deletions datacatalog/pkg/manager/impl/artifact_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@ import (
stdErrors "errors"
"fmt"
"os"
"reflect"
"testing"
"time"
"reflect"

"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/transformers"

"github.com/flyteorg/flyte/datacatalog/pkg/common"
"github.com/flyteorg/flyte/datacatalog/pkg/errors"
repoErrors "github.com/flyteorg/flyte/datacatalog/pkg/repositories/errors"
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/mocks"
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/models"
"github.com/flyteorg/flyte/datacatalog/pkg/repositories/transformers"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
Expand All @@ -33,6 +34,14 @@ func init() {
labeled.SetMetricKeys(contextutils.AppNameKey)
}

var testDatasetID = &datacatalog.DatasetID{
Project: "test-project",
Domain: "test-domain",
Name: "test-name",
Version: "test-version",
UUID: "test-uuid",
}

func createInmemoryDataStore(t testing.TB, scope mockScope.Scope) *storage.DataStore {
cfg := storage.Config{
Type: storage.TypeMemory,
Expand Down Expand Up @@ -64,18 +73,11 @@ func getTestTimestamp() time.Time {
}

func getTestArtifact() *datacatalog.Artifact {
datasetID := &datacatalog.DatasetID{
Project: "test-project",
Domain: "test-domain",
Name: "test-name",
Version: "test-version",
UUID: "test-uuid",
}
createdAt, _ := ptypes.TimestampProto(getTestTimestamp())

return &datacatalog.Artifact{
Id: "test-id",
Dataset: datasetID,
Dataset: testDatasetID,
Metadata: &datacatalog.Metadata{
KeyMap: map[string]string{"key1": "value1"},
},
Expand All @@ -90,7 +92,7 @@ func getTestArtifact() *datacatalog.Artifact {
{Key: "key2", Value: "value2"},
},
Tags: []*datacatalog.Tag{
{Name: "test-tag", Dataset: datasetID, ArtifactId: "test-id"},
{Name: "test-tag", Dataset: testDatasetID, ArtifactId: "test-id"},
},
CreatedAt: createdAt,
}
Expand Down Expand Up @@ -197,18 +199,18 @@ func TestCreateArtifact(t *testing.T) {

ctx := context.Background()
dcRepo := newMockDataCatalogRepo()
expectedArtifact := getTestArtifact()
dcRepo.MockDatasetRepo.On("Get", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
})).Return(mockDatasetModel, nil)

dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(artifact models.Artifact) bool {
expectedArtifact := getTestArtifact()
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.SerializedMetadata != nil &&
len(artifact.ArtifactData) == len(expectedArtifact.Data) &&
Expand Down Expand Up @@ -289,10 +291,13 @@ func TestCreateArtifact(t *testing.T) {

dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)

expectedArtifact := getTestArtifact()
dcRepo.MockArtifactRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(artifact models.Artifact) bool {
expectedArtifact := getTestArtifact()
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.SerializedMetadata != nil &&
len(artifact.ArtifactData) == len(expectedArtifact.Data) &&
Expand Down Expand Up @@ -346,7 +351,7 @@ func TestCreateArtifact(t *testing.T) {
dcRepo.MockDatasetRepo.On("Get", mock.Anything, mock.Anything).Return(mockDatasetModel, nil)
artifact := getTestArtifact()
artifact.Partitions = []*datacatalog.Partition{}
dcRepo.MockArtifactRepo.On("Create", mock.Anything, mock.Anything).Return(nil)
dcRepo.MockArtifactRepo.On("Create", mock.Anything, mock.Anything, mock.Anything).Return(nil)

request := &datacatalog.CreateArtifactRequest{Artifact: artifact}
artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
Expand Down Expand Up @@ -390,13 +395,9 @@ func TestGetArtifact(t *testing.T) {

t.Run("Get by Id", func(t *testing.T) {
dcRepo.MockArtifactRepo.On("Get", mock.Anything,
mock.MatchedBy(func(artifactKey models.ArtifactKey) bool {
return artifactKey.ArtifactID == expectedArtifact.Id &&
artifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifactKey.DatasetVersion == expectedArtifact.Dataset.Version &&
artifactKey.DatasetName == expectedArtifact.Dataset.Name
})).Return(mockArtifactModel, nil)
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}), expectedArtifact.Id).Return(mockArtifactModel, nil)

artifactManager := NewArtifactManager(dcRepo, datastore, testStoragePrefix, mockScope.NewTestScope())
artifactResponse, err := artifactManager.GetArtifact(ctx, &datacatalog.GetArtifactRequest{
Expand Down Expand Up @@ -541,11 +542,8 @@ func TestListArtifact(t *testing.T) {
}

dcRepo.MockDatasetRepo.On("Get", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
})).Return(mockDatasetModel, nil)

mockArtifacts := []models.Artifact{
Expand All @@ -554,11 +552,8 @@ func TestListArtifact(t *testing.T) {
}

dcRepo.MockArtifactRepo.On("List", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(listInput models.ListModelsInput) bool {
return len(listInput.ModelFilters) == 3 &&
Expand All @@ -582,23 +577,17 @@ func TestListArtifact(t *testing.T) {
filter := &datacatalog.FilterExpression{Filters: nil}

dcRepo.MockDatasetRepo.On("Get", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
})).Return(mockDatasetModel, nil)

mockArtifacts := []models.Artifact{
mockArtifactModel,
mockArtifactModel,
}
dcRepo.MockArtifactRepo.On("List", mock.Anything,
mock.MatchedBy(func(dataset models.DatasetKey) bool {
return dataset.Project == expectedDataset.Id.Project &&
dataset.Domain == expectedDataset.Id.Domain &&
dataset.Name == expectedDataset.Id.Name &&
dataset.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(listInput models.ListModelsInput) bool {
return len(listInput.ModelFilters) == 0
Expand Down Expand Up @@ -632,13 +621,9 @@ func TestUpdateArtifact(t *testing.T) {
dcRepo := newMockDataCatalogRepo()
dcRepo.MockArtifactRepo.On("Get",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(artifactKey models.ArtifactKey) bool {
return artifactKey.ArtifactID == expectedArtifact.Id &&
artifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
artifactKey.DatasetDomain == expectedArtifact.Dataset.Domain &&
artifactKey.DatasetName == expectedArtifact.Dataset.Name &&
artifactKey.DatasetVersion == expectedArtifact.Dataset.Version
})).Return(mockArtifactModel, nil)
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}), expectedArtifact.Id).Return(mockArtifactModel, nil)

metaData := &datacatalog.Metadata{
KeyMap: map[string]string{"key2": "value2"},
Expand All @@ -648,6 +633,9 @@ func TestUpdateArtifact(t *testing.T) {

dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(artifact models.Artifact) bool {
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
Expand All @@ -657,7 +645,6 @@ func TestUpdateArtifact(t *testing.T) {
reflect.DeepEqual(artifact.SerializedMetadata, serializedMetadata)
})).Return(nil)


request := &datacatalog.UpdateArtifactRequest{
Dataset: expectedDataset.Id,
QueryHandle: &datacatalog.UpdateArtifactRequest_ArtifactId{
Expand Down Expand Up @@ -723,6 +710,9 @@ func TestUpdateArtifact(t *testing.T) {
dcRepo := newMockDataCatalogRepo()
dcRepo.MockArtifactRepo.On("Update",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(artifact models.Artifact) bool {
return artifact.ArtifactID == expectedArtifact.Id &&
artifact.ArtifactKey.DatasetProject == expectedArtifact.Dataset.Project &&
Expand Down Expand Up @@ -808,7 +798,9 @@ func TestUpdateArtifact(t *testing.T) {
datastore := createInmemoryDataStore(t, mockScope.NewTestScope())

dcRepo := newMockDataCatalogRepo()
dcRepo.MockArtifactRepo.On("Get", mock.Anything, mock.Anything).Return(models.Artifact{}, repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{
dcRepo.MockArtifactRepo.On("Get", mock.Anything, mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}), mock.Anything).Return(models.Artifact{}, repoErrors.GetMissingEntityError("Artifact", &datacatalog.Artifact{
Dataset: expectedDataset.Id,
Id: expectedArtifact.Id,
}))
Expand Down
8 changes: 3 additions & 5 deletions datacatalog/pkg/manager/impl/dataset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (dm *datasetManager) CreateDataset(ctx context.Context, request *datacatalo
return nil, err
}

err = dm.repo.DatasetRepo().Create(ctx, *datasetModel)
err = dm.repo.DatasetRepo().Create(ctx, request.GetDataset().GetId(), *datasetModel)
if err != nil {
if errors.IsAlreadyExistsError(err) {
logger.Warnf(ctx, "Dataset already exists key: %+v, err %v", request.Dataset, err)
Expand Down Expand Up @@ -106,13 +106,11 @@ func (dm *datasetManager) GetDataset(ctx context.Context, request *datacatalog.G
dm.systemMetrics.validationErrorCounter.Inc(ctx)
return nil, err
}

datasetKey := transformers.FromDatasetID(request.Dataset)
datasetModel, err := dm.repo.DatasetRepo().Get(ctx, datasetKey)
datasetModel, err := dm.repo.DatasetRepo().Get(ctx, request.GetDataset())

if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Dataset does not exist key: %+v, err %v", datasetKey, err)
logger.Warnf(ctx, "Dataset does not exist key: %+v, err %v", request.GetDataset(), err)
dm.systemMetrics.doesNotExistCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Unable to get dataset request %+v err: %v", request, err)
Expand Down
24 changes: 11 additions & 13 deletions datacatalog/pkg/manager/impl/dataset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func TestCreateDataset(t *testing.T) {
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
dcRepo.MockDatasetRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(dataset models.Dataset) bool {

return dataset.Name == expectedDataset.Id.Name &&
Expand All @@ -77,6 +80,9 @@ func TestCreateDataset(t *testing.T) {
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())
dcRepo.MockDatasetRepo.On("Create",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
}),
mock.MatchedBy(func(dataset models.Dataset) bool {

return dataset.Name == expectedDataset.Id.Name &&
Expand Down Expand Up @@ -117,7 +123,7 @@ func TestCreateDataset(t *testing.T) {
datasetManager := NewDatasetManager(dcRepo, nil, mockScope.NewTestScope())

dcRepo.MockDatasetRepo.On("Create",
mock.Anything,
mock.Anything, mock.Anything,
mock.Anything).Return(status.Error(codes.AlreadyExists, "test already exists"))
request := &datacatalog.CreateDatasetRequest{
Dataset: getTestDataset(),
Expand Down Expand Up @@ -160,12 +166,8 @@ func TestGetDataset(t *testing.T) {

dcRepo.MockDatasetRepo.On("Get",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetKey models.DatasetKey) bool {

return datasetKey.Name == expectedDataset.Id.Name &&
datasetKey.Project == expectedDataset.Id.Project &&
datasetKey.Domain == expectedDataset.Id.Domain &&
datasetKey.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
})).Return(*datasetModelResponse, nil)
request := &datacatalog.GetDatasetRequest{Dataset: getTestDataset().Id}
datasetResponse, err := datasetManager.GetDataset(context.Background(), request)
Expand All @@ -181,12 +183,8 @@ func TestGetDataset(t *testing.T) {

dcRepo.MockDatasetRepo.On("Get",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(datasetKey models.DatasetKey) bool {

return datasetKey.Name == expectedDataset.Id.Name &&
datasetKey.Project == expectedDataset.Id.Project &&
datasetKey.Domain == expectedDataset.Id.Domain &&
datasetKey.Version == expectedDataset.Id.Version
mock.MatchedBy(func(datasetID *datacatalog.DatasetID) bool {
return proto.Equal(testDatasetID, datasetID)
})).Return(models.Dataset{}, errors.NewDataCatalogError(codes.NotFound, "dataset does not exist"))
request := &datacatalog.GetDatasetRequest{Dataset: getTestDataset().Id}
_, err := datasetManager.GetDataset(context.Background(), request)
Expand Down
Loading

0 comments on commit f5b8f46

Please sign in to comment.