Skip to content

Commit

Permalink
Handle inputs in subworkflows
Browse files Browse the repository at this point in the history
Signed-off-by: Dennis Keck <[email protected]>
  • Loading branch information
fellhorn committed Jun 17, 2024
1 parent 42e4ca4 commit 5246e6c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
74 changes: 62 additions & 12 deletions flytectl/cmd/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,15 @@ func compileFromPackage(packagePath string) error {
}

var providers []common.InterfaceProvider
for _, plan := range plans {
providers = append(providers, compiler.NewLaunchPlanInterfaceProvider(*plan))
}
var compiledWorkflows = map[string]*core.CompiledWorkflowClosure{}

// compile workflows
for wfName, workflow := range workflows {

fmt.Println("\nCompiling workflow:", wfName)
for _, workflow := range workflows {
providers, err = handleWorkflow(workflow, compiledTasks, compiledWorkflows, providers, plans, workflows)

_, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
providers)
if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return err
}

}

fmt.Println("All Workflows compiled successfully!")
Expand All @@ -109,6 +100,65 @@ func compileFromPackage(packagePath string) error {
return nil
}

func handleWorkflow(
workflow *admin.WorkflowSpec,
compiledTasks []*core.CompiledTask,
compiledWorkflows map[string]*core.CompiledWorkflowClosure,
compiledLaunchPlanProviders []common.InterfaceProvider,
plans map[string]*admin.LaunchPlan,
workflows map[string]*admin.WorkflowSpec) ([]common.InterfaceProvider, error) {
reqs, _ := compiler.GetRequirements(workflow.Template, workflow.SubWorkflows)
wfName := workflow.Template.Id.Name

// Check if all the subworkflows referenced by launchplan are compiled
for i := range reqs.GetRequiredLaunchPlanIds() {
lpID := &reqs.GetRequiredLaunchPlanIds()[i]
lpWfName := plans[lpID.Name].Spec.WorkflowId.Name
missingWorkflow := workflows[lpWfName]
if compiledWorkflows[lpWfName] == nil {
// Recursively compile the missing workflow first
err := error(nil)
compiledLaunchPlanProviders, err = handleWorkflow(missingWorkflow, compiledTasks, compiledWorkflows, compiledLaunchPlanProviders, plans, workflows)
if err != nil {
return nil, err

Check warning on line 123 in flytectl/cmd/compile/compile.go

View check run for this annotation

Codecov / codecov/patch

flytectl/cmd/compile/compile.go#L120-L123

Added lines #L120 - L123 were not covered by tests
}
}
}

fmt.Println("\nCompiling workflow:", wfName)

wf, err := compiler.CompileWorkflow(workflow.Template,
workflow.SubWorkflows,
compiledTasks,
compiledLaunchPlanProviders)

if err != nil {
fmt.Println(":( Error Compiling workflow:", wfName)
return nil, err
}
compiledWorkflows[wfName] = wf

// Update the expected inputs and outputs for the launchplans which reference this workflow
for _, plan := range plans {
if plan.Spec.WorkflowId.Name == wfName {
plan.Closure.ExpectedOutputs = wf.Primary.Template.Interface.Outputs
newMap := make(map[string]*core.Parameter)

for key, value := range wf.Primary.Template.Interface.Inputs.Variables {
newMap[key] = &core.Parameter{
Var: value,
}
}
plan.Closure.ExpectedInputs = &core.ParameterMap{
Parameters: newMap,
}
compiledLaunchPlanProviders = append(compiledLaunchPlanProviders, compiler.NewLaunchPlanInterfaceProvider(*plan))
}
}

return compiledLaunchPlanProviders, nil
}

const (
compileShort = `Validate flyte packages without registration needed.`
compileLong = `
Expand Down
Binary file modified flytectl/cmd/compile/testdata/launchplan-in-wf.tgz
Binary file not shown.

0 comments on commit 5246e6c

Please sign in to comment.