diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index da7489258d1..7f1efc9b8ef 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -592,6 +592,7 @@ func (m *ExecutionManager) launchSingleTaskExecution( WorkflowClosure: workflow.Closure.CompiledWorkflow, WorkflowClosureReference: storage.DataReference(workflowModel.RemoteClosureIdentifier), ExecutionParameters: executionParameters, + OffloadedInputsReference: inputsURI, }) if err != nil { diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 092aa665b60..ca6dc609231 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -106,6 +106,9 @@ type ApplicationConfig struct { // A URL pointing to the flyteconsole instance used to hit this flyteadmin instance. ConsoleURL string `json:"consoleUrl,omitempty" pflag:",A URL pointing to the flyteconsole instance used to hit this flyteadmin instance."` + + // Enabling this will instruct operator to use storage (s3/gcs/etc) to offload workflow execution inputs instead of storing them inline in the CRD. + UseOffloadedInputs bool `json:"useOffloadedInputs" pflag:",Use offloaded inputs for workflows."` } func (a *ApplicationConfig) GetRoleNameKey() string { diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index 163a58cab3a..d941cc8309d 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -54,6 +54,10 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut flyteWf.SubWorkflows = nil flyteWf.Tasks = nil } + if e.config.ApplicationConfiguration().GetTopLevelConfig().UseOffloadedInputs { + flyteWf.OffloadedInputs = data.OffloadedInputsReference + flyteWf.Inputs = nil + } if consoleURL := e.config.ApplicationConfiguration().GetTopLevelConfig().ConsoleURL; len(consoleURL) > 0 { flyteWf.ConsoleURL = consoleURL diff --git a/flyteadmin/pkg/workflowengine/interfaces/executor.go b/flyteadmin/pkg/workflowengine/interfaces/executor.go index 181986c2c34..27de882a5f0 100644 --- a/flyteadmin/pkg/workflowengine/interfaces/executor.go +++ b/flyteadmin/pkg/workflowengine/interfaces/executor.go @@ -49,6 +49,8 @@ type ExecutionData struct { WorkflowClosureReference storage.DataReference // Additional parameters used to build a workflow execution ExecutionParameters ExecutionParameters + // Storage data reference of the execution inputs + OffloadedInputsReference storage.DataReference } // ExecutionResponse is returned when a Flyte workflow execution is successfully created. diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 22ed947f11a..41bd508894e 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -79,6 +79,11 @@ type FlyteWorkflow struct { // Flyteconsole url ConsoleURL string `json:"consoleUrl,omitempty"` + + // Much like WorkflowClosureReference, this field represents the location of offloaded inputs. If this exists, + // then the literal Inputs must not be populated. Flytepropeller must retrieve and parse the static inputs prior to + // processing. + OffloadedInputs DataReference `json:"offloadedInputs,omitempty"` } func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext { diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index c2f7a35ebef..1982b405cb6 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -100,9 +100,23 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1. Message: err.Error()}), nil } w.GetExecutionStatus().SetDataDir(ref) - var inputs *core.LiteralMap + inputs := &core.LiteralMap{} if w.Inputs != nil { + if len(w.OffloadedInputs) > 0 { + return StatusFailing(&core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: errors.BadSpecificationError.String(), + Message: "cannot specify inline inputs AND offloaded inputs"}), nil + } inputs = w.Inputs.LiteralMap + } else if len(w.OffloadedInputs) > 0 { + err = c.store.ReadProtobuf(ctx, w.OffloadedInputs, inputs) + if err != nil { + return StatusFailing(&core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "OffloadedInputsReadFailure", + Message: err.Error()}), nil + } } // Before starting the subworkflow, lets set the inputs for the Workflow. The inputs for a SubWorkflow are essentially // Copy of the inputs to the Node