Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Make 'flytectl compile' consider launchplans used within workflows #5463

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 64 additions & 10 deletions flytectl/cmd/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,16 @@
return err
}

// compile workflows
for wfName, workflow := range workflows {
var providers []common.InterfaceProvider
var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{}

fmt.Println("\nCompiling workflow:", wfName)
plan := plans[wfName]
// compile workflows
for _, workflow := range workflows {
providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows)

_, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
[]common.InterfaceProvider{compiler.NewLaunchPlanInterfaceProvider(*plan)})
if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return err
}

}

fmt.Println("All Workflows compiled successfully!")
Expand All @@ -105,6 +100,65 @@
return nil
}

func handleWorkflow(
workflow *admin.WorkflowSpec,
compiledTasks []*core.CompiledTask,
compiledWorkflows map[string]*core.CompiledWorkflowClosure,
compiledLaunchPlanProviders []common.InterfaceProvider,
plans map[string]*admin.LaunchPlan,
workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) {
reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows)
wfName := workflow.Template.Id.Name

// Check if all the subworkflows referenced by launchplan are compiled
for i := range reqs.GetRequiredLaunchPlanIds() {
lpID := &reqs.GetRequiredLaunchPlanIds()[i]
fg91 marked this conversation as resolved.
Show resolved Hide resolved
lpWfName := plans[lpID.Name].Spec.WorkflowId.Name
missingWorkflow := workflows[lpWfName]
if compiledWorkflows[lpWfName] == nil {
// Recursively compile the missing workflow first
err := error(nil)
compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows)
if err != nil {
return nil, err

Check warning on line 123 in flytectl/cmd/compile/compile.go

View check run for this annotation

Codecov / codecov/patch

flytectl/cmd/compile/compile.go#L120-L123

Added lines #L120 - L123 were not covered by tests
}
}
}

fmt.Println("\nCompiling workflow:", wfName)

wf, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
compiledLaunchPlanProviders)

if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return nil, err
}
compiledWorkflows[wfName] = wf

// Update the expected inputs and outputs for the launchplans which reference this workflow
for _, plan := range plans {
if plan.Spec.WorkflowId.Name == wfName {
plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs
newMap := make(map[string]*core.Parameter)

for key, value := range wf.Primary.Template.Interface.Inputs.Variables {
newMap[key] = &core.Parameter{
Var: value,
}
}
plan.Closure.ExpectedInputs = &core.ParameterMap{
Parameters: newMap,
}
compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan))
}
}

return compiledLaunchPlanProviders, nil
}

const (
compileShort = `Validate flyte packages without registration needed.`
compileLong = `
Expand Down
5 changes: 5 additions & 0 deletions flytectl/cmd/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func TestCompileCommand(t *testing.T) {
assert.NotNil(t, err, "calling compile with Empty file flag does not error")
}

// New packages can be created by using the following command
// pyflyte --pkgs <module> package -f
func TestCompilePackage(t *testing.T) {
// valid package contains two workflows
// with three tasks
Expand All @@ -69,4 +71,7 @@ func TestCompilePackage(t *testing.T) {
err = compileFromPackage("testdata/invalidworkflow.tgz")
assert.NotNil(t, err, "unable to handle invalid workflow")

// testing workflows with launchplans used within workflow
err = compileFromPackage("testdata/launchplan-in-wf.tgz")
assert.Nil(t, err, "unable to compile workflow with launchplans used within workflow")
}
16 changes: 16 additions & 0 deletions flytectl/cmd/compile/testdata/launchplan-in-wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Tests that a LaunchPlan with inputs can be used in a workflow for flytectl compile
from flytekit import LaunchPlan, task, workflow

@task
def my_task(num: int) -> int:
return num + 1


@workflow
def inner_workflow(num: int) -> int:
return my_task(num=num)


@workflow
def outer_workflow() -> int:
return LaunchPlan.get_or_create(inner_workflow, "name_override", default_inputs={"num": 42})()
Binary file added flytectl/cmd/compile/testdata/launchplan-in-wf.tgz
Binary file not shown.
Loading