Skip to content

Commit

Permalink
Cache Serialize API (flyteorg#326)
Browse files Browse the repository at this point in the history
* added cache reservation logic to task execution

Signed-off-by: Daniel Rammer <[email protected]>

* setting reservation heartbeat to workflow reeval duration

Signed-off-by: Daniel Rammer <[email protected]>

* releasing reservation on completion

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint errors

Signed-off-by: Daniel Rammer <[email protected]>

* updated task metadata field to DiscoverySerializable in accordance with name change

Signed-off-by: Daniel Rammer <[email protected]>

* not allowing cache serialization without specifying cachable as well

Signed-off-by: Daniel Rammer <[email protected]>

* added datacatalog unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* added handler unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl and flyteplugins versions - change before merging

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl and flyteplugin version to reflect latest changes

Signed-off-by: Daniel Rammer <[email protected]>

* remove flyteidl replace in go.mod and updating to latest version

Signed-off-by: Daniel Rammer <[email protected]>

* moving release reservation to node finalize function

Signed-off-by: Daniel Rammer <[email protected]>

* updated go.mod and added missing docs on public types

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Dec 1, 2021
1 parent dc5d314 commit 709bb00
Show file tree
Hide file tree
Showing 9 changed files with 744 additions and 115 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.4
github.com/flyteorg/flyteplugins v0.8.0
github.com/flyteorg/flyteidl v0.21.11
github.com/flyteorg/flyteplugins v0.8.1
github.com/flyteorg/flytestdlib v0.4.4
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
8 changes: 4 additions & 4 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.4 h1:gtJK5rX2ydLAo2xLRHHznOSLuLHrRRdXDbpEAlxluhk=
github.com/flyteorg/flyteidl v0.21.4/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.8.0 h1:Jiy7Ugm9olGmm5OFAbbxv/VfVmYib3JqGdeytyoiwnU=
github.com/flyteorg/flyteplugins v0.8.0/go.mod h1:kOiuXk1ddIEVSPoHcc4kBfVQcLuyf8jw3vWJT2Was90=
github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.8.1 h1:wZ8JRWOXPZ2+O5LI2kxwkTaoxER2ag+iYpm5S8KLmww=
github.com/flyteorg/flyteplugins v0.8.1/go.mod h1:tmU5lkRQjftCNd7T4gHykh5zZNNTdrxNmQRSBrFWCyg=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q=
github.com/flyteorg/flytestdlib v0.4.4 h1:oPADei4KEjxtUqkTwrIjXB1nuH+JEKjwmwF92DSO3NM=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,85 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil
}

// GetOrExtendReservation attempts to get a reservation for the cachable task. If you have
// previously acquired a reservation it will be extended. If another entity holds the reservation
// that is returned.
func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error) {
datasetID, err := GenerateDatasetIDForTask(ctx, key)
if err != nil {
return nil, err
}

inputs := &core.LiteralMap{}
if key.TypedInterface.Inputs != nil {
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to read inputs when trying to query catalog")
}
inputs = retInputs
}

tag, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
return nil, err
}

reservationQuery := &datacatalog.GetOrExtendReservationRequest{
ReservationId: &datacatalog.ReservationID{
DatasetId: datasetID,
TagName: tag,
},
OwnerId: ownerID,
HeartbeatInterval: ptypes.DurationProto(heartbeatInterval),
}

response, err := m.client.GetOrExtendReservation(ctx, reservationQuery)
if err != nil {
return nil, err
}

return response.Reservation, nil
}

// ReleaseReservation attempts to release a reservation for a cachable task. If the reservation
// does not exist (e.x. it never existed or has been acquired by another owner) then this call
// still succeeds.
func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error {
datasetID, err := GenerateDatasetIDForTask(ctx, key)
if err != nil {
return err
}

inputs := &core.LiteralMap{}
if key.TypedInterface.Inputs != nil {
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
return errors.Wrap(err, "failed to read inputs when trying to query catalog")
}
inputs = retInputs
}

tag, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
return err
}

reservationQuery := &datacatalog.ReleaseReservationRequest{
ReservationId: &datacatalog.ReservationID{
DatasetId: datasetID,
TagName: tag,
},
OwnerId: ownerID,
}

_, err = m.client.ReleaseReservation(ctx, reservationQuery)
if err != nil {
return err
}

return nil
}

// Create a new Datacatalog client for task execution caching
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error) {
var opts []grpc.DialOption
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,132 @@ func TestCatalog_Put(t *testing.T) {
})

}

var tagName = "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY"
var reservationID = datacatalog.ReservationID{
DatasetId: datasetID,
TagName: tagName,
}
var prevOwner = "prevOwner"
var currentOwner = "currentOwner"

func TestCatalog_GetOrExtendReservation(t *testing.T) {
ctx := context.Background()

heartbeatInterval := time.Second * 5
prevReservation := datacatalog.Reservation{
ReservationId: &reservationID,
OwnerId: prevOwner,
}

currentReservation := datacatalog.Reservation{
ReservationId: &reservationID,
OwnerId: currentOwner,
}

t.Run("CreateOrUpdateReservation", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
}

mockClient.On("GetOrExtendReservation",
ctx,
mock.MatchedBy(func(o *datacatalog.GetOrExtendReservationRequest) bool {
assert.EqualValues(t, datasetID.String(), o.ReservationId.DatasetId.String())
assert.EqualValues(t, tagName, o.ReservationId.TagName)
return true
}),
).Return(&datacatalog.GetOrExtendReservationResponse{Reservation: &currentReservation}, nil, "")

newKey := sampleKey
newKey.InputReader = ir
reservation, err := catalogClient.GetOrExtendReservation(ctx, newKey, currentOwner, heartbeatInterval)

assert.NoError(t, err)
assert.Equal(t, reservation.OwnerId, currentOwner)
})

t.Run("ExistingReservation", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
}

mockClient.On("GetOrExtendReservation",
ctx,
mock.MatchedBy(func(o *datacatalog.GetOrExtendReservationRequest) bool {
assert.EqualValues(t, datasetID.String(), o.ReservationId.DatasetId.String())
assert.EqualValues(t, tagName, o.ReservationId.TagName)
return true
}),
).Return(&datacatalog.GetOrExtendReservationResponse{Reservation: &prevReservation}, nil, "")

newKey := sampleKey
newKey.InputReader = ir
reservation, err := catalogClient.GetOrExtendReservation(ctx, newKey, currentOwner, heartbeatInterval)

assert.NoError(t, err)
assert.Equal(t, reservation.OwnerId, prevOwner)
})
}

func TestCatalog_ReleaseReservation(t *testing.T) {
ctx := context.Background()

t.Run("ReleaseReservation", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
}

mockClient.On("ReleaseReservation",
ctx,
mock.MatchedBy(func(o *datacatalog.ReleaseReservationRequest) bool {
assert.EqualValues(t, datasetID.String(), o.ReservationId.DatasetId.String())
assert.EqualValues(t, tagName, o.ReservationId.TagName)
return true
}),
).Return(&datacatalog.ReleaseReservationResponse{}, nil, "")

newKey := sampleKey
newKey.InputReader = ir
err := catalogClient.ReleaseReservation(ctx, newKey, currentOwner)

assert.NoError(t, err)
})

t.Run("ReleaseReservationFailure", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
}

mockClient.On("ReleaseReservation",
ctx,
mock.MatchedBy(func(o *datacatalog.ReleaseReservationRequest) bool {
assert.EqualValues(t, datasetID.String(), o.ReservationId.DatasetId.String())
assert.EqualValues(t, tagName, o.ReservationId.TagName)
return true
}),
).Return(nil, status.Error(codes.NotFound, "reservation not found"))

newKey := sampleKey
newKey.InputReader = ir
err := catalogClient.ReleaseReservation(ctx, newKey, currentOwner)

assertGrpcErr(t, err, codes.NotFound)
})
}
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/catalog/noop_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package catalog

import (
"context"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io"
)
Expand All @@ -20,3 +22,11 @@ func (n NOOPCatalog) Get(_ context.Context, _ catalog.Key) (catalog.Entry, error
func (n NOOPCatalog) Put(_ context.Context, _ catalog.Key, _ io.OutputReader, _ catalog.Metadata) (catalog.Status, error) {
return disabledStatus, nil
}

func (n NOOPCatalog) GetOrExtendReservation(_ context.Context, _ catalog.Key, _ string, _ time.Duration) (*datacatalog.Reservation, error) {
return nil, nil
}

func (n NOOPCatalog) ReleaseReservation(_ context.Context, _ catalog.Key, _ string) error {
return nil
}
Loading

0 comments on commit 709bb00

Please sign in to comment.