Skip to content

Commit

Permalink
Use deterministic hash of execution id, name, project, as the FlyteWo…
Browse files Browse the repository at this point in the history
…rkflow CR name. Add workflow-cr-name-hash-length to flytepropeller config.

Signed-off-by: ddl-rliu <[email protected]>
  • Loading branch information
ddl-rliu committed Jul 23, 2024
1 parent b524953 commit 856e416
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 20 deletions.
8 changes: 0 additions & 8 deletions flytepropeller/pkg/compiler/test/compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,6 @@ func TestDynamic(t *testing.T) {
Name: "name",
},
"namespace")
// make sure real CR has randomized suffix
assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name)
// then reset for the sake of serialization comparison
flyteWf.Name = "name"
if assert.NoError(t, err) {
raw, err := json.Marshal(flyteWf)
if assert.NoError(t, err) {
Expand Down Expand Up @@ -463,10 +459,6 @@ func runCompileTest(t *testing.T, dirName string) {
if !assert.NoError(t, err) {
t.FailNow()
}
// make sure real CR has randomized suffix
assert.Regexp(t, regexp.MustCompile("name-[bcdfghjklmnpqrstvwxz2456789]{20}"), flyteWf.Name)
// then reset for the sake of serialization comparison
flyteWf.Name = "name"

file := filepath.Join(filepath.Dir(filepath.Dir(p)), "k8s", strings.TrimRight(filepath.Base(p), filepath.Ext(p))+"_crd.json")
if !storeOrDiff(t, json.Marshal, flyteWf, file) {
Expand Down
39 changes: 28 additions & 11 deletions flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
package k8s

import (
"context"
"fmt"
"hash/fnv"
"strings"
"time"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
Expand All @@ -14,7 +14,9 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

const (
Expand Down Expand Up @@ -161,15 +163,18 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
}
}

const ExecutionIDSuffixLength = 21

/* #nosec */
func GetExecutionName(name string, seed int64) string {
rand.Seed(seed)
// K8s has a limitation of 63 chars
name = name[:minInt(63-ExecutionIDSuffixLength, len(name))]
execName := name + "-" + rand.String(ExecutionIDSuffixLength-1)
return execName
func hashIdentifier(identifier core.Identifier) uint64 {
h := fnv.New64()
_, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s",
identifier.Project, identifier.Domain, identifier.Name)))
if err != nil {
// This shouldn't occur.
logger.Errorf(context.Background(),
"failed to hash execution identifier: %+v with err: %v", identifier, err)
return 0
}
logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64())
return h.Sum64()
}

// BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
Expand Down Expand Up @@ -244,7 +249,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
errs.Collect(errors.NewWorkflowBuildError(err))
}

obj.ObjectMeta.Name = GetExecutionName(name, time.Now().UnixNano())
hashedIdentifier := hashIdentifier(core.Identifier{
Project: project,
Domain: domain,
Name: name,
})
rand.Seed(int64(hashedIdentifier))

if workflowCRNameHashLength := config.GetConfig().WorkflowCRNameHashLength; workflowCRNameHashLength > 0 {
obj.ObjectMeta.Name = rand.String(workflowCRNameHashLength)
} else {
obj.ObjectMeta.Name = name
}

obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
Expand Down
42 changes: 42 additions & 0 deletions flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/utils"
)

Expand Down Expand Up @@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) {
assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue())
}

func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) {
for name, tt := range map[string]struct {
hashLength int
expected string
}{
"default does not use hash as workflow CR name": {
hashLength: 0,
expected: "",
},
"use hash as workflow CR name": {
hashLength: 63,
expected: "x6m7gswrdlbj7nmv9zq8nq7ck78qtt8dxv9469q9llblmh9fb2pgggnxfs72n84",
},
} {
t.Run(name, func(t *testing.T) {
flyteConfig := config.GetConfig()
flyteConfig.WorkflowCRNameHashLength = tt.hashLength

w := createSampleMockWorkflow()

errors.SetConfig(errors.Config{IncludeSource: true})
wf, err := BuildFlyteWorkflow(
&core.CompiledWorkflowClosure{
Primary: w.GetCoreWorkflow(),
Tasks: []*core.CompiledTask{
{
Template: &core.TaskTemplate{
Id: &core.Identifier{Name: "ref_1"},
},
},
},
},
nil, nil, "")
assert.Equal(t, tt.expected, wf.ObjectMeta.Name)
assert.NoError(t, err)
assert.NotNil(t, wf)
errors.SetConfig(errors.Config{})
})
}
}

func TestGenerateName(t *testing.T) {
t.Run("Invalid params", func(t *testing.T) {
_, _, _, _, _, err := generateName(nil, nil)
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ var (
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
WorkflowCRNameHashLength: 0,
}
)

Expand Down Expand Up @@ -161,6 +162,7 @@ type Config struct {
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
WorkflowCRNameHashLength int `json:"workflow-cr-name-hash-length" pflag:",If 0, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as the CR name, and WorkflowCRNameHashLength sets the length of this hash. Recommended: 0, or 32-63."`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1532),
int64(1501),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down

0 comments on commit 856e416

Please sign in to comment.