From af9e4758d4acab699564c074ef004e4d7b957a96 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 21 Sep 2023 21:27:14 -0700 Subject: [PATCH 1/2] FlyteAdmin will always add `base_exec_id` unless it is already added Reasons: 1. Make it possible to retrieve all executions launched by the same base execution id (even recursively) 2. users could group executions using their own base exec id 3. flytectl get executions or remote list executions can use this label as a filter to retrieve high level progress of all subworkflows Signed-off-by: Ketan Umare --- .../pkg/manager/impl/execution_manager.go | 21 +++++++++++++++++++ .../manager/impl/execution_manager_test.go | 10 +++++---- .../pkg/manager/impl/shared/constants.go | 7 +++++-- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index bd57d4fb14..eef2c7a477 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -553,6 +553,10 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { return nil, nil, err } + labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + if err != nil { + return nil, nil, err + } var annotations map[string]string if executionConfig.Annotations != nil { @@ -810,6 +814,10 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, err } + labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + if err != nil { + return nil, nil, err + } annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { return nil, nil, err @@ -1687,6 +1695,19 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str return initialLabels, nil } +// Adds base execution label to execution labels. Base execution label is ignored if a corresponding label is set on the execution already. +// An execution label will exist if Flytepropeller launches a child workflow execution, as it will copy the parent execution's labels. +// This label can later be used to retrieve all executions that were launched from a given execution, no matter how deep in the recursion tree. +func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) (map[string]string, error) { + if initialLabels == nil { + initialLabels = make(map[string]string) + } + if _, ok := initialLabels[shared.BaseExecutionIDLabelKey]; !ok { + initialLabels[shared.BaseExecutionIDLabelKey] = execID + } + return initialLabels, nil +} + func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) { var stateFilterExists bool for _, inlineFilter := range filters { diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index 38bac0df10..1a9d5b18e1 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -886,8 +886,9 @@ func TestCreateExecutionDynamicLabelsAndAnnotations(t *testing.T) { mockExecutor := workflowengineMocks.WorkflowExecutor{} mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(executionData workflowengineInterfaces.ExecutionData) bool { assert.EqualValues(t, map[string]string{ - "dynamiclabel1": "dynamic1", - "dynamiclabel2": "dynamic2", + "dynamiclabel1": "dynamic1", + "dynamiclabel2": "dynamic2", + shared.BaseExecutionIDLabelKey: "name", }, executionData.ExecutionParameters.Labels) assert.EqualValues(t, map[string]string{ "dynamicannotation3": "dynamic3", @@ -3834,8 +3835,9 @@ func TestCreateExecution_LegacyClient(t *testing.T) { mockExecutor := workflowengineMocks.WorkflowExecutor{} mockExecutor.OnExecuteMatch(mock.Anything, mock.MatchedBy(func(execData workflowengineInterfaces.ExecutionData) bool { assert.EqualValues(t, map[string]string{ - "label1": "1", - "label2": "2", + "label1": "1", + "label2": "2", + shared.BaseExecutionIDLabelKey: "name", }, execData.ExecutionParameters.Labels) assert.EqualValues(t, map[string]string{ "annotation3": "3", diff --git a/flyteadmin/pkg/manager/impl/shared/constants.go b/flyteadmin/pkg/manager/impl/shared/constants.go index 469bd3e3ba..e9bc1b79e7 100644 --- a/flyteadmin/pkg/manager/impl/shared/constants.go +++ b/flyteadmin/pkg/manager/impl/shared/constants.go @@ -33,6 +33,9 @@ const ( Attributes = "attributes" MatchingAttributes = "matching_attributes" // Parent of a node execution in the node executions table - ParentID = "parent_id" - WorkflowClosure = "workflow_closure" + ParentID = "parent_id" + WorkflowClosure = "workflow_closure" + BaseExecutionIDLabelKey = "base_exec_id" + // BaseExecutionIDLabelKey is the label key for the base execution ID and is globally known. The UI, CLI and potentially + // other components use this label key to identify the base execution ID, so DO NOT CHANGE THIS VALUE. ) From 682181324385f8fa434dfe7f6bc8c552ecd822be Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Thu, 21 Sep 2023 21:41:58 -0700 Subject: [PATCH 2/2] lint fixed Signed-off-by: Ketan Umare --- flyteadmin/pkg/manager/impl/execution_manager.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index eef2c7a477..af8ff8d29e 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -553,10 +553,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( if err != nil { return nil, nil, err } - labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) - if err != nil { - return nil, nil, err - } + labels = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) var annotations map[string]string if executionConfig.Annotations != nil { @@ -814,10 +811,8 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( if err != nil { return nil, nil, err } - labels, err = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) - if err != nil { - return nil, nil, err - } + labels = m.addBaseExecutionLabel(ctx, workflowExecutionID.Name, labels) + annotations, err := resolveStringMap(executionConfig.GetAnnotations(), launchPlan.Spec.Annotations, "annotations", m.config.RegistrationValidationConfiguration().GetMaxAnnotationEntries()) if err != nil { return nil, nil, err @@ -1698,14 +1693,14 @@ func (m *ExecutionManager) addProjectLabels(ctx context.Context, projectName str // Adds base execution label to execution labels. Base execution label is ignored if a corresponding label is set on the execution already. // An execution label will exist if Flytepropeller launches a child workflow execution, as it will copy the parent execution's labels. // This label can later be used to retrieve all executions that were launched from a given execution, no matter how deep in the recursion tree. -func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) (map[string]string, error) { +func (m *ExecutionManager) addBaseExecutionLabel(_ context.Context, execID string, initialLabels map[string]string) map[string]string { if initialLabels == nil { initialLabels = make(map[string]string) } if _, ok := initialLabels[shared.BaseExecutionIDLabelKey]; !ok { initialLabels[shared.BaseExecutionIDLabelKey] = execID } - return initialLabels, nil + return initialLabels } func addStateFilter(filters []common.InlineFilter) ([]common.InlineFilter, error) {