Skip to content

Commit

Permalink
Change Flyte CR naming scheme to better support namespace_mapping
Browse files Browse the repository at this point in the history
 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the flyteadmin
   namespace_mapping setting to put all executions into fewer Kubernetes
   namespaces (or a single one). This is problematic because it can lead to
   collisions between the names of the CRs that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR that's generated in Kubernetes *assumes* that the namespaces the
     CRs are put into are different for project A and project B

     When namespace_mapping is set to a single value, that assumption
     is wrong

   * it makes sure that when flyteadmin cleans up the CR on abort, it
     uses Kubernetes labels to find the correct CR -- so instead of
     assuming that the CR is named after the execution, it selects the
     CR by its project, domain and execution labels

Signed-off-by: ddl-ebrown <[email protected]>
  • Loading branch information
ddl-rliu authored and ddl-ebrown committed Jun 14, 2024
1 parent bba8c11 commit 532888b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
28 changes: 25 additions & 3 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
Expand Down Expand Up @@ -79,16 +80,37 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}, nil
}

// Label keys carried by FlyteWorkflow custom resources. Together they
// identify the execution (project, domain, execution name) that a resource
// belongs to.
const (
	// ProjectLabel is the label key holding the execution's project.
	ProjectLabel = "project"
	// DomainLabel is the label key holding the execution's domain.
	DomainLabel = "domain"
	// ExecutionIDLabel is the label key holding the execution's name.
	ExecutionIDLabel = "execution-id"
)

// executionLabelSelector builds a label selector that matches on the project,
// domain and name of the given execution identifier, keyed by ProjectLabel,
// DomainLabel and ExecutionIDLabel respectively.
func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
	matchLabels := make(map[string]string, 3)
	matchLabels[ProjectLabel] = executionID.GetProject()
	matchLabels[DomainLabel] = executionID.GetDomain()
	matchLabels[ExecutionIDLabel] = executionID.GetName()

	selector := new(v1.LabelSelector)
	selector.MatchLabels = matchLabels
	return selector
}

func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
TargetID: data.Cluster,
})
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
})
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
ctx,
v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
v1.ListOptions{
LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
},
)
// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
Expand Down
15 changes: 14 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"hash/fnv"
"strings"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
Expand Down Expand Up @@ -159,6 +161,17 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
}
}

// ExecutionIDSuffixLength is the number of characters reserved at the end of
// a name produced by GetExecutionName: one "-" separator followed by
// ExecutionIDSuffixLength-1 random characters.
const ExecutionIDSuffixLength = 21

// GetExecutionName derives an object name from an execution name by appending
// "-" and a random suffix of ExecutionIDSuffixLength-1 characters. The name is
// first truncated so that the result never exceeds 63 characters.
//
// An empty name is returned unchanged. Prefixing the suffix with "-" would
// otherwise produce a name that starts with a dash, and a non-empty name would
// also take precedence over any ObjectMeta.GenerateName the caller sets.
//
// seed is passed to rand.Seed before the suffix is drawn.
// NOTE(review): rand.Seed appears to reseed a generator shared by the whole
// util/rand package, so concurrent callers may influence each other's
// suffixes -- confirm against the package documentation.
/* #nosec */
func GetExecutionName(name string, seed int64) string {
	if name == "" {
		// Leave the name empty so a GenerateName-based fallback can apply.
		return ""
	}
	rand.Seed(seed)
	// K8s has a limitation of 63 chars
	name = name[:minInt(63-ExecutionIDSuffixLength, len(name))]
	return name + "-" + rand.String(ExecutionIDSuffixLength-1)
}

// BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -231,7 +244,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
errs.Collect(errors.NewWorkflowBuildError(err))
}

obj.ObjectMeta.Name = name
obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano())
obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
Expand Down

0 comments on commit 532888b

Please sign in to comment.