From 114a45e4061dd178d31de395052cdd7b0677469a Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Sun, 7 Jan 2024 20:13:15 -0800 Subject: [PATCH] only support cache eviction for single task Signed-off-by: Paul Dittamo --- flyteadmin/pkg/manager/impl/cache_manager.go | 197 +-- .../pkg/manager/impl/cache_manager_test.go | 1138 +---------------- flyteadmin/pkg/rpc/cacheservice/base.go | 1 - 3 files changed, 79 insertions(+), 1257 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/cache_manager.go b/flyteadmin/pkg/manager/impl/cache_manager.go index a8334fb389..45d5705e68 100644 --- a/flyteadmin/pkg/manager/impl/cache_manager.go +++ b/flyteadmin/pkg/manager/impl/cache_manager.go @@ -2,7 +2,6 @@ package impl import ( "context" - "strings" "time" "github.com/golang/protobuf/proto" @@ -20,7 +19,6 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog" - "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytestdlib/catalog/datacatalog" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" @@ -30,11 +28,7 @@ var ( _ interfaces.CacheInterface = &CacheManager{} ) -var sortByCreatedAtAsc = &admin.Sort{Key: shared.CreatedAt, Direction: admin.Sort_ASCENDING} - const ( - nodeExecutionLimit = 10000 - taskExecutionLimit = 10000 reservationHeartbeat = 10 * time.Second ) @@ -59,39 +53,23 @@ func (m *CacheManager) EvictTaskExecutionCache(ctx context.Context, req service. } ctx = getTaskExecutionContext(ctx, req.TaskExecutionId) + var evictionErrors []*core.CacheEvictionError - // Sanity check to ensure referenced task execution exists although only encapsulated node execution is strictly required - _, err := util.GetTaskExecutionModel(ctx, m.db, req.TaskExecutionId) + taskExecutionModel, err := util.GetTaskExecutionModel(ctx, m.db, req.TaskExecutionId) if err != nil { logger.Debugf(ctx, "Failed to get task execution model for task execution ID %+v: %v", req.TaskExecutionId, err) return nil, err } - logger.Debugf(ctx, "Starting to evict cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId) - nodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, req.TaskExecutionId.NodeExecutionId) if err != nil { logger.Debugf(ctx, "Failed to get node execution model for node execution ID %+v: %v", req.TaskExecutionId.NodeExecutionId, err) return nil, err } - evictionErrors := m.evictNodeExecutionCache(ctx, *nodeExecutionModel, nil) - - logger.Debugf(ctx, "Finished evicting cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId) - return &service.EvictCacheResponse{ - Errors: &core.CacheEvictionErrorList{Errors: evictionErrors}, - }, nil -} - -func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutionModel models.NodeExecution, - evictionErrors []*core.CacheEvictionError) []*core.CacheEvictionError { - if strings.HasSuffix(nodeExecutionModel.NodeID, v1alpha1.StartNodeID) || strings.HasSuffix(nodeExecutionModel.NodeID, v1alpha1.EndNodeID) { - return evictionErrors - } - - nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel, transformers.DefaultExecutionTransformerOptions) + nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { - logger.Warnf(ctx, "Failed to transform node execution model %+v: %v", + logger.Debugf(ctx, "Failed to transform node execution model %+v: %v", nodeExecutionModel.NodeExecutionKey, err) m.metrics.CacheEvictionFailures.Inc() evictionErrors = append(evictionErrors, &core.CacheEvictionError{ @@ -106,83 +84,35 @@ func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutio Code: core.CacheEvictionError_INTERNAL, Message: "Internal error", }) - return evictionErrors - } - - logger.Debugf(ctx, "Starting to evict cache for node execution %+v", nodeExecution.Id) - t := m.metrics.CacheEvictionTime.Start() - defer t.Stop() - - if nodeExecution.Metadata.IsParentNode { - var childNodeExecutions []models.NodeExecution - if nodeExecution.Metadata.IsDynamic { - var err error - childNodeExecutions, err = m.listAllNodeExecutionsForWorkflow(ctx, nodeExecution.Id.ExecutionId, nodeExecution.Id.NodeId) - if err != nil { - logger.Warnf(ctx, "Failed to list child node executions for dynamic node execution %+v: %v", - nodeExecution.Id, err) - m.metrics.CacheEvictionFailures.Inc() - evictionErrors = append(evictionErrors, &core.CacheEvictionError{ - NodeExecutionId: nodeExecution.Id, - Source: &core.CacheEvictionError_WorkflowExecutionId{ - WorkflowExecutionId: nodeExecution.Id.ExecutionId, - }, - Code: core.CacheEvictionError_INTERNAL, - Message: "Failed to list child node executions", - }) - return evictionErrors - } - } else { - childNodeExecutions = nodeExecutionModel.ChildNodeExecutions - } - for _, childNodeExecution := range childNodeExecutions { - evictionErrors = m.evictNodeExecutionCache(ctx, childNodeExecution, evictionErrors) - } + return &service.EvictCacheResponse{ + Errors: &core.CacheEvictionErrorList{Errors: evictionErrors}, + }, nil } - taskExecutions, err := m.listAllTaskExecutions(ctx, nodeExecution.Id) + taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel, transformers.DefaultExecutionTransformerOptions) if err != nil { - logger.Warnf(ctx, "Failed to list task executions for node execution %+v: %v", - nodeExecution.Id, err) + logger.Debugf(ctx, "Failed to transform task execution model %+v: %v", + taskExecutionModel.TaskExecutionKey, err) m.metrics.CacheEvictionFailures.Inc() evictionErrors = append(evictionErrors, &core.CacheEvictionError{ NodeExecutionId: nodeExecution.Id, - Source: &core.CacheEvictionError_WorkflowExecutionId{ - WorkflowExecutionId: nodeExecution.Id.ExecutionId, + Source: &core.CacheEvictionError_TaskExecutionId{ + TaskExecutionId: taskExecution.Id, }, Code: core.CacheEvictionError_INTERNAL, - Message: "Failed to list task executions", + Message: "Internal error", }) - return evictionErrors + return &service.EvictCacheResponse{ + Errors: &core.CacheEvictionErrorList{Errors: evictionErrors}, + }, nil } - switch md := nodeExecution.GetClosure().GetTargetMetadata().(type) { - case *admin.NodeExecutionClosure_TaskNodeMetadata: - for _, taskExecutionModel := range taskExecutions { - taskExecution, err := transformers.FromTaskExecutionModel(taskExecutionModel, transformers.DefaultExecutionTransformerOptions) - if err != nil { - logger.Warnf(ctx, "Failed to transform task execution model %+v: %v", - taskExecutionModel.TaskExecutionKey, err) - m.metrics.CacheEvictionFailures.Inc() - evictionErrors = append(evictionErrors, &core.CacheEvictionError{ - NodeExecutionId: nodeExecution.Id, - Source: &core.CacheEvictionError_TaskExecutionId{ - TaskExecutionId: taskExecution.Id, - }, - Code: core.CacheEvictionError_INTERNAL, - Message: "Internal error", - }) - return evictionErrors - } + logger.Debugf(ctx, "Starting to evict cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId) - evictionErrors = m.evictTaskNodeExecutionCache(ctx, nodeExecutionModel, nodeExecution, taskExecution, - md.TaskNodeMetadata, evictionErrors) - } - case *admin.NodeExecutionClosure_WorkflowNodeMetadata: - evictionErrors = m.evictWorkflowNodeExecutionCache(ctx, nodeExecution, md.WorkflowNodeMetadata, evictionErrors) - default: - logger.Errorf(ctx, "Invalid target metadata type %T for node execution closure %+v for node execution %+v", - md, md, nodeExecution.Id) + metadata, ok := nodeExecution.GetClosure().GetTargetMetadata().(*admin.NodeExecutionClosure_TaskNodeMetadata) + if !ok { + logger.Debugf(ctx, "Node execution %+v did not contain task node metadata, skipping cache eviction", + nodeExecution.Id) m.metrics.CacheEvictionFailures.Inc() evictionErrors = append(evictionErrors, &core.CacheEvictionError{ NodeExecutionId: nodeExecution.Id, @@ -192,9 +122,16 @@ func (m *CacheManager) evictNodeExecutionCache(ctx context.Context, nodeExecutio Code: core.CacheEvictionError_INTERNAL, Message: "Internal error", }) + return &service.EvictCacheResponse{ + Errors: &core.CacheEvictionErrorList{Errors: evictionErrors}, + }, nil } - return evictionErrors + evictionErrors = m.evictTaskNodeExecutionCache(ctx, *nodeExecutionModel, nodeExecution, taskExecution, metadata.TaskNodeMetadata, nil) + logger.Debugf(ctx, "Finished evicting cache for execution %+v of task %+v", req.TaskExecutionId, req.TaskExecutionId.TaskId) + return &service.EvictCacheResponse{ + Errors: &core.CacheEvictionErrorList{Errors: evictionErrors}, + }, nil } func (m *CacheManager) evictTaskNodeExecutionCache(ctx context.Context, nodeExecutionModel models.NodeExecution, @@ -333,82 +270,6 @@ func (m *CacheManager) evictTaskNodeExecutionCache(ctx context.Context, nodeExec return evictionErrors } -func (m *CacheManager) evictWorkflowNodeExecutionCache(ctx context.Context, nodeExecution *admin.NodeExecution, - workflowNodeMetadata *admin.WorkflowNodeMetadata, evictionErrors []*core.CacheEvictionError) []*core.CacheEvictionError { - if workflowNodeMetadata == nil { - logger.Debugf(ctx, "Node execution %+v did not contain cached data, skipping cache eviction", - nodeExecution.Id) - return evictionErrors - } - - childNodeExecutions, err := m.listAllNodeExecutionsForWorkflow(ctx, workflowNodeMetadata.GetExecutionId(), "") - if err != nil { - logger.Debugf(ctx, "Failed to list child executions for node execution %+v of workflow %+v: %v", - nodeExecution.Id, workflowNodeMetadata.GetExecutionId(), err) - m.metrics.CacheEvictionFailures.Inc() - evictionErrors = append(evictionErrors, &core.CacheEvictionError{ - NodeExecutionId: nodeExecution.Id, - Source: &core.CacheEvictionError_WorkflowExecutionId{ - WorkflowExecutionId: workflowNodeMetadata.GetExecutionId(), - }, - Code: core.CacheEvictionError_INTERNAL, - Message: "Failed to evict child executions for workflow", - }) - return evictionErrors - } - for _, childNodeExecution := range childNodeExecutions { - evictionErrors = m.evictNodeExecutionCache(ctx, childNodeExecution, evictionErrors) - } - - logger.Debugf(ctx, "Successfully evicted cache for workflow node execution %+v", nodeExecution.Id) - m.metrics.CacheEvictionSuccess.Inc() - - return evictionErrors -} - -func (m *CacheManager) listAllNodeExecutionsForWorkflow(ctx context.Context, - workflowExecutionID *core.WorkflowExecutionIdentifier, uniqueParentID string) ([]models.NodeExecution, error) { - var nodeExecutions []models.NodeExecution - var token string - for { - executions, newToken, err := util.ListNodeExecutionsForWorkflow(ctx, m.db, workflowExecutionID, - uniqueParentID, "", nodeExecutionLimit, token, sortByCreatedAtAsc) - if err != nil { - return nil, err - } - - nodeExecutions = append(nodeExecutions, executions...) - if len(newToken) == 0 { - // empty token is returned once no more node executions are available - break - } - token = newToken - } - - return nodeExecutions, nil -} - -func (m *CacheManager) listAllTaskExecutions(ctx context.Context, nodeExecutionID *core.NodeExecutionIdentifier) ([]models.TaskExecution, error) { - var taskExecutions []models.TaskExecution - var token string - for { - executions, newToken, err := util.ListTaskExecutions(ctx, m.db, nodeExecutionID, "", - taskExecutionLimit, token, sortByCreatedAtAsc) - if err != nil { - return nil, err - } - - taskExecutions = append(taskExecutions, executions...) - if len(newToken) == 0 { - // empty token is returned once no more task executions are available - break - } - token = newToken - } - - return taskExecutions, nil -} - func NewCacheManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration, catalogClient catalog.Client, scope promutils.Scope) interfaces.CacheInterface { metrics := cacheMetrics{ diff --git a/flyteadmin/pkg/manager/impl/cache_manager_test.go b/flyteadmin/pkg/manager/impl/cache_manager_test.go index f580b43f27..dad2d2da93 100644 --- a/flyteadmin/pkg/manager/impl/cache_manager_test.go +++ b/flyteadmin/pkg/manager/impl/cache_manager_test.go @@ -52,17 +52,8 @@ func ptr[T any](val T) *T { return &val } -func setupCacheEvictionMockRepositories(t *testing.T, repository interfaces.Repository, executionModel *models.Execution, - nodeExecutionModels []models.NodeExecution, taskExecutionModels map[string][]models.TaskExecution) map[string]int { - if executionModel != nil { - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback( - func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { - assert.Equal(t, executionIdentifier.Domain, input.Domain) - assert.Equal(t, executionIdentifier.Name, input.Name) - assert.Equal(t, executionIdentifier.Project, input.Project) - return *executionModel, nil - }) - } +func setupCacheEvictionMockRepositories(t *testing.T, repository interfaces.Repository, nodeExecutionModels []models.NodeExecution, + taskExecutionModels map[string][]models.TaskExecution) map[string]int { repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) { @@ -265,8 +256,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) deletedArtifactIDs := setupCacheEvictionCatalogClient(t, catalogClient, artifactTags, taskExecutionModels) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) @@ -386,8 +376,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) request := service.EvictTaskExecutionCacheRequest{ @@ -583,8 +572,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) request := service.EvictTaskExecutionCacheRequest{ @@ -611,7 +599,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { assert.Empty(t, updatedNodeExecutions) }) - t.Run("subtask with partially cached results", func(t *testing.T) { + t.Run("idempotency", func(t *testing.T) { repository := repositoryMocks.NewMockRepository() catalogClient := &mocks.Client{} mockConfig := getMockExecutionsConfigProvider() @@ -621,14 +609,6 @@ func TestEvictTaskExecutionCache(t *testing.T) { ArtifactId: "0285ddb9-ddfb-4835-bc22-80e1bdf7f560", Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYM", }, - { - ArtifactId: "4074be3e-7cee-4a7b-8c45-56577fa32f24", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYN", - }, - { - ArtifactId: "a8bd60d5-b2bb-4b06-a2ac-240d183a4ca8", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYO", - }, } nodeExecutionModels := []models.NodeExecution{ @@ -646,7 +626,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, Phase: core.NodeExecution_SUCCEEDED.String(), NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: true, + IsParentNode: false, IsDynamic: false, SpecNodeId: "n0", }), @@ -667,176 +647,6 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, }, }), - ChildNodeExecutions: []models.NodeExecution{ - { - BaseModel: models.BaseModel{ - ID: 2, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-start-node", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "start-node", - }), - }, - { - BaseModel: models.BaseModel{ - ID: 3, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n0", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n0", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[1], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn0", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 4, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n1", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n1", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_DISABLED, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 5, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n2", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n2", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[2], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn2", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 6, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-end-node", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "end-node", - }), - }, - }, }, } @@ -846,96 +656,6 @@ func TestEvictTaskExecutionCache(t *testing.T) { BaseModel: models.BaseModel{ ID: 1, }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_single_task", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n0-0", - }, - }), - }, - }, - "n0-0-n0": { - { - BaseModel: models.BaseModel{ - ID: 2, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n0-0", - }, - }), - }, - }, - "n0-0-n1": { - { - BaseModel: models.BaseModel{ - ID: 2, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n1", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n1-0", - }, - }), - }, - }, - "n0-0-n2": { - { - BaseModel: models.BaseModel{ - ID: 2, - }, TaskExecutionKey: models.TaskExecutionKey{ TaskKey: models.TaskKey{ Project: executionIdentifier.Project, @@ -949,22 +669,21 @@ func TestEvictTaskExecutionCache(t *testing.T) { Domain: executionIdentifier.Domain, Name: executionIdentifier.Name, }, - NodeID: "n0-0-n2", + NodeID: "n0", }, RetryAttempt: ptr[uint32](0), }, Phase: core.NodeExecution_SUCCEEDED.String(), Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n2-0", + GeneratedName: "name-n0-0", }, }), }, }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) deletedArtifactIDs := setupCacheEvictionCatalogClient(t, catalogClient, artifactTags, taskExecutionModels) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) @@ -989,10 +708,10 @@ func TestEvictTaskExecutionCache(t *testing.T) { require.NotNil(t, resp) assert.Empty(t, resp.GetErrors().GetErrors()) - assert.Contains(t, updatedNodeExecutions, "n0") - assert.Contains(t, updatedNodeExecutions, "n0-0-n0") - assert.Contains(t, updatedNodeExecutions, "n0-0-n2") - assert.Len(t, updatedNodeExecutions, 3) + for nodeID := range taskExecutionModels { + assert.Contains(t, updatedNodeExecutions, nodeID) + } + assert.Len(t, updatedNodeExecutions, len(taskExecutionModels)) for _, artifactTag := range artifactTags { assert.Equal(t, 1, deletedArtifactIDs[artifactTag.GetArtifactId()]) @@ -1000,7 +719,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { assert.Len(t, deletedArtifactIDs, len(artifactTags)) }) - t.Run("idempotency", func(t *testing.T) { + t.Run("reserved artifact", func(t *testing.T) { repository := repositoryMocks.NewMockRepository() catalogClient := &mocks.Client{} mockConfig := getMockExecutionsConfigProvider() @@ -1084,9 +803,10 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) - deletedArtifactIDs := setupCacheEvictionCatalogClient(t, catalogClient, artifactTags, taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) + + catalogClient.On("GetOrExtendReservationByArtifactTag", mock.Anything, mock.Anything, mock.Anything, + mock.Anything, mock.Anything).Return(&datacatalog.Reservation{OwnerId: "otherOwnerID"}, nil) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) request := service.EvictTaskExecutionCacheRequest{ @@ -1108,20 +828,18 @@ func TestEvictTaskExecutionCache(t *testing.T) { resp, err := cacheManager.EvictTaskExecutionCache(context.Background(), request) require.NoError(t, err) require.NotNil(t, resp) - assert.Empty(t, resp.GetErrors().GetErrors()) - - for nodeID := range taskExecutionModels { - assert.Contains(t, updatedNodeExecutions, nodeID) - } - assert.Len(t, updatedNodeExecutions, len(taskExecutionModels)) + assert.Len(t, resp.GetErrors().GetErrors(), len(artifactTags)) + eErr := resp.GetErrors().GetErrors()[0] + assert.Equal(t, core.CacheEvictionError_RESERVATION_NOT_ACQUIRED, eErr.Code) + assert.Equal(t, "n0", eErr.NodeExecutionId.NodeId) + assert.True(t, proto.Equal(&executionIdentifier, eErr.NodeExecutionId.ExecutionId)) + require.NotNil(t, eErr.Source) + assert.IsType(t, &core.CacheEvictionError_TaskExecutionId{}, eErr.Source) - for _, artifactTag := range artifactTags { - assert.Equal(t, 1, deletedArtifactIDs[artifactTag.GetArtifactId()]) - } - assert.Len(t, deletedArtifactIDs, len(artifactTags)) + assert.Empty(t, updatedNodeExecutions) }) - t.Run("reserved artifact", func(t *testing.T) { + t.Run("unknown artifact", func(t *testing.T) { repository := repositoryMocks.NewMockRepository() catalogClient := &mocks.Client{} mockConfig := getMockExecutionsConfigProvider() @@ -1205,145 +923,23 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) - catalogClient.On("GetOrExtendReservationByArtifactTag", mock.Anything, mock.Anything, mock.Anything, - mock.Anything, mock.Anything).Return(&datacatalog.Reservation{OwnerId: "otherOwnerID"}, nil) - - cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) - request := service.EvictTaskExecutionCacheRequest{ - TaskExecutionId: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - Version: version, - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - ExecutionId: &executionIdentifier, - NodeId: "n0", - }, - RetryAttempt: uint32(0), - }, - } - resp, err := cacheManager.EvictTaskExecutionCache(context.Background(), request) - require.NoError(t, err) - require.NotNil(t, resp) - assert.Len(t, resp.GetErrors().GetErrors(), len(artifactTags)) - eErr := resp.GetErrors().GetErrors()[0] - assert.Equal(t, core.CacheEvictionError_RESERVATION_NOT_ACQUIRED, eErr.Code) - assert.Equal(t, "n0", eErr.NodeExecutionId.NodeId) - assert.True(t, proto.Equal(&executionIdentifier, eErr.NodeExecutionId.ExecutionId)) - require.NotNil(t, eErr.Source) - assert.IsType(t, &core.CacheEvictionError_TaskExecutionId{}, eErr.Source) - - assert.Empty(t, updatedNodeExecutions) - }) - - t.Run("unknown artifact", func(t *testing.T) { - repository := repositoryMocks.NewMockRepository() - catalogClient := &mocks.Client{} - mockConfig := getMockExecutionsConfigProvider() - - artifactTags := []*core.CatalogArtifactTag{ - { - ArtifactId: "0285ddb9-ddfb-4835-bc22-80e1bdf7f560", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYM", - }, - } - - nodeExecutionModels := []models.NodeExecution{ - { - BaseModel: models.BaseModel{ - ID: 1, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n0", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_single_task", - Version: "version", - }, - ArtifactTag: artifactTags[0], - }, - }, - }, - }), - }, - } - - taskExecutionModels := map[string][]models.TaskExecution{ - "n0": { - { - BaseModel: models.BaseModel{ - ID: 1, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0", - }, - }), - }, - }, - } - - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) - - for nodeID := range taskExecutionModels { - for _, taskExecution := range taskExecutionModels[nodeID] { - require.NotNil(t, taskExecution.RetryAttempt) - ownerID := fmt.Sprintf("%s-%s-%d", executionIdentifier.Name, nodeID, *taskExecution.RetryAttempt) - for _, artifactTag := range artifactTags { - catalogClient.On("GetOrExtendReservationByArtifactTag", mock.Anything, mock.Anything, - artifactTag.GetName(), ownerID, mock.Anything).Return(&datacatalog.Reservation{OwnerId: ownerID}, nil) - catalogClient.On("ReleaseReservationByArtifactTag", mock.Anything, mock.Anything, - artifactTag.GetName(), ownerID).Return(nil) - } - } - } - - catalogClient.On("DeleteByArtifactID", mock.Anything, mock.Anything, mock.Anything). - Return(status.Error(codes.NotFound, "not found")) + for nodeID := range taskExecutionModels { + for _, taskExecution := range taskExecutionModels[nodeID] { + require.NotNil(t, taskExecution.RetryAttempt) + ownerID := fmt.Sprintf("%s-%s-%d", executionIdentifier.Name, nodeID, *taskExecution.RetryAttempt) + for _, artifactTag := range artifactTags { + catalogClient.On("GetOrExtendReservationByArtifactTag", mock.Anything, mock.Anything, + artifactTag.GetName(), ownerID, mock.Anything).Return(&datacatalog.Reservation{OwnerId: ownerID}, nil) + catalogClient.On("ReleaseReservationByArtifactTag", mock.Anything, mock.Anything, + artifactTag.GetName(), ownerID).Return(nil) + } + } + } + + catalogClient.On("DeleteByArtifactID", mock.Anything, mock.Anything, mock.Anything). + Return(status.Error(codes.NotFound, "not found")) cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) request := service.EvictTaskExecutionCacheRequest{ @@ -1458,8 +1054,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) catalogClient.On("GetOrExtendReservationByArtifactTag", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, status.Error(codes.Internal, "error")) @@ -1579,8 +1174,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) for nodeID := range taskExecutionModels { for _, taskExecution := range taskExecutionModels[nodeID] { @@ -1714,8 +1308,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) for nodeID := range taskExecutionModels { for _, taskExecution := range taskExecutionModels[nodeID] { @@ -1861,8 +1454,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) { @@ -1980,8 +1572,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetUpdateSelectedCallback( func(ctx context.Context, nodeExecution *models.NodeExecution, selectedFields []string) error { @@ -2024,129 +1615,6 @@ func TestEvictTaskExecutionCache(t *testing.T) { }) t.Run("TaskExecutionRepo", func(t *testing.T) { - t.Run("List", func(t *testing.T) { - repository := repositoryMocks.NewMockRepository() - catalogClient := &mocks.Client{} - mockConfig := getMockExecutionsConfigProvider() - - artifactTags := []*core.CatalogArtifactTag{ - { - ArtifactId: "0285ddb9-ddfb-4835-bc22-80e1bdf7f560", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYM", - }, - } - - nodeExecutionModels := []models.NodeExecution{ - { - BaseModel: models.BaseModel{ - ID: 1, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n0", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_single_task", - Version: "version", - }, - ArtifactTag: artifactTags[0], - }, - }, - }, - }), - }, - } - - taskExecutionModels := map[string][]models.TaskExecution{ - "n0": { - { - BaseModel: models.BaseModel{ - ID: 1, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0", - }, - }), - }, - }, - } - - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) - - repository.TaskExecutionRepo().(*repositoryMocks.MockTaskExecutionRepo).SetListCallback( - func(ctx context.Context, input interfaces.ListResourceInput) (interfaces.TaskExecutionCollectionOutput, error) { - return interfaces.TaskExecutionCollectionOutput{}, flyteAdminErrors.NewFlyteAdminError(codes.Internal, "error") - }) - - cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) - request := service.EvictTaskExecutionCacheRequest{ - TaskExecutionId: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - Version: version, - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - ExecutionId: &executionIdentifier, - NodeId: "n0", - }, - RetryAttempt: uint32(0), - }, - } - resp, err := cacheManager.EvictTaskExecutionCache(context.Background(), request) - require.NoError(t, err) - require.NotNil(t, resp) - assert.Len(t, resp.GetErrors().GetErrors(), len(artifactTags)) - eErr := resp.GetErrors().GetErrors()[0] - assert.Equal(t, core.CacheEvictionError_INTERNAL, eErr.Code) - assert.Equal(t, "n0", eErr.NodeExecutionId.NodeId) - assert.True(t, proto.Equal(&executionIdentifier, eErr.NodeExecutionId.ExecutionId)) - require.NotNil(t, eErr.Source) - assert.IsType(t, &core.CacheEvictionError_WorkflowExecutionId{}, eErr.Source) - - assert.Empty(t, updatedNodeExecutions) - }) - t.Run("Get", func(t *testing.T) { repository := repositoryMocks.NewMockRepository() catalogClient := &mocks.Client{} @@ -2231,8 +1699,7 @@ func TestEvictTaskExecutionCache(t *testing.T) { }, } - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) + updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nodeExecutionModels, taskExecutionModels) repository.TaskExecutionRepo().(*repositoryMocks.MockTaskExecutionRepo).SetGetCallback( func(ctx context.Context, input interfaces.GetTaskExecutionInput) (models.TaskExecution, error) { @@ -2267,509 +1734,4 @@ func TestEvictTaskExecutionCache(t *testing.T) { }) }) }) - - t.Run("subtask with identical artifact tags", func(t *testing.T) { - repository := repositoryMocks.NewMockRepository() - catalogClient := &mocks.Client{} - mockConfig := getMockExecutionsConfigProvider() - - artifactTags := []*core.CatalogArtifactTag{ - { - ArtifactId: "0285ddb9-ddfb-4835-bc22-80e1bdf7f560", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYM", - }, - { - ArtifactId: "4074be3e-7cee-4a7b-8c45-56577fa32f24", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYN", - }, - { - ArtifactId: "a8bd60d5-b2bb-4b06-a2ac-240d183a4ca8", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYN", - }, - { - ArtifactId: "8a47c342-ff71-481e-9c7b-0e6ecb57e742", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYN", - }, - { - ArtifactId: "dafdef15-0aba-4f7c-a4aa-deba89568277", - Name: "flyte_cached-G3ACyxqY0U3sEf99tLMta5vuLCOk7j9O7MStxubzxYN", - }, - } - - nodeExecutionModels := []models.NodeExecution{ - { - BaseModel: models.BaseModel{ - ID: 1, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: true, - IsDynamic: false, - SpecNodeId: "n0", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_single_task", - Version: "version", - }, - ArtifactTag: artifactTags[0], - }, - }, - }, - }), - ChildNodeExecutions: []models.NodeExecution{ - { - BaseModel: models.BaseModel{ - ID: 2, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-start-node", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "start-node", - }), - }, - { - BaseModel: models.BaseModel{ - ID: 3, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n0", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n0", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[1], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn0", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 4, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n1", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n1", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[2], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn0", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 5, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n2", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n2", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[3], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn2", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 6, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n3", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "n3", - }), - Closure: serializeNodeExecutionClosure(t, &admin.NodeExecutionClosure{ - TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ - TaskNodeMetadata: &admin.TaskNodeMetadata{ - CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, - CatalogKey: &core.CatalogMetadata{ - DatasetId: &core.Identifier{ - ResourceType: core.ResourceType_DATASET, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_sub", - Version: "version", - }, - ArtifactTag: artifactTags[4], - SourceExecution: &core.CatalogMetadata_SourceTaskExecution{ - SourceTaskExecution: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - NodeId: "dn3", - ExecutionId: &executionIdentifier, - }, - RetryAttempt: 0, - }, - }, - }, - }, - }, - }), - }, - { - BaseModel: models.BaseModel{ - ID: 7, - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-end-node", - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - NodeExecutionMetadata: serializeNodeExecutionMetadata(t, &admin.NodeExecutionMetaData{ - IsParentNode: false, - IsDynamic: false, - SpecNodeId: "end-node", - }), - }, - }, - }, - } - - taskExecutionModels := map[string][]models.TaskExecution{ - "n0": { - { - BaseModel: models.BaseModel{ - ID: 1, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache_single_task", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n0-0", - }, - }), - }, - }, - "n0-0-n0": { - { - BaseModel: models.BaseModel{ - ID: 2, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n0", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n0-0", - }, - }), - }, - }, - "n0-0-n1": { - { - BaseModel: models.BaseModel{ - ID: 2, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n1", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n1-0", - }, - }), - }, - }, - "n0-0-n2": { - { - BaseModel: models.BaseModel{ - ID: 3, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n2", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n2-0", - }, - }), - }, - }, - "n0-0-n3": { - { - BaseModel: models.BaseModel{ - ID: 4, - }, - TaskExecutionKey: models.TaskExecutionKey{ - TaskKey: models.TaskKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: "flyte_task-test.evict.execution_cache", - Version: "version", - }, - NodeExecutionKey: models.NodeExecutionKey{ - ExecutionKey: models.ExecutionKey{ - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - }, - NodeID: "n0-0-n3", - }, - RetryAttempt: ptr[uint32](0), - }, - Phase: core.NodeExecution_SUCCEEDED.String(), - Closure: serializeTaskExecutionClosure(t, &admin.TaskExecutionClosure{ - Metadata: &event.TaskExecutionMetadata{ - GeneratedName: "name-n0-0-n3-0", - }, - }), - }, - }, - } - - updatedNodeExecutions := setupCacheEvictionMockRepositories(t, repository, nil, nodeExecutionModels, - taskExecutionModels) - deletedArtifactIDs := setupCacheEvictionCatalogClient(t, catalogClient, artifactTags, taskExecutionModels) - - cacheManager := NewCacheManager(repository, mockConfig, catalogClient, promutils.NewTestScope()) - request := service.EvictTaskExecutionCacheRequest{ - TaskExecutionId: &core.TaskExecutionIdentifier{ - TaskId: &core.Identifier{ - ResourceType: core.ResourceType_TASK, - Project: executionIdentifier.Project, - Domain: executionIdentifier.Domain, - Name: executionIdentifier.Name, - Version: version, - }, - NodeExecutionId: &core.NodeExecutionIdentifier{ - ExecutionId: &executionIdentifier, - NodeId: "n0", - }, - RetryAttempt: uint32(0), - }, - } - resp, err := cacheManager.EvictTaskExecutionCache(context.Background(), request) - require.NoError(t, err) - require.NotNil(t, resp) - assert.Empty(t, resp.GetErrors().GetErrors()) - - for nodeID := range taskExecutionModels { - assert.Contains(t, updatedNodeExecutions, nodeID) - } - assert.Len(t, updatedNodeExecutions, len(taskExecutionModels)) - - for _, artifactTag := range artifactTags { - assert.Equal(t, 1, deletedArtifactIDs[artifactTag.GetArtifactId()]) - } - assert.Len(t, deletedArtifactIDs, len(artifactTags)) - }) } diff --git a/flyteadmin/pkg/rpc/cacheservice/base.go b/flyteadmin/pkg/rpc/cacheservice/base.go index f94cfef0b5..18c3b88801 100644 --- a/flyteadmin/pkg/rpc/cacheservice/base.go +++ b/flyteadmin/pkg/rpc/cacheservice/base.go @@ -3,7 +3,6 @@ package cacheservice import ( "context" "fmt" - "runtime/debug" "github.com/golang/protobuf/proto"