diff --git a/flytepropeller/pkg/compiler/test/compiler_test.go b/flytepropeller/pkg/compiler/test/compiler_test.go index 019f8db7049..355fc4a15b7 100644 --- a/flytepropeller/pkg/compiler/test/compiler_test.go +++ b/flytepropeller/pkg/compiler/test/compiler_test.go @@ -165,10 +165,6 @@ func TestDynamic(t *testing.T) { Name: "name", }, "namespace") - // make sure real CR has randomized suffix - assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name) - // then reset for the sake of serialization comparison - flyteWf.Name = "name" if assert.NoError(t, err) { raw, err := json.Marshal(flyteWf) if assert.NoError(t, err) { @@ -463,10 +459,6 @@ func runCompileTest(t *testing.T, dirName string) { if !assert.NoError(t, err) { t.FailNow() } - // make sure real CR has randomized suffix - assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name) - // then reset for the sake of serialization comparison - flyteWf.Name = "name" file := filepath.Join(filepath.Dir(filepath.Dir(p)), "k8s", strings.TrimRight(filepath.Base(p), filepath.Ext(p))+"_crd.json") if !storeOrDiff(t, json.Marshal, flyteWf, file) { diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 44e24464bcd..f23b815ca45 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -2,10 +2,10 @@ package k8s import ( + "context" "fmt" "hash/fnv" "strings" - "time" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" @@ -14,7 +14,9 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytepropeller/pkg/utils" + "github.com/flyteorg/flyte/flytestdlib/logger" ) const ( @@ -161,15 +163,18 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie } } -const ExecutionIDSuffixLength = 21 - -/* #nosec */ -func GetExecutionName(name string, seed int64) string { - rand.Seed(seed) - // K8s has a limitation of 63 chars - name = name[:minInt(63-ExecutionIDSuffixLength, len(name))] - execName := name + "-" + rand.String(ExecutionIDSuffixLength-1) - return execName +func hashIdentifier(identifier core.Identifier) uint64 { + h := fnv.New64() + _, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s", + identifier.Project, identifier.Domain, identifier.Name))) + if err != nil { + // This shouldn't occur. + logger.Errorf(context.Background(), + "failed to hash execution identifier: %+v with err: %v", identifier, err) + return 0 + } + logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64()) + return h.Sum64() } // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors. @@ -244,7 +249,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li errs.Collect(errors.NewWorkflowBuildError(err)) } - obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano()) + hashedIdentifier := hashIdentifier(core.Identifier{ + Project: project, + Domain: domain, + Name: name, + }) + rand.Seed(int64(hashedIdentifier)) + + if workflowCRNameHashLength := config.GetConfig().WorkflowCRNameHashLength; workflowCRNameHashLength > 0 { + obj.ObjectMeta.Name = rand.String(workflowCRNameHashLength) + } else { + obj.ObjectMeta.Name = name + } + obj.ObjectMeta.GenerateName = generatedName obj.ObjectMeta.Labels[ExecutionIDLabel] = label obj.ObjectMeta.Labels[ProjectLabel] = project diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go index dbb51e25eb7..9d8c6a746a4 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go @@ -11,6 +11,7 @@ import ( "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common" "github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" "github.com/flyteorg/flyte/flytestdlib/utils" ) @@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) { assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue()) } +func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) { + for name, tt := range map[string]struct { + hashLength int + expected string + }{ + "default does not use hash as workflow CR name": { + hashLength: 0, + expected: "", + }, + "use hash as workflow CR name": { + hashLength: 63, + expected: "x6m7gswrdlbj7nmv9zq8nq7ck78qtt8dxv9469q9llblmh9fb2pgggnxfs72n84", + }, + } { + t.Run(name, func(t *testing.T) { + flyteConfig := config.GetConfig() + flyteConfig.WorkflowCRNameHashLength = tt.hashLength + + w := createSampleMockWorkflow() + + errors.SetConfig(errors.Config{IncludeSource: true}) + wf, err := BuildFlyteWorkflow( + &core.CompiledWorkflowClosure{ + Primary: w.GetCoreWorkflow(), + Tasks: []*core.CompiledTask{ + { + Template: &core.TaskTemplate{ + Id: &core.Identifier{Name: "ref_1"}, + }, + }, + }, + }, + nil, nil, "") + assert.Equal(t, tt.expected, wf.ObjectMeta.Name) + assert.NoError(t, err) + assert.NotNil(t, wf) + errors.SetConfig(errors.Config{}) + }) + } +} + func TestGenerateName(t *testing.T) { t.Run("Invalid params", func(t *testing.T) { _, _, _, _, _, err := generateName(nil, nil) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 419386eddda..9ff996bb8cd 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -120,6 +120,7 @@ var ( EventVersion: 0, DefaultParallelismBehavior: ParallelismBehaviorUnlimited, }, + WorkflowCRNameHashLength: 0, } ) @@ -161,6 +162,7 @@ type Config struct { CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` + WorkflowCRNameHashLength int `json:"workflow-cr-name-hash-length" pflag:",If 0, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as the CR name, and WorkflowCRNameHashLength sets the length of this hash. Recommended: 0, or 32-63."` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index e3df26502f5..389ea0439b6 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -496,7 +496,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t composedPBStore.OnWriteRawMatch( mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), - int64(1532), + int64(1501), storage.Options{}, mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))