diff --git a/go.mod b/go.mod index d3d8dcfc0..3d15572f8 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 github.com/flyteorg/flyteidl v0.21.11 - github.com/flyteorg/flyteplugins v0.8.0 + github.com/flyteorg/flyteplugins v0.8.1 github.com/flyteorg/flytestdlib v0.4.4 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible @@ -33,5 +33,3 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d - -replace github.com/flyteorg/flyteplugins => github.com/hamersaw/flyteplugins v0.6.2-0.20211129132447-34fdd4b8118e diff --git a/go.sum b/go.sum index eeb21f664..b2378eecd 100644 --- a/go.sum +++ b/go.sum @@ -238,8 +238,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90= github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteplugins v0.8.0 h1:Jiy7Ugm9olGmm5OFAbbxv/VfVmYib3JqGdeytyoiwnU= -github.com/flyteorg/flyteplugins v0.8.0/go.mod h1:kOiuXk1ddIEVSPoHcc4kBfVQcLuyf8jw3vWJT2Was90= +github.com/flyteorg/flyteplugins v0.8.1 h1:wZ8JRWOXPZ2+O5LI2kxwkTaoxER2ag+iYpm5S8KLmww= +github.com/flyteorg/flyteplugins v0.8.1/go.mod h1:tmU5lkRQjftCNd7T4gHykh5zZNNTdrxNmQRSBrFWCyg= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q= github.com/flyteorg/flytestdlib v0.4.4 h1:oPADei4KEjxtUqkTwrIjXB1nuH+JEKjwmwF92DSO3NM= @@ -444,18 +444,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/hamersaw/flyteidl v0.19.26-0.20211103115633-100abab11c51 h1:e9zvtfNKr+K84a7du4wJgC+MXYcEsA13yYGMsEGsjQs= -github.com/hamersaw/flyteidl v0.19.26-0.20211103115633-100abab11c51/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/hamersaw/flyteidl v0.19.26-0.20211118002926-987e338423ab h1:1FC+p4ilWvLJZDm6DJeKWPzu+L+DRN0Y27osXgD5dzE= -github.com/hamersaw/flyteidl v0.19.26-0.20211118002926-987e338423ab/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/hamersaw/flyteplugins v0.6.2-0.20211103120611-e74f13d04613 h1:NSj2O4QOX5Y4t/d7ZuXcxSG2H5sHplcVuFLOmlv4/4o= -github.com/hamersaw/flyteplugins v0.6.2-0.20211103120611-e74f13d04613/go.mod h1:ERtmZ9DEtbc9eM8v2X+bqyLJ7eER4sGOCpYPeEIizso= -github.com/hamersaw/flyteplugins v0.6.2-0.20211118004720-b9783595a7ad h1:bUWcGrrmuiy+p+9KsMHxm3hilFNjSa4pLF0W+z/R7Lk= -github.com/hamersaw/flyteplugins v0.6.2-0.20211118004720-b9783595a7ad/go.mod h1:UBxUP9suj5xYnue88DyQj4Oneh2tZbQ/MAGWBs4wsg0= -github.com/hamersaw/flyteplugins v0.6.2-0.20211124155027-231b2dd33415 h1:VXE321w65mYF6OFYNjIal47LuB4yNsOTb0biq7geQ8Y= -github.com/hamersaw/flyteplugins v0.6.2-0.20211124155027-231b2dd33415/go.mod h1:tmU5lkRQjftCNd7T4gHykh5zZNNTdrxNmQRSBrFWCyg= -github.com/hamersaw/flyteplugins v0.6.2-0.20211129132447-34fdd4b8118e h1:HVeMzLTi+eVMNrfPTwXgHV8j+DNojgt/GCph2hdTPUA= -github.com/hamersaw/flyteplugins v0.6.2-0.20211129132447-34fdd4b8118e/go.mod h1:tmU5lkRQjftCNd7T4gHykh5zZNNTdrxNmQRSBrFWCyg= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 9097d4d15..d91936553 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -264,8 +264,9 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil } -// Attempts to get a reservation for the cachable task. If you have previously acquired a reservation -// it will be extended. If another entity holds the reservation that is returned. +// GetOrExtendReservation attempts to get a reservation for the cachable task. If you have +// previously acquired a reservation it will be extended. If another entity holds the reservation +// that is returned. func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) { datasetID, err := GenerateDatasetIDForTask(ctx, key) if err != nil { @@ -303,6 +304,9 @@ func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog. return response.Reservation, nil } +// ReleaseReservation attempts to release a reservation for a cachable task. If the reservation +// does not exist (e.x. it never existed or has been acquired by another owner) then this call +// still succeeds. func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error { datasetID, err := GenerateDatasetIDForTask(ctx, key) if err != nil { diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index be778060b..4b0cb9f85 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -94,6 +94,8 @@ func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { } } +// PopulateReservationInfo sets the ReservationStatus of a requested plugin transition based on the +// provided ReservationEntry. func (p *pluginRequestedTransition) PopulateReservationInfo(entry catalog.ReservationEntry) { if p.execInfo.TaskNodeInfo == nil { p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ diff --git a/pkg/controller/nodes/task/pre_post_execution.go b/pkg/controller/nodes/task/pre_post_execution.go index 7461b7074..1a6903b76 100644 --- a/pkg/controller/nodes/task/pre_post_execution.go +++ b/pkg/controller/nodes/task/pre_post_execution.go @@ -72,6 +72,9 @@ func (t *Handler) CheckCatalogCache(ctx context.Context, tr pluginCore.TaskReade return catalog.NewCatalogEntry(nil, cacheDisabled), nil } +// GetOrExtendCatalogReservation attempts to acquire an artifact reservation if the task is +// cachable and cache serializable. If the reservation already exists for this owner, the +// reservation is extended. func (t *Handler) GetOrExtendCatalogReservation(ctx context.Context, ownerID string, heartbeatInterval time.Duration, tr pluginCore.TaskReader, inputReader io.InputReader) (catalog.ReservationEntry, error) { tk, err := tr.Read(ctx) if err != nil { @@ -230,6 +233,9 @@ func (t *Handler) ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1 return s, nil, nil } +// ReleaseCatalogReservation attempts to release an artifact reservation if the task is cachable +// and cache serializable. If the reservation does not exist for this owner (e.x. it never existed +// or has been acquired by another owner) this call is still successful. func (t *Handler) ReleaseCatalogReservation(ctx context.Context, ownerID string, tr pluginCore.TaskReader, inputReader io.InputReader) (catalog.ReservationEntry, error) { tk, err := tr.Read(ctx) if err != nil {