Skip to content

Commit

Permalink
Stop admin launcher copying shard key from parent workflow (flyteorg#…
Browse files Browse the repository at this point in the history
…5174)

* Remove shard key in admin-launcher

Signed-off-by: Thomas Newton <[email protected]>

* Don't mutate existing state

Signed-off-by: Thomas Newton <[email protected]>

* Don't mutate state

Signed-off-by: Thomas Newton <[email protected]>

* Add a test

Signed-off-by: Thomas Newton <[email protected]>

---------

Signed-off-by: Thomas Newton <[email protected]>
  • Loading branch information
Tom-Newton authored and Jeinhaus committed Apr 8, 2024
1 parent dd2a4a4 commit 02fc380
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
evtErr "github.com/flyteorg/flyte/flytepropeller/events/errors"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/transformers/k8s"
"github.com/flyteorg/flyte/flytestdlib/cache"
stdErr "github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand Down Expand Up @@ -114,6 +115,15 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
})
}

// Make a copy of the labels with shard-key removed. This ensures that the shard-key is re-computed for each
// instead of being copied from the parent.
labels := make(map[string]string)
for key, value := range launchCtx.Labels {
if key != k8s.ShardKeyLabel {
labels[key] = value
}
}

req := &admin.ExecutionCreateRequest{
Project: executionID.Project,
Domain: executionID.Domain,
Expand All @@ -127,7 +137,7 @@ func (a *adminLaunchPlanExecutor) Launch(ctx context.Context, launchCtx LaunchCo
Principal: launchCtx.Principal,
ParentNodeExecution: launchCtx.ParentNodeExecution,
},
Labels: &admin.Labels{Values: launchCtx.Labels},
Labels: &admin.Labels{Values: labels},
Annotations: &admin.Annotations{Values: launchCtx.Annotations},
SecurityContext: &launchCtx.SecurityContext,
MaxParallelism: int32(launchCtx.MaxParallelism),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package launchplan

import (
"context"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -162,10 +163,14 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool {
return o.Project == "p" && o.Domain == "d" && o.Name == "n" && o.Spec.Inputs == nil &&
o.Spec.Metadata.Mode == admin.ExecutionMetadata_CHILD_WORKFLOW
o.Spec.Metadata.Mode == admin.ExecutionMetadata_CHILD_WORKFLOW &&
reflect.DeepEqual(o.Spec.Labels.Values, map[string]string{"foo": "bar"}) // Ensure shard-key was removed.
}),
).Return(nil, nil)
assert.NoError(t, err)

var labels = map[string]string{"foo": "bar", "shard-key": "1"}

err = exec.Launch(ctx,
LaunchContext{
ParentNodeExecution: &core.NodeExecutionIdentifier{
Expand All @@ -176,12 +181,15 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Name: "w",
},
},
Labels: labels,
},
id,
&core.Identifier{},
nil,
)
assert.NoError(t, err)
// Ensure we haven't mutated the state of the parent workflow.
assert.True(t, reflect.DeepEqual(labels, map[string]string{"foo": "bar", "shard-key": "1"}))
})

t.Run("happy recover", func(t *testing.T) {
Expand Down

0 comments on commit 02fc380

Please sign in to comment.