diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index f90051fabe4..c72f0c95207 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -12,6 +12,7 @@ import ( execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -79,6 +80,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut }, nil } +const ( + // Labels that are set on the FlyteWorkflow CRD + DomainLabel = "domain" + ExecutionIDLabel = "execution-id" + ProjectLabel = "project" +) + +func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector { + return &v1.LabelSelector{ + MatchLabels: map[string]string{ + DomainLabel: executionID.GetDomain(), + ExecutionIDLabel: executionID.GetName(), + ProjectLabel: executionID.GetProject(), + }, + } +} + func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error { target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{ TargetID: data.Cluster, @@ -86,9 +104,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, err.Error()) } - err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{ - PropagationPolicy: &deletePropagationBackground, - }) + err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection( + ctx, + v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground}, + v1.ListOptions{ + LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)), + }, + ) // An IsNotFound error indicates the resource is already deleted. if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 2421ddf9bbc..44e24464bcd 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -5,8 +5,10 @@ import ( "fmt" "hash/fnv" "strings" + "time" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/rand" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -159,6 +161,17 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie } } +const ExecutionIDSuffixLength = 21 + +/* #nosec */ +func GetExecutionName(name string, seed int64) string { + rand.Seed(seed) + // K8s has a limitation of 63 chars + name = name[:minInt(63-ExecutionIDSuffixLength, len(name))] + execName := name + "-" + rand.String(ExecutionIDSuffixLength-1) + return execName +} + // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors. func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap, executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) { @@ -231,7 +244,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li errs.Collect(errors.NewWorkflowBuildError(err)) } - obj.ObjectMeta.Name = name + obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano()) obj.ObjectMeta.GenerateName = generatedName obj.ObjectMeta.Labels[ExecutionIDLabel] = label obj.ObjectMeta.Labels[ProjectLabel] = project