From 7a82f3a1b41ef675fabb4c234e310e4c5b196036 Mon Sep 17 00:00:00 2001 From: Yue Shang Date: Mon, 11 Mar 2024 18:00:59 -0700 Subject: [PATCH] fix overwrite cache bug in UI Signed-off-by: Yue Shang --- .../pkg/manager/impl/execution_manager.go | 18 ++++++ .../manager/impl/execution_manager_test.go | 63 +++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 16a36f895f8..97a41c52f3d 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1453,6 +1453,24 @@ func (m *ExecutionManager) GetExecution( return nil, transformerErr } + // replace OverwriteCache data in execution spec with data from launch plan spec, this will help to correct overwrite cache label in UI + executionSpec := execution.Spec + launchPlanModel, err := util.GetLaunchPlanModel(ctx, m.db, *executionSpec.LaunchPlan) + if err != nil { + logger.Debugf(ctx, "Failed to get launch plan model for execution %+v with err %v", execution, err) + return nil, err + } + launchPlan, err := transformers.FromLaunchPlanModel(launchPlanModel) + if err != nil { + logger.Debugf(ctx, "Failed to transform launch plan model %+v with err %v", launchPlanModel, err) + return nil, err + } + logger.Infof(ctx, "Before update: launchPlan %v, launchPlan.Spec %v, launchPlan.Spec.GetOverwriteCache() %v", launchPlan, launchPlan.Spec, launchPlan.Spec.GetOverwriteCache()) + if !executionSpec.GetOverwriteCache() && launchPlan.Spec.GetOverwriteCache() { + executionSpec.OverwriteCache = launchPlan.Spec.GetOverwriteCache() + } + logger.Infof(ctx, "After update executionSpec %v, executionSpec.OverwriteCache %v", executionSpec, executionSpec.OverwriteCache) + return execution, nil } diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index d6a05cc214e..7a0a9a19803 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -2836,6 +2836,69 @@ func TestGetExecution(t *testing.T) { assert.True(t, proto.Equal(&closure, execution.Closure)) } +func TestGetExecutionOverwriteCache(t *testing.T) { + repository := repositoryMocks.NewMockRepository() + startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC) + executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { + assert.Equal(t, "project", input.Project) + assert.Equal(t, "domain", input.Domain) + assert.Equal(t, "name", input.Name) + return models.Execution{ + BaseModel: models.BaseModel{ + CreatedAt: testutils.MockCreatedAtValue, + }, + ExecutionKey: models.ExecutionKey{ + Project: "project", + Domain: "domain", + Name: "name", + }, + Spec: getExpectedSpecBytes(), + Phase: phase, + Closure: closureBytes, + LaunchPlanID: uint(1), + WorkflowID: uint(2), + StartedAt: &startedAt, + // TODO: Input uri + }, nil + } + + lpSpec := testutils.GetSampleLpSpecForTest() + lpSpec.OverwriteCache = true + lpSpecBytes, _ := proto.Marshal(&lpSpec) + lpClosure := admin.LaunchPlanClosure{ + ExpectedInputs: lpSpec.DefaultInputs, + } + lpClosureBytes, _ := proto.Marshal(&lpClosure) + lpGetFunc := func(input interfaces.Identifier) (models.LaunchPlan, error) { + lpModel := models.LaunchPlan{ + LaunchPlanKey: models.LaunchPlanKey{ + Project: input.Project, + Domain: input.Domain, + Name: input.Name, + Version: input.Version, + }, + BaseModel: models.BaseModel{ + ID: uint(100), + }, + Spec: lpSpecBytes, + Closure: lpClosureBytes, + } + return lpModel, nil + } + + repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) + repository.LaunchPlanRepo().(*repositoryMocks.MockLaunchPlanRepo).SetGetCallback(lpGetFunc) + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) + execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + execution, err := execManager.GetExecution(context.Background(), admin.WorkflowExecutionGetRequest{ + Id: &executionIdentifier, + }) + assert.NoError(t, err) + assert.True(t, proto.Equal(&executionIdentifier, execution.Id)) + assert.Equal(t, execution.Spec.OverwriteCache, true) +} + func TestGetExecution_DatabaseError(t *testing.T) { repository := repositoryMocks.NewMockRepository() expectedErr := errors.New("expected error")