From 4c041c1adeaad13dab72684e6bea02eb28cd3fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Tue, 8 Nov 2022 14:01:24 +0100 Subject: [PATCH 1/2] Added function for deleting artifacts to catalog client interface Extended reservation retrieval to allow querying via artifact tag in catalog client interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- .../tasks/pluginmachinery/catalog/client.go | 8 ++ .../pluginmachinery/catalog/mocks/client.go | 105 ++++++++++++++++++ 2 files changed, 113 insertions(+) diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/client.go index 7531a59d93..30b99bd3a0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/client.go @@ -131,6 +131,9 @@ type Client interface { // GetOrExtendReservation tries to retrieve a (valid) reservation for the given key, creating a new one using the // specified owner ID if none was found or updating an existing one if it has expired. GetOrExtendReservation(ctx context.Context, key Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) + // GetOrExtendReservationByArtifactTag tries to retrieve a (valid) reservation for the given dataset ID and artifact + // tag, creating a new one using the specified owner ID if none was found or updating an existing one if it has expired. + GetOrExtendReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) // Put stores the given data using the specified key, creating artifact entries as required. // To update an existing artifact, use Update instead. Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) @@ -139,6 +142,11 @@ type Client interface { Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) // ReleaseReservation releases an acquired reservation for the given key and owner ID. ReleaseReservation(ctx context.Context, key Key, ownerID string) error + // Delete removes the artifact associated with the given key and deletes its underlying data from blob storage. + Delete(ctx context.Context, key Key) error + // DeleteByArtifactTag removes the artifact associated with the given dataset ID and artifact tag and deletes its + // underlying data from blob storage. + DeleteByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string) error } func IsNotFound(err error) bool { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go index fe9d75f1cc..172b721771 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go @@ -21,6 +21,70 @@ type Client struct { mock.Mock } +type Client_Delete struct { + *mock.Call +} + +func (_m Client_Delete) Return(_a0 error) *Client_Delete { + return &Client_Delete{Call: _m.Call.Return(_a0)} +} + +func (_m *Client) OnDelete(ctx context.Context, key catalog.Key) *Client_Delete { + c_call := _m.On("Delete", ctx, key) + return &Client_Delete{Call: c_call} +} + +func (_m *Client) OnDeleteMatch(matchers ...interface{}) *Client_Delete { + c_call := _m.On("Delete", matchers...) + return &Client_Delete{Call: c_call} +} + +// Delete provides a mock function with given fields: ctx, key +func (_m *Client) Delete(ctx context.Context, key catalog.Key) error { + ret := _m.Called(ctx, key) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, catalog.Key) error); ok { + r0 = rf(ctx, key) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type Client_DeleteByArtifactTag struct { + *mock.Call +} + +func (_m Client_DeleteByArtifactTag) Return(_a0 error) *Client_DeleteByArtifactTag { + return &Client_DeleteByArtifactTag{Call: _m.Call.Return(_a0)} +} + +func (_m *Client) OnDeleteByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string) *Client_DeleteByArtifactTag { + c_call := _m.On("DeleteByArtifactTag", ctx, datasetID, artifactTag) + return &Client_DeleteByArtifactTag{Call: c_call} +} + +func (_m *Client) OnDeleteByArtifactTagMatch(matchers ...interface{}) *Client_DeleteByArtifactTag { + c_call := _m.On("DeleteByArtifactTag", matchers...) + return &Client_DeleteByArtifactTag{Call: c_call} +} + +// DeleteByArtifactTag provides a mock function with given fields: ctx, datasetID, artifactTag +func (_m *Client) DeleteByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string) error { + ret := _m.Called(ctx, datasetID, artifactTag) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.DatasetID, string) error); ok { + r0 = rf(ctx, datasetID, artifactTag) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type Client_Get struct { *mock.Call } @@ -101,6 +165,47 @@ func (_m *Client) GetOrExtendReservation(ctx context.Context, key catalog.Key, o return r0, r1 } +type Client_GetOrExtendReservationByArtifactTag struct { + *mock.Call +} + +func (_m Client_GetOrExtendReservationByArtifactTag) Return(_a0 *datacatalog.Reservation, _a1 error) *Client_GetOrExtendReservationByArtifactTag { + return &Client_GetOrExtendReservationByArtifactTag{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *Client) OnGetOrExtendReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string, heartbeatInterval time.Duration) *Client_GetOrExtendReservationByArtifactTag { + c_call := _m.On("GetOrExtendReservationByArtifactTag", ctx, datasetID, artifactTag, ownerID, heartbeatInterval) + return &Client_GetOrExtendReservationByArtifactTag{Call: c_call} +} + +func (_m *Client) OnGetOrExtendReservationByArtifactTagMatch(matchers ...interface{}) *Client_GetOrExtendReservationByArtifactTag { + c_call := _m.On("GetOrExtendReservationByArtifactTag", matchers...) + return &Client_GetOrExtendReservationByArtifactTag{Call: c_call} +} + +// GetOrExtendReservationByArtifactTag provides a mock function with given fields: ctx, datasetID, artifactTag, ownerID, heartbeatInterval +func (_m *Client) GetOrExtendReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) { + ret := _m.Called(ctx, datasetID, artifactTag, ownerID, heartbeatInterval) + + var r0 *datacatalog.Reservation + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.DatasetID, string, string, time.Duration) *datacatalog.Reservation); ok { + r0 = rf(ctx, datasetID, artifactTag, ownerID, heartbeatInterval) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*datacatalog.Reservation) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *datacatalog.DatasetID, string, string, time.Duration) error); ok { + r1 = rf(ctx, datasetID, artifactTag, ownerID, heartbeatInterval) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type Client_Put struct { *mock.Call } From 56c7b31d5f8c015b73207d3847eaaec2d4befd63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nick=20M=C3=BCller?= Date: Thu, 15 Dec 2022 16:10:25 +0100 Subject: [PATCH 2/2] Added method to release catalog reservation by artifact tag Added method to delete catalog artifact by ID MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Nick Müller --- .../tasks/pluginmachinery/catalog/client.go | 6 ++ .../pluginmachinery/catalog/mocks/client.go | 64 +++++++++++++++++++ 2 files changed, 70 insertions(+) diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/client.go index 30b99bd3a0..8630d004db 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/client.go @@ -142,11 +142,17 @@ type Client interface { Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) // ReleaseReservation releases an acquired reservation for the given key and owner ID. ReleaseReservation(ctx context.Context, key Key, ownerID string) error + // ReleaseReservationByArtifactTag releases an acquired reservation for the given dataset ID, artifact tag and + // owner ID. + ReleaseReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string) error // Delete removes the artifact associated with the given key and deletes its underlying data from blob storage. Delete(ctx context.Context, key Key) error // DeleteByArtifactTag removes the artifact associated with the given dataset ID and artifact tag and deletes its // underlying data from blob storage. DeleteByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string) error + // DeleteByArtifactID removes the artifact associated with the given dataset and artifact ID and deletes its + // underlying data from blob storage. + DeleteByArtifactID(ctx context.Context, datasetID *datacatalog.DatasetID, artifactID string) error } func IsNotFound(err error) bool { diff --git a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go index 172b721771..4fed501d5a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go +++ b/flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go @@ -53,6 +53,38 @@ func (_m *Client) Delete(ctx context.Context, key catalog.Key) error { return r0 } +type Client_DeleteByArtifactID struct { + *mock.Call +} + +func (_m Client_DeleteByArtifactID) Return(_a0 error) *Client_DeleteByArtifactID { + return &Client_DeleteByArtifactID{Call: _m.Call.Return(_a0)} +} + +func (_m *Client) OnDeleteByArtifactID(ctx context.Context, datasetID *datacatalog.DatasetID, artifactID string) *Client_DeleteByArtifactID { + c_call := _m.On("DeleteByArtifactID", ctx, datasetID, artifactID) + return &Client_DeleteByArtifactID{Call: c_call} +} + +func (_m *Client) OnDeleteByArtifactIDMatch(matchers ...interface{}) *Client_DeleteByArtifactID { + c_call := _m.On("DeleteByArtifactID", matchers...) + return &Client_DeleteByArtifactID{Call: c_call} +} + +// DeleteByArtifactID provides a mock function with given fields: ctx, datasetID, artifactID +func (_m *Client) DeleteByArtifactID(ctx context.Context, datasetID *datacatalog.DatasetID, artifactID string) error { + ret := _m.Called(ctx, datasetID, artifactID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.DatasetID, string) error); ok { + r0 = rf(ctx, datasetID, artifactID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type Client_DeleteByArtifactTag struct { *mock.Call } @@ -277,6 +309,38 @@ func (_m *Client) ReleaseReservation(ctx context.Context, key catalog.Key, owner return r0 } +type Client_ReleaseReservationByArtifactTag struct { + *mock.Call +} + +func (_m Client_ReleaseReservationByArtifactTag) Return(_a0 error) *Client_ReleaseReservationByArtifactTag { + return &Client_ReleaseReservationByArtifactTag{Call: _m.Call.Return(_a0)} +} + +func (_m *Client) OnReleaseReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string) *Client_ReleaseReservationByArtifactTag { + c_call := _m.On("ReleaseReservationByArtifactTag", ctx, datasetID, artifactTag, ownerID) + return &Client_ReleaseReservationByArtifactTag{Call: c_call} +} + +func (_m *Client) OnReleaseReservationByArtifactTagMatch(matchers ...interface{}) *Client_ReleaseReservationByArtifactTag { + c_call := _m.On("ReleaseReservationByArtifactTag", matchers...) + return &Client_ReleaseReservationByArtifactTag{Call: c_call} +} + +// ReleaseReservationByArtifactTag provides a mock function with given fields: ctx, datasetID, artifactTag, ownerID +func (_m *Client) ReleaseReservationByArtifactTag(ctx context.Context, datasetID *datacatalog.DatasetID, artifactTag string, ownerID string) error { + ret := _m.Called(ctx, datasetID, artifactTag, ownerID) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *datacatalog.DatasetID, string, string) error); ok { + r0 = rf(ctx, datasetID, artifactTag, ownerID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + type Client_Update struct { *mock.Call }