Skip to content

Commit

Permalink
Support offloading workflow CRD inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan committed Jul 31, 2024
1 parent 025296a commit cfb6571
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 1 deletion.
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution(
WorkflowClosure: workflow.Closure.CompiledWorkflow,
WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier),
ExecutionParameters: executionParameters,
OffloadedInputsReference: inputsURI,
})

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type ApplicationConfig struct {

// A URL pointing to the flyteconsole instance used to hit this flyteadmin instance.
ConsoleURL string `json:"consoleUrl,omitempty" pflag:",A URL pointing to the flyteconsole instance used to hit this flyteadmin instance."`

// Enabling this will instruct operator to use storage (s3/gcs/etc) to offload workflow execution inputs instead of storing them inline in the CRD.
UseOffloadedInputs bool `json:"useOffloadedInputs" pflag:",Use offloaded inputs for workflows."`
}

func (a *ApplicationConfig) GetRoleNameKey() string {
Expand Down
4 changes: 4 additions & 0 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
flyteWf.SubWorkflows = nil
flyteWf.Tasks = nil
}
if e.config.ApplicationConfiguration().GetTopLevelConfig().UseOffloadedInputs {
flyteWf.OffloadedInputs = data.OffloadedInputsReference
flyteWf.Inputs = nil

Check warning on line 59 in flyteadmin/pkg/workflowengine/impl/k8s_executor.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/workflowengine/impl/k8s_executor.go#L58-L59

Added lines #L58 - L59 were not covered by tests
}

if consoleURL := e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL; len(consoleURL) > 0 {
flyteWf.ConsoleURL = consoleURL
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ExecutionData struct {
WorkflowClosureReference storage.DataReference
// Additional parameters used to build a workflow execution
ExecutionParameters ExecutionParameters
// Storage data reference of the execution inputs
OffloadedInputsReference storage.DataReference
}

// ExecutionResponse is returned when a Flyte workflow execution is successfully created.
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type FlyteWorkflow struct {

// Flyteconsole url
ConsoleURL string `json:"consoleUrl,omitempty"`

// Much like WorkflowClosureReference, this field represents the location of offloaded inputs. If this exists,
// then the literal Inputs must not be populated. Flytepropeller must retrieve and parse the static inputs prior to
// processing.
OffloadedInputs DataReference `json:"offloadedInputs,omitempty"`
}

func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext {
Expand Down
16 changes: 15 additions & 1 deletion flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,23 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
Message: err.Error()}), nil
}
w.GetExecutionStatus().SetDataDir(ref)
var inputs *core.LiteralMap
inputs := &core.LiteralMap{}
if w.Inputs != nil {
if len(w.OffloadedInputs) > 0 {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: errors.BadSpecificationError.String(),
Message: "cannot specify inline inputs AND offloaded inputs"}), nil

Check warning on line 109 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L106-L109

Added lines #L106 - L109 were not covered by tests
}
inputs = w.Inputs.LiteralMap
} else if len(w.OffloadedInputs) > 0 {
err = c.store.ReadProtobuf(ctx, w.OffloadedInputs, inputs)
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "OffloadedInputsReadFailure",
Message: err.Error()}), nil

Check warning on line 118 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L112-L118

Added lines #L112 - L118 were not covered by tests
}
}
// Before starting the subworkflow, lets set the inputs for the Workflow. The inputs for a SubWorkflow are essentially
// Copy of the inputs to the Node
Expand Down

0 comments on commit cfb6571

Please sign in to comment.