Skip to content

Commit

Permalink
fix: set execution name empty is not specified
Browse files Browse the repository at this point in the history
Signed-off-by: wayner0628 <[email protected]>
  • Loading branch information
wayner0628 committed Aug 13, 2024
1 parent 18d898c commit a065a58
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
14 changes: 10 additions & 4 deletions flytectl/cmd/create/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,29 @@ var executionConfig = &ExecutionConfig{}
func createExecutionCommand(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
sourceProject := config.GetConfig().Project
sourceDomain := config.GetConfig().Domain

var targetExecName string
if len(args) > 0 {
targetExecName = args[0]
}

execParams, err := readConfigAndValidate(config.GetConfig().Project, config.GetConfig().Domain)
if err != nil {
return err
}
var executionRequest *admin.ExecutionCreateRequest
switch execParams.execType {
case Relaunch:
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
return relaunchExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
case Recover:
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
return recoverExecution(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
case Task:
executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
executionRequest, err = createExecutionRequestForTask(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
if err != nil {
return err
}
case Workflow:
executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig)
executionRequest, err = createExecutionRequestForWorkflow(ctx, execParams.name, sourceProject, sourceDomain, cmdCtx, executionConfig, targetExecName)
if err != nil {
return err
}
Expand Down
20 changes: 12 additions & 8 deletions flytectl/cmd/create/execution_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"context"
"fmt"
"io/ioutil"
"strings"

cmdCore "github.com/flyteorg/flyte/flytectl/cmd/core"
cmdGet "github.com/flyteorg/flyte/flytectl/cmd/get"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/google/uuid"
"sigs.k8s.io/yaml"
)

func createExecutionRequestForWorkflow(ctx context.Context, workflowName, project, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) (*admin.ExecutionCreateRequest, error) {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) (*admin.ExecutionCreateRequest, error) {
// Fetch the launch plan
lp, err := cmdCtx.AdminFetcherExt().FetchLPVersion(ctx, workflowName, executionConfig.Version, project, domain)
if err != nil {
Expand Down Expand Up @@ -51,11 +53,11 @@ func createExecutionRequestForWorkflow(ctx context.Context, workflowName, projec
}
}

return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, executionConfig.TargetExecutionCluster), nil
return createExecutionRequest(lp.Id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func createExecutionRequestForTask(ctx context.Context, taskName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) (*admin.ExecutionCreateRequest, error) {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) (*admin.ExecutionCreateRequest, error) {
// Fetch the task
task, err := cmdCtx.AdminFetcherExt().FetchTaskVersion(ctx, taskName, executionConfig.Version, project, domain)
if err != nil {
Expand Down Expand Up @@ -99,11 +101,11 @@ func createExecutionRequestForTask(ctx context.Context, taskName string, project
Version: task.Id.Version,
}

return createExecutionRequest(id, inputs, envs, securityContext, authRole, executionConfig.TargetExecutionCluster), nil
return createExecutionRequest(id, inputs, envs, securityContext, authRole, targetExecName, executionConfig.TargetExecutionCluster), nil
}

func relaunchExecution(ctx context.Context, executionName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) error {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecutionName string) error {
if executionConfig.DryRun {
logger.Debugf(ctx, "skipping RelaunchExecution request (DryRun)")
return nil
Expand All @@ -114,6 +116,7 @@ func relaunchExecution(ctx context.Context, executionName string, project string
Project: project,
Domain: domain,
},
Name: targetExecutionName,
OverwriteCache: executionConfig.OverwriteCache,
})
if err != nil {
Expand All @@ -124,7 +127,7 @@ func relaunchExecution(ctx context.Context, executionName string, project string
}

func recoverExecution(ctx context.Context, executionName string, project string, domain string,
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig) error {
cmdCtx cmdCore.CommandContext, executionConfig *ExecutionConfig, targetExecName string) error {
if executionConfig.DryRun {
logger.Debugf(ctx, "skipping RecoverExecution request (DryRun)")
return nil
Expand All @@ -135,6 +138,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
Project: project,
Domain: domain,
},
Name: targetExecName,
})
if err != nil {
return err
Expand All @@ -143,8 +147,7 @@ func recoverExecution(ctx context.Context, executionName string, project string,
return nil
}

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecutionCluster string) *admin.ExecutionCreateRequest {

func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *admin.Envs, securityContext *core.SecurityContext, authRole *admin.AuthRole, targetExecName string, targetExecutionCluster string) *admin.ExecutionCreateRequest {
var clusterAssignment *admin.ClusterAssignment
if executionConfig.ClusterPool != "" {
clusterAssignment = &admin.ClusterAssignment{ClusterPoolName: executionConfig.ClusterPool}
Expand All @@ -156,6 +159,7 @@ func createExecutionRequest(ID *core.Identifier, inputs *core.LiteralMap, envs *
return &admin.ExecutionCreateRequest{
Project: executionConfig.TargetProject,
Domain: executionConfig.TargetDomain,
Name: targetExecName,
Spec: &admin.ExecutionSpec{
LaunchPlan: ID,
Metadata: &admin.ExecutionMetadata{
Expand Down
24 changes: 12 additions & 12 deletions flytectl/cmd/create/execution_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCreateExecutionForRelaunch(t *testing.T) {

createExecutionUtilSetup()
s.MockAdminClient.OnRelaunchExecutionMatch(s.Ctx, relaunchRequest).Return(executionCreateResponse, nil)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
}

Expand All @@ -60,7 +60,7 @@ func TestCreateExecutionForRelaunchNotFound(t *testing.T) {

createExecutionUtilSetup()
s.MockAdminClient.OnRelaunchExecutionMatch(s.Ctx, relaunchRequest).Return(nil, errors.New("unknown execution"))
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")

assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
Expand All @@ -72,7 +72,7 @@ func TestCreateExecutionForRecovery(t *testing.T) {

createExecutionUtilSetup()
s.MockAdminClient.OnRecoverExecutionMatch(s.Ctx, recoverRequest).Return(executionCreateResponse, nil)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
}

Expand All @@ -82,7 +82,7 @@ func TestCreateExecutionForRecoveryNotFound(t *testing.T) {

createExecutionUtilSetup()
s.MockAdminClient.OnRecoverExecutionMatch(s.Ctx, recoverRequest).Return(nil, errors.New("unknown execution"))
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := recoverExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Equal(t, err, errors.New("unknown execution"))
}
Expand All @@ -95,7 +95,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
createExecutionUtilSetup()
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
Expand All @@ -109,7 +109,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{"foo": "bar"},
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
Expand All @@ -123,7 +123,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
var executionConfigWithEnvs = &ExecutionConfig{
Envs: map[string]string{},
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
})
Expand All @@ -138,7 +138,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
Envs: map[string]string{},
TargetExecutionCluster: "cluster",
}
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfigWithEnvs, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
assert.Equal(t, "cluster", execCreateRequest.Spec.ExecutionClusterLabel.Value)
Expand All @@ -156,7 +156,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
},
}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, fmt.Errorf("parameter [nilparam] has nil Variable"), err)
Expand All @@ -167,7 +167,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {

createExecutionUtilSetup()
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("failed"))
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.NotNil(t, err)
assert.Nil(t, execCreateRequest)
assert.Equal(t, err, errors.New("failed"))
Expand All @@ -181,7 +181,7 @@ func TestCreateExecutionRequestForWorkflow(t *testing.T) {
launchPlan := &admin.LaunchPlan{}
s.FetcherExt.OnFetchLPVersionMatch(s.Ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(launchPlan, nil)
s.MockAdminClient.OnGetLaunchPlanMatch(s.Ctx, mock.Anything).Return(launchPlan, nil)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
execCreateRequest, err := createExecutionRequestForWorkflow(s.Ctx, "wfName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
assert.NotNil(t, execCreateRequest)
executionConfig.KubeServiceAcct = ""
Expand Down Expand Up @@ -323,6 +323,6 @@ func TestCreateExecutionForRelaunchOverwritingCache(t *testing.T) {
executionConfig.OverwriteCache = true
relaunchRequest.OverwriteCache = true // ensure request has overwriteCache param set
s.MockAdminClient.OnRelaunchExecutionMatch(s.Ctx, relaunchRequest).Return(executionCreateResponse, nil)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig)
err := relaunchExecution(s.Ctx, "execName", config.GetConfig().Project, config.GetConfig().Domain, s.CmdCtx, executionConfig, "")
assert.Nil(t, err)
}

0 comments on commit a065a58

Please sign in to comment.