From 532888ba9ad54d6b9ea5aa9dd8b70e7b14c3cd64 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Thu, 13 Jun 2024 21:51:40 -0700 Subject: [PATCH] Change Flyte CR naming scheme to better support namespace_mapping - Typically Flyte is configured so that each project / domain has its own Kubernetes namespace. Certain environments may change this behavior by using the Flyteadmin namespace_mapping setting to put all executions in fewer (or a singular) Kubernetes namespace. This is problematic because it can lead to collisions in the naming of the CR that flyteadmin generates. - This patch fixes 2 important things to make this work properly inside of Flyte: * it adds a random element to the CR name in Flyte so that the CR is named by the execution + some unique value when created by flyteadmin. Without this change, an execution Foo in project A will prevent an execution Foo in project B from launching, because the name of the CR that's generated in Kubernetes *assumes* that the namespace the CRs are put into is different for project A and project B. When namespace_mapping is set to a singular value, that assumption is wrong. * it makes sure that when flytepropeller cleans up the CR resource that it uses Kubernetes labels to find the correct CR -- so instead of assuming that it can use the execution name, it instead uses the project, domain and execution labels. Signed-off-by: ddl-ebrown --- .../pkg/workflowengine/impl/k8s_executor.go | 28 +++++++++++++++++-- .../pkg/compiler/transformers/k8s/workflow.go | 15 +++++++++- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index f90051fabe4..c72f0c95207 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -12,6 +12,7 @@ import ( execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" runtimeInterfaces 
"github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -79,6 +80,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut }, nil } +const ( + // Labels that are set on the FlyteWorkflow CRD + DomainLabel = "domain" + ExecutionIDLabel = "execution-id" + ProjectLabel = "project" +) + +func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector { + return &v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: executionID.GetDomain(), + ExecutionIDLabel: executionID.GetName(), + ProjectLabel: executionID.GetProject(), + }, + } +} + func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error { target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{ TargetID: data.Cluster, @@ -86,9 +104,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, err.Error()) } - err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{ - PropagationPolicy: &deletePropagationBackground, - }) + err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection( + ctx, + v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground}, + v1.ListOptions{ + LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)), + }, + ) // An IsNotFound error indicates the resource is already deleted. 
if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bbc..44e24464bcd 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -5,8 +5,10 @@ import ( "fmt" "hash/fnv" "strings" + "time" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -159,6 +161,17 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie } } +const ExecutionIDSuffixLength = 21 + +/* #nosec */ +func GetExecutionName(name string, seed int64) string { + rand.Seed(seed) + // K8s has a limitation of 63 chars + name = name[:minInt(63-ExecutionIDSuffixLength, len(name))] + execName := name + "-" + rand.String(ExecutionIDSuffixLength-1) + return execName +} + // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors. func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap, executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) { @@ -231,7 +244,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li errs.Collect(errors.NewWorkflowBuildError(err)) } - obj.ObjectMeta.Name = name + obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano()) obj.ObjectMeta.GenerateName = generatedName obj.ObjectMeta.Labels[ExecutionIDLabel] = label obj.ObjectMeta.Labels[ProjectLabel] = project