From f97c77ab55913b001d65d8f6f926227aa8193e13 Mon Sep 17 00:00:00 2001 From: Richard Liu Date: Thu, 13 Jun 2024 21:51:40 -0700 Subject: [PATCH] Change Flyte CR naming scheme to better support namespace_mapping - Typically Flyte is configured so that each project / domain has its own Kubernetes namespace. Certain environments may change this behavior by using the Flyteadmin namespace_mapping setting to put all executions in fewer (or a singular) Kubernetes namespace. This is problematic because it can lead to collisions in the naming of the CR that flyteadmin generates. - This patch fixes 2 important things to make this work properly inside of Flyte: * it adds a random element to the CR name in Flyte so that the CR is named by the execution + some unique value when created by flyteadmin Without this change, an execution Foo in project A will prevent an execution Foo in project B from launching, because the name of the CR thats generated in Kubernetes *assumes* that the namespace the CRs are put into is different for project A and project B When namespace_mapping is set to a singular value, that assumption is wrong * it makes sure that when flytepropeller cleans up the CR resource that it uses Kubernetes labels to find the correct CR -- so instead of assuming that it can use the execution name, it instead uses the project, domain and execution labels Signed-off-by: ddl-ebrown --- .../pkg/workflowengine/impl/k8s_executor.go | 28 +++++++++++++++++-- .../workflowengine/impl/k8s_executor_test.go | 27 +++++++++++------- .../pkg/compiler/transformers/k8s/workflow.go | 15 +++++++++- .../nodes/dynamic/dynamic_workflow_test.go | 2 +- 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index f90051fabe4..c72f0c95207 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -12,6 +12,7 @@ import ( execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -79,6 +80,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut }, nil } +const ( + // Labels that are set on the FlyteWorkflow CRD + DomainLabel = "domain" + ExecutionIDLabel = "execution-id" + ProjectLabel = "project" +) + +func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector { + return &v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: executionID.GetDomain(), + ExecutionIDLabel: executionID.GetName(), + ProjectLabel: executionID.GetProject(), + }, + } +} + func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error { target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{ TargetID: data.Cluster, @@ -86,9 +104,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, err.Error()) } - err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{ - PropagationPolicy: &deletePropagationBackground, - }) + err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection( + ctx, + v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground}, + v1.ListOptions{ + LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)), + }, + ) // An IsNotFound error indicates the resource is already deleted. if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go index b384ebbcaf7..3b0829f2814 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go @@ -30,11 +30,11 @@ import ( var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{} type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) -type deleteCallback func(name string, options *v1.DeleteOptions) error +type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error type FakeFlyteWorkflow struct { v1alpha12.FlyteWorkflowInterface - createCallback createCallback - deleteCallback deleteCallback + createCallback + deleteCollectionCallback } func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) { @@ -44,9 +44,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl return nil, nil } -func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error { - if b.deleteCallback != nil { - return b.deleteCallback(name, &options) +func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + if b.deleteCollectionCallback != nil { + return b.deleteCollectionCallback(&opts, &listOpts) } return nil } @@ -279,8 +279,15 @@ func TestExecute_MiscError(t *testing.T) { func TestAbort(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { - assert.Equal(t, execID.Name, name) + fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error { + selector := v1.FormatLabelSelector(&v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: execID.GetDomain(), + ExecutionIDLabel: execID.GetName(), + ProjectLabel: execID.GetProject(), + }, + }) + assert.Equal(t, selector, listOpts.LabelSelector) assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground) return nil } @@ -301,7 +308,7 @@ func TestAbort(t *testing.T) { func TestAbort_Notfound(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { + fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error { return k8_api_err.NewNotFound(schema.GroupResource{ Group: "foo", Resource: "bar", @@ -324,7 +331,7 @@ func TestAbort_Notfound(t *testing.T) { func TestAbort_MiscError(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { + fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error { return errors.New("call failed") } fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface { diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bbc..44e24464bcd 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -5,8 +5,10 @@ import ( "fmt" "hash/fnv" "strings" + "time" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -159,6 +161,17 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie } } +const ExecutionIDSuffixLength = 21 + +/* #nosec */ +func GetExecutionName(name string, seed int64) string { + rand.Seed(seed) + // K8s has a limitation of 63 chars + name = name[:minInt(63-ExecutionIDSuffixLength, len(name))] + execName := name + "-" + rand.String(ExecutionIDSuffixLength-1) + return execName +} + // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors. func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap, executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) { @@ -231,7 +244,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li errs.Collect(errors.NewWorkflowBuildError(err)) } - obj.ObjectMeta.Name = name + obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano()) obj.ObjectMeta.GenerateName = generatedName obj.ObjectMeta.Labels[ExecutionIDLabel] = label obj.ObjectMeta.Labels[ProjectLabel] = project diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 389ea0439b6..e3df26502f5 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -496,7 +496,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t composedPBStore.OnWriteRawMatch( mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), - int64(1501), + int64(1532), storage.Options{}, mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))