Skip to content

Commit

Permalink
Change Flyte CR naming scheme to better support namespace_mapping
Browse files Browse the repository at this point in the history
 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the Flyteadmin
   namespace_mapping setting to put all executions in fewer (or a singular)
   Kubernetes namespace. This is problematic because it can lead to
   collisions in the naming of the CR that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR thats generated in Kubernetes *assumes* that the namespace the
     CRs are put into is different for project A and project B

     When namespace_mapping is set to a singular value, that assumption
     is wrong

   * it makes sure that when flytepropeller cleans up the CR resource
     that it uses Kubernetes labels to find the correct CR -- so instead
     of assuming that it can use the execution name, it instead uses the
     project, domain and execution labels

Signed-off-by: ddl-ebrown <[email protected]>
  • Loading branch information
ddl-rliu authored and ddl-ebrown committed Jun 14, 2024
1 parent bba8c11 commit d994304
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 15 deletions.
28 changes: 25 additions & 3 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
Expand Down Expand Up @@ -79,16 +80,37 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}, nil
}

const (
// Labels that are set on the FlyteWorkflow CRD
DomainLabel = "domain"
ExecutionIDLabel = "execution-id"
ProjectLabel = "project"
)

func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
return &v1.LabelSelector{
MatchLabels: map[string]string{
DomainLabel: executionID.GetDomain(),
ExecutionIDLabel: executionID.GetName(),
ProjectLabel: executionID.GetProject(),
},
}
}

func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
TargetID: data.Cluster,
})
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
})
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
ctx,
v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},

Check warning on line 109 in flyteadmin/pkg/workflowengine/impl/k8s_executor.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/workflowengine/impl/k8s_executor.go#L109

Added line #L109 was not covered by tests
v1.ListOptions{
LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
},
)
// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
Expand Down
27 changes: 17 additions & 10 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ import (
var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}

type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
type deleteCallback func(name string, options *v1.DeleteOptions) error
type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error
type FakeFlyteWorkflow struct {
v1alpha12.FlyteWorkflowInterface
createCallback createCallback
deleteCallback deleteCallback
createCallback
deleteCollectionCallback
}

func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
Expand All @@ -44,9 +44,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl
return nil, nil
}

func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error {
if b.deleteCallback != nil {
return b.deleteCallback(name, &options)
func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
if b.deleteCollectionCallback != nil {
return b.deleteCollectionCallback(&opts, &listOpts)
}
return nil
}
Expand Down Expand Up @@ -279,8 +279,15 @@ func TestExecute_MiscError(t *testing.T) {

func TestAbort(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
assert.Equal(t, execID.Name, name)
fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error {
selector := v1.FormatLabelSelector(&v1.LabelSelector{
MatchLabels: map[string]string{
DomainLabel: execID.GetDomain(),
ExecutionIDLabel: execID.GetName(),
ProjectLabel: execID.GetProject(),
},
})
assert.Equal(t, selector, listOpts.LabelSelector)
assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
return nil
}
Expand All @@ -301,7 +308,7 @@ func TestAbort(t *testing.T) {

func TestAbort_Notfound(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
Expand All @@ -324,7 +331,7 @@ func TestAbort_Notfound(t *testing.T) {

func TestAbort_MiscError(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return errors.New("call failed")
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
Expand Down
8 changes: 8 additions & 0 deletions flytepropeller/pkg/compiler/test/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func TestDynamic(t *testing.T) {
Name: "name",
},
"namespace")
// make sure real CR has randomized suffix
assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name)
// then reset for the sake of serialization comparison
flyteWf.Name = "name"
if assert.NoError(t, err) {
raw, err := json.Marshal(flyteWf)
if assert.NoError(t, err) {
Expand Down Expand Up @@ -460,6 +464,10 @@ func runCompileTest(t *testing.T, dirName string) {
if !assert.NoError(t, err) {
t.FailNow()
}
// make sure real CR has randomized suffix
assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name)
// then reset for the sake of serialization comparison
flyteWf.Name = "name"

file := filepath.Join(filepath.Dir(filepath.Dir(p)), "k8s", strings.TrimRight(filepath.Base(p), filepath.Ext(p))+"_crd.json")
if !storeOrDiff(t, json.Marshal, flyteWf, file) {
Expand Down
15 changes: 14 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"hash/fnv"
"strings"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
Expand Down Expand Up @@ -159,6 +161,17 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
}
}

const ExecutionIDSuffixLength = 21

/* #nosec */
func GetExecutionName(name string, seed int64) string {
rand.Seed(seed)
// K8s has a limitation of 63 chars
name = name[:minInt(63-ExecutionIDSuffixLength, len(name))]
execName := name + "-" + rand.String(ExecutionIDSuffixLength-1)
return execName
}

// BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -231,7 +244,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
errs.Collect(errors.NewWorkflowBuildError(err))
}

obj.ObjectMeta.Name = name
obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano())
obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1501),
int64(1532),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down

0 comments on commit d994304

Please sign in to comment.