Skip to content

Commit

Permalink
Allow setting a ExecutionClusterLabel when triggering a Launchplan/Wo…
Browse files Browse the repository at this point in the history
…rkflow/Task (flyteorg#4998)

Add ExecutionClusterLabel to the ExecutionSpec so users can override the label at kick off time.

Closes flyteorg#5081

Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so authored and yubofredwang committed Mar 26, 2024
1 parent 9afb961 commit e7548e0
Show file tree
Hide file tree
Showing 21 changed files with 4,623 additions and 4,477 deletions.
14 changes: 8 additions & 6 deletions flyteadmin/pkg/executioncluster/execution_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@ import (
restclient "k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
flyteclient "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned"
"github.com/flyteorg/flyte/flytestdlib/random"
)

// Spec to determine the execution target
type ExecutionTargetSpec struct {
TargetID string
ExecutionID string
Project string
Domain string
Workflow string
LaunchPlan string
TargetID string
ExecutionID string
Project string
Domain string
Workflow string
LaunchPlan string
ExecutionClusterLabel *admin.ExecutionClusterLabel
}

// Client object of the target execution cluster
Expand Down
3 changes: 3 additions & 0 deletions flyteadmin/pkg/executioncluster/impl/in_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func (i InCluster) GetTarget(ctx context.Context, spec *executioncluster.Executi
if spec != nil && !(spec.TargetID == "" || spec.TargetID == defaultInClusterTargetID) {
return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID))
}
if spec != nil && spec.ExecutionClusterLabel != nil && spec.ExecutionClusterLabel.Value != "" {
return nil, errors.New(fmt.Sprintf("execution cluster label %s is not supported", spec.ExecutionClusterLabel.Value))
}
return &i.target, nil
}

Expand Down
11 changes: 11 additions & 0 deletions flyteadmin/pkg/executioncluster/impl/in_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
)

func TestInClusterGetTarget(t *testing.T) {
Expand Down Expand Up @@ -52,6 +53,16 @@ func TestInClusterGetRemoteTarget(t *testing.T) {
assert.EqualError(t, err, "remote target t1 is not supported")
}

func TestInClusterGetTargetWithExecutionClusterLabel(t *testing.T) {
cluster := InCluster{
target: executioncluster.ExecutionTarget{},
}
_, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "l1",
}})
assert.EqualError(t, err, "execution cluster label l1 is not supported")
}

func TestInClusterGetAllValidTargets(t *testing.T) {
target := &executioncluster.ExecutionTarget{
Enabled: true,
Expand Down
34 changes: 22 additions & 12 deletions flyteadmin/pkg/executioncluster/impl/random_cluster_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,31 @@ func (s RandomClusterSelector) GetTarget(ctx context.Context, spec *executionclu
}
return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID)
}
resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{
Project: spec.Project,
Domain: spec.Domain,
Workflow: spec.Workflow,
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err
}

var weightedRandomList random.WeightedRandomList
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label := resource.Attributes.GetExecutionClusterLabel().Value

var label string

if spec.ExecutionClusterLabel != nil && spec.ExecutionClusterLabel.Value != "" {
label = spec.ExecutionClusterLabel.Value
logger.Debugf(ctx, "Using execution cluster label %s", label)
} else {
resource, err := s.resourceManager.GetResource(ctx, managerInterfaces.ResourceRequest{
Project: spec.Project,
Domain: spec.Domain,
Workflow: spec.Workflow,
LaunchPlan: spec.LaunchPlan,
ResourceType: admin.MatchableResource_EXECUTION_CLUSTER_LABEL,
})
if err != nil && !errors.IsDoesNotExistError(err) {
return nil, err
}
if resource != nil && resource.Attributes.GetExecutionClusterLabel() != nil {
label = resource.Attributes.GetExecutionClusterLabel().Value
}
}

if label != "" {
if _, ok := s.labelWeightedRandomMap[label]; ok {
weightedRandomList = s.labelWeightedRandomMap[label]
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,3 +307,35 @@ func TestRandomClusterSelectorGetTargetWithFallbackToDefault3(t *testing.T) {
assert.Equal(t, testCluster1, target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithExecutionClusterLabelClusterAssignmentOne(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "one",
},
})
assert.Nil(t, err)
assert.Equal(t, "testcluster1", target.ID)
assert.True(t, target.Enabled)
}

func TestRandomClusterSelectorGetTargetWithExecutionClusterLabelClusterAssignmentThree(t *testing.T) {
cluster := getRandomClusterSelectorWithDefaultLabelForTest(t, clusterConfig2WithDefaultLabel)
target, err := cluster.GetTarget(context.Background(), &executioncluster.ExecutionTargetSpec{
Project: testProject,
Domain: "different",
Workflow: testWorkflow,
ExecutionID: "e3",
ExecutionClusterLabel: &admin.ExecutionClusterLabel{
Value: "three",
},
})
assert.Nil(t, err)
assert.Equal(t, "testcluster3", target.ID)
assert.True(t, target.Enabled)
}
51 changes: 31 additions & 20 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,17 +547,22 @@ func (m *ExecutionManager) launchSingleTaskExecution(
return nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
if requestSpec.ExecutionClusterLabel != nil {
executionClusterLabel = requestSpec.ExecutionClusterLabel
}
executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, workflowExecutionID.Name, "")
Expand Down Expand Up @@ -947,17 +952,23 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
return nil, nil, err
}

var executionClusterLabel *admin.ExecutionClusterLabel
if requestSpec.ExecutionClusterLabel != nil {
executionClusterLabel = requestSpec.ExecutionClusterLabel
}

executionParameters := workflowengineInterfaces.ExecutionParameters{
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
Inputs: executionInputs,
AcceptedAt: requestedAt,
Labels: labels,
Annotations: annotations,
ExecutionConfig: executionConfig,
TaskResources: &platformTaskResources,
EventVersion: m.config.ApplicationConfiguration().GetTopLevelConfig().EventVersion,
RoleNameKey: m.config.ApplicationConfiguration().GetTopLevelConfig().RoleNameKey,
RawOutputDataConfig: rawOutputDataConfig,
ClusterAssignment: clusterAssignment,
ExecutionClusterLabel: executionClusterLabel,
}

overrides, err := m.addPluginOverrides(ctx, &workflowExecutionID, launchPlan.GetSpec().WorkflowId.Name, launchPlan.Id.Name)
Expand Down
6 changes: 4 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ import (
)

const (
principal = "principal"
rawOutput = "raw_output"
principal = "principal"
rawOutput = "raw_output"
executionClusterLabel = "execution_cluster_label"
)

var spec = testutils.GetExecutionRequest().Spec
Expand Down Expand Up @@ -383,6 +384,7 @@ func TestCreateExecution(t *testing.T) {
}
request.Spec.RawOutputDataConfig = &admin.RawOutputDataConfig{OutputLocationPrefix: rawOutput}
request.Spec.ClusterAssignment = &clusterAssignment
request.Spec.ExecutionClusterLabel = &admin.ExecutionClusterLabel{Value: executionClusterLabel}

identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/tls"
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"net"
"net/http"
"strings"
Expand All @@ -24,6 +23,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyte/flyteadmin/auth"
Expand Down
11 changes: 6 additions & 5 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}

executionTargetSpec := executioncluster.ExecutionTargetSpec{
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Workflow: data.ReferenceWorkflowName,
LaunchPlan: data.ReferenceWorkflowName,
ExecutionID: data.ExecutionID.Name,
Project: data.ExecutionID.Project,
Domain: data.ExecutionID.Domain,
Workflow: data.ReferenceWorkflowName,
LaunchPlan: data.ReferenceWorkflowName,
ExecutionID: data.ExecutionID.Name,
ExecutionClusterLabel: data.ExecutionParameters.ExecutionClusterLabel,
}
targetCluster, err := e.executionCluster.GetTarget(ctx, &executionTargetSpec)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ type TaskResources struct {
}

type ExecutionParameters struct {
Inputs *core.LiteralMap
AcceptedAt time.Time
Labels map[string]string
Annotations map[string]string
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
RecoveryExecution *core.WorkflowExecutionIdentifier
TaskResources *TaskResources
EventVersion int
RoleNameKey string
RawOutputDataConfig *admin.RawOutputDataConfig
ClusterAssignment *admin.ClusterAssignment
Inputs *core.LiteralMap
AcceptedAt time.Time
Labels map[string]string
Annotations map[string]string
TaskPluginOverrides []*admin.PluginOverride
ExecutionConfig *admin.WorkflowExecutionConfig
RecoveryExecution *core.WorkflowExecutionIdentifier
TaskResources *TaskResources
EventVersion int
RoleNameKey string
RawOutputDataConfig *admin.RawOutputDataConfig
ClusterAssignment *admin.ClusterAssignment
ExecutionClusterLabel *admin.ExecutionClusterLabel
}

// ExecutionData includes all parameters required to create an execution CRD object.
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/tests/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ package tests

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/utils"
"testing"

"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/utils"
)

func TestCreateProject(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e7548e0

Please sign in to comment.