From 5a3945f90d4085c2112dfea41325dd9f0acf006c Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Thu, 21 Mar 2024 12:17:00 -0700 Subject: [PATCH 1/2] remove setting max size bytes in node context Signed-off-by: Paul Dittamo --- .../pluginmachinery/core/exec_context.go | 5 +- .../core/mocks/task_execution_context.go | 32 ---------- .../core/mocks/task_overrides.go | 58 +++++++++---------- .../ioutils/remote_file_output_reader.go | 6 +- .../k8s/mocks/plugin_context.go | 32 ---------- .../go/tasks/pluginmachinery/k8s/plugin.go | 3 - .../webapi/mocks/status_context.go | 32 ---------- .../webapi/mocks/task_execution_context.go | 32 ---------- .../go/tasks/pluginmachinery/webapi/plugin.go | 3 - .../go/tasks/plugins/array/outputs.go | 2 +- .../go/tasks/plugins/array/outputs_test.go | 2 - flyteplugins/go/tasks/plugins/testing/echo.go | 2 +- .../go/tasks/plugins/webapi/agent/plugin.go | 2 +- .../tasks/plugins/webapi/databricks/plugin.go | 2 +- flyteplugins/tests/end_to_end.go | 1 - flytepropeller/pkg/controller/controller.go | 2 +- .../pkg/controller/nodes/array/handler.go | 4 +- .../controller/nodes/array/handler_test.go | 3 +- .../controller/nodes/branch/handler_test.go | 1 - flytepropeller/pkg/controller/nodes/cache.go | 2 +- .../nodes/dynamic/dynamic_workflow_test.go | 1 - .../pkg/controller/nodes/dynamic/handler.go | 2 +- .../controller/nodes/dynamic/handler_test.go | 4 -- .../pkg/controller/nodes/executor.go | 4 +- .../pkg/controller/nodes/executor_test.go | 35 +++++------ .../mocks/node_execution_context.go | 32 ---------- .../nodes/interfaces/node_exec_context.go | 3 +- .../pkg/controller/nodes/node_exec_context.go | 49 +++++++--------- .../nodes/node_exec_context_test.go | 5 +- .../nodes/subworkflow/handler_test.go | 1 - .../pkg/controller/nodes/task/handler_test.go | 4 -- .../nodes/task/k8s/plugin_manager.go | 2 +- .../nodes/task/k8s/plugin_manager_test.go | 1 - .../nodes/task/taskexec_context_test.go | 2 - .../pkg/controller/workflow/executor_test.go | 15 +++-- 35 files changed, 94 insertions(+), 292 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go b/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go index 11f48a33b2..a2f20bcee5 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/exec_context.go @@ -9,6 +9,8 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) +//go:generate mockery -all -case=underscore + // An interface to access a remote/sharable location that contains the serialized TaskTemplate type TaskTemplatePath interface { // Returns the path @@ -34,9 +36,6 @@ type TaskExecutionContext interface { // Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates TaskRefreshIndicator() SignalAsync - // Returns the max allowed dataset size that the outputwriter will accept - MaxDatasetSizeBytes() int64 - // Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata DataStore() *storage.DataStore diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go index ec4b5e346f..8ae8601b1e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_execution_context.go @@ -154,38 +154,6 @@ func (_m *TaskExecutionContext) InputReader() io.InputReader { return r0 } -type TaskExecutionContext_MaxDatasetSizeBytes struct { - *mock.Call -} - -func (_m TaskExecutionContext_MaxDatasetSizeBytes) Return(_a0 int64) *TaskExecutionContext_MaxDatasetSizeBytes { - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: _m.Call.Return(_a0)} -} - -func (_m *TaskExecutionContext) OnMaxDatasetSizeBytes() *TaskExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes") - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -func (_m *TaskExecutionContext) OnMaxDatasetSizeBytesMatch(matchers ...interface{}) *TaskExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes", matchers...) - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -// MaxDatasetSizeBytes provides a mock function with given fields: -func (_m *TaskExecutionContext) MaxDatasetSizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - type TaskExecutionContext_OutputWriter struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go index 57c9f9db2d..ab42b1189f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go @@ -48,68 +48,66 @@ func (_m *TaskOverrides) GetConfig() *v1.ConfigMap { return r0 } -type TaskOverrides_GetExtendedResources struct { +type TaskOverrides_GetContainerImage struct { *mock.Call } -func (_m TaskOverrides_GetExtendedResources) Return(_a0 *flyteidlcore.ExtendedResources) *TaskOverrides_GetExtendedResources { - return &TaskOverrides_GetExtendedResources{Call: _m.Call.Return(_a0)} +func (_m TaskOverrides_GetContainerImage) Return(_a0 string) *TaskOverrides_GetContainerImage { + return &TaskOverrides_GetContainerImage{Call: _m.Call.Return(_a0)} } -func (_m *TaskOverrides) OnGetExtendedResources() *TaskOverrides_GetExtendedResources { - c_call := _m.On("GetExtendedResources") - return &TaskOverrides_GetExtendedResources{Call: c_call} +func (_m *TaskOverrides) OnGetContainerImage() *TaskOverrides_GetContainerImage { + c_call := _m.On("GetContainerImage") + return &TaskOverrides_GetContainerImage{Call: c_call} } -func (_m *TaskOverrides) OnGetExtendedResourcesMatch(matchers ...interface{}) *TaskOverrides_GetExtendedResources { - c_call := _m.On("GetExtendedResources", matchers...) - return &TaskOverrides_GetExtendedResources{Call: c_call} +func (_m *TaskOverrides) OnGetContainerImageMatch(matchers ...interface{}) *TaskOverrides_GetContainerImage { + c_call := _m.On("GetContainerImage", matchers...) + return &TaskOverrides_GetContainerImage{Call: c_call} } -// GetExtendedResources provides a mock function with given fields: -func (_m *TaskOverrides) GetExtendedResources() *flyteidlcore.ExtendedResources { +// GetContainerImage provides a mock function with given fields: +func (_m *TaskOverrides) GetContainerImage() string { ret := _m.Called() - var r0 *flyteidlcore.ExtendedResources - if rf, ok := ret.Get(0).(func() *flyteidlcore.ExtendedResources); ok { + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*flyteidlcore.ExtendedResources) - } + r0 = ret.Get(0).(string) } return r0 } -type TaskOverrides_GetContainerImage struct { +type TaskOverrides_GetExtendedResources struct { *mock.Call } -func (_m TaskOverrides_GetContainerImage) Return(_a0 string) *TaskOverrides_GetContainerImage { - return &TaskOverrides_GetContainerImage{Call: _m.Call.Return(_a0)} +func (_m TaskOverrides_GetExtendedResources) Return(_a0 *flyteidlcore.ExtendedResources) *TaskOverrides_GetExtendedResources { + return &TaskOverrides_GetExtendedResources{Call: _m.Call.Return(_a0)} } -func (_m *TaskOverrides) OnGetContainerImage() *TaskOverrides_GetContainerImage { - c_call := _m.On("GetContainerImage") - return &TaskOverrides_GetContainerImage{Call: c_call} +func (_m *TaskOverrides) OnGetExtendedResources() *TaskOverrides_GetExtendedResources { + c_call := _m.On("GetExtendedResources") + return &TaskOverrides_GetExtendedResources{Call: c_call} } -func (_m *TaskOverrides) OnGetContainerImageMatch(matchers ...interface{}) *TaskOverrides_GetContainerImage { - c_call := _m.On("GetContainerImage", matchers...) - return &TaskOverrides_GetContainerImage{Call: c_call} +func (_m *TaskOverrides) OnGetExtendedResourcesMatch(matchers ...interface{}) *TaskOverrides_GetExtendedResources { + c_call := _m.On("GetExtendedResources", matchers...) + return &TaskOverrides_GetExtendedResources{Call: c_call} } -// GetContainerImage provides a mock function with given fields: -func (_m *TaskOverrides) GetContainerImage() string { +// GetExtendedResources provides a mock function with given fields: +func (_m *TaskOverrides) GetExtendedResources() *flyteidlcore.ExtendedResources { ret := _m.Called() - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { + var r0 *flyteidlcore.ExtendedResources + if rf, ok := ret.Get(0).(func() *flyteidlcore.ExtendedResources); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(string) + r0 = ret.Get(0).(*flyteidlcore.ExtendedResources) } } diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index 152644d828..823fb29fc5 100644 --- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -123,9 +123,13 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) { } func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader { + maxPayloadSize := maxDatasetSize + if maxPayloadSize == 0 { + maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024 + } return RemoteFileOutputReader{ outPath: outPaths, store: store, - maxPayloadSize: maxDatasetSize, + maxPayloadSize: maxPayloadSize, } } diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go index 532b5fe76c..a274b0559e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go @@ -84,38 +84,6 @@ func (_m *PluginContext) InputReader() io.InputReader { return r0 } -type PluginContext_MaxDatasetSizeBytes struct { - *mock.Call -} - -func (_m PluginContext_MaxDatasetSizeBytes) Return(_a0 int64) *PluginContext_MaxDatasetSizeBytes { - return &PluginContext_MaxDatasetSizeBytes{Call: _m.Call.Return(_a0)} -} - -func (_m *PluginContext) OnMaxDatasetSizeBytes() *PluginContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes") - return &PluginContext_MaxDatasetSizeBytes{Call: c_call} -} - -func (_m *PluginContext) OnMaxDatasetSizeBytesMatch(matchers ...interface{}) *PluginContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes", matchers...) - return &PluginContext_MaxDatasetSizeBytes{Call: c_call} -} - -// MaxDatasetSizeBytes provides a mock function with given fields: -func (_m *PluginContext) MaxDatasetSizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - type PluginContext_OutputWriter struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go b/flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go index 8b56f7dd5e..1bbe07c02a 100644 --- a/flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go @@ -61,9 +61,6 @@ type PluginContext interface { // Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata DataStore() *storage.DataStore - // Returns the max allowed dataset size that the outputwriter will accept - MaxDatasetSizeBytes() int64 - // Returns a handle to the Task's execution metadata. TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata diff --git a/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/status_context.go b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/status_context.go index f9d1239185..e860c1843c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/status_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/status_context.go @@ -84,38 +84,6 @@ func (_m *StatusContext) InputReader() io.InputReader { return r0 } -type StatusContext_MaxDatasetSizeBytes struct { - *mock.Call -} - -func (_m StatusContext_MaxDatasetSizeBytes) Return(_a0 int64) *StatusContext_MaxDatasetSizeBytes { - return &StatusContext_MaxDatasetSizeBytes{Call: _m.Call.Return(_a0)} -} - -func (_m *StatusContext) OnMaxDatasetSizeBytes() *StatusContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes") - return &StatusContext_MaxDatasetSizeBytes{Call: c_call} -} - -func (_m *StatusContext) OnMaxDatasetSizeBytesMatch(matchers ...interface{}) *StatusContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes", matchers...) - return &StatusContext_MaxDatasetSizeBytes{Call: c_call} -} - -// MaxDatasetSizeBytes provides a mock function with given fields: -func (_m *StatusContext) MaxDatasetSizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - type StatusContext_OutputWriter struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/task_execution_context.go b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/task_execution_context.go index 602c34114b..a8a78314ff 100644 --- a/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/task_execution_context.go +++ b/flyteplugins/go/tasks/pluginmachinery/webapi/mocks/task_execution_context.go @@ -84,38 +84,6 @@ func (_m *TaskExecutionContext) InputReader() io.InputReader { return r0 } -type TaskExecutionContext_MaxDatasetSizeBytes struct { - *mock.Call -} - -func (_m TaskExecutionContext_MaxDatasetSizeBytes) Return(_a0 int64) *TaskExecutionContext_MaxDatasetSizeBytes { - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: _m.Call.Return(_a0)} -} - -func (_m *TaskExecutionContext) OnMaxDatasetSizeBytes() *TaskExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes") - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -func (_m *TaskExecutionContext) OnMaxDatasetSizeBytesMatch(matchers ...interface{}) *TaskExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes", matchers...) - return &TaskExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -// MaxDatasetSizeBytes provides a mock function with given fields: -func (_m *TaskExecutionContext) MaxDatasetSizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - type TaskExecutionContext_OutputWriter struct { *mock.Call } diff --git a/flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go b/flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go index 37be2c1e40..920040cab0 100644 --- a/flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go @@ -72,9 +72,6 @@ type TaskExecutionContext interface { // Provides the raw datastore to enable persisting outputs. DataStore() *storage.DataStore - - // Returns the max allowed dataset size that the outputwriter will accept - MaxDatasetSizeBytes() int64 } type GetContext interface { diff --git a/flyteplugins/go/tasks/plugins/array/outputs.go b/flyteplugins/go/tasks/plugins/array/outputs.go index d31988425f..cb07fb0de1 100644 --- a/flyteplugins/go/tasks/plugins/array/outputs.go +++ b/flyteplugins/go/tasks/plugins/array/outputs.go @@ -225,7 +225,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC switch w.Status() { case workqueue.WorkStatusSucceeded: - or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes()) + or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0) if err = tCtx.OutputWriter().Put(ctx, or); err != nil { return nil, err } diff --git a/flyteplugins/go/tasks/plugins/array/outputs_test.go b/flyteplugins/go/tasks/plugins/array/outputs_test.go index f218e9540f..529eba0429 100644 --- a/flyteplugins/go/tasks/plugins/array/outputs_test.go +++ b/flyteplugins/go/tasks/plugins/array/outputs_test.go @@ -244,7 +244,6 @@ func TestAssembleFinalOutputs(t *testing.T) { tCtx := &mocks3.TaskExecutionContext{} tCtx.OnTaskExecutionMetadata().Return(tMeta) tCtx.OnOutputWriter().Return(ow) - tCtx.OnMaxDatasetSizeBytes().Return(10000) tCtx.OnDataStore().Return(d) _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s) @@ -368,7 +367,6 @@ func TestAssembleFinalOutputs(t *testing.T) { tCtx.OnTaskReader().Return(tReader) tCtx.OnOutputWriter().Return(ow) tCtx.OnDataStore().Return(ds) - tCtx.OnMaxDatasetSizeBytes().Return(10000) _, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s) assert.NoError(t, err) diff --git a/flyteplugins/go/tasks/plugins/testing/echo.go b/flyteplugins/go/tasks/plugins/testing/echo.go index 7c5587dd71..885ab5dfc4 100644 --- a/flyteplugins/go/tasks/plugins/testing/echo.go +++ b/flyteplugins/go/tasks/plugins/testing/echo.go @@ -76,7 +76,7 @@ func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) return core.UnknownTransition, err } - or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes()) + or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0) if err = tCtx.OutputWriter().Put(ctx, or); err != nil { return core.UnknownTransition, err } diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index c4decd04ef..099903f3e5 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -332,7 +332,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, outputs *fly opReader = ioutils.NewInMemoryOutputReader(outputs, nil, nil) } else { logger.Debugf(ctx, "AgentDeployment didn't return any output, assuming file based outputs.") - opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes()) + opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0) } return taskCtx.OutputWriter().Put(ctx, opReader) } diff --git a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go index 5ebe1d0075..47464f0311 100644 --- a/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go @@ -292,7 +292,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext) error { return nil } - outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes()) + outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0) return taskCtx.OutputWriter().Put(ctx, outputReader) } diff --git a/flyteplugins/tests/end_to_end.go b/flyteplugins/tests/end_to_end.go index 1a55b52c0c..732241953d 100644 --- a/flyteplugins/tests/end_to_end.go +++ b/flyteplugins/tests/end_to_end.go @@ -243,7 +243,6 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i tCtx.OnCatalog().Return(cat) tCtx.OnEventsRecorder().Return(eRecorder) tCtx.OnResourceManager().Return(resourceManager) - tCtx.OnMaxDatasetSizeBytes().Return(1000000) tCtx.OnSecretManager().Return(secretManager) trns := pluginCore.DoTransition(pluginCore.PhaseInfoQueued(time.Now(), 0, "")) diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index acd7747d3f..c43f3688ec 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -439,7 +439,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter } nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink, - launchPlanActor, launchPlanActor, sCfg.Limits.GetLimitMegabytes*1024*1024, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, + launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope) if err != nil { return nil, errors.Wrapf(err, "Failed to create Controller.") diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 991a28c1c3..8820c41498 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -469,7 +469,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // checkpoint paths are not computed here because this function is only called when writing // existing cached outputs. if this functionality changes this will need to be revisited. outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "") - reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) + reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0) gatherOutputsRequest.reader = &reader a.gatherOutputsRequestChannel <- gatherOutputsRequest @@ -505,7 +505,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } } case v1alpha1.NodeKindWorkflow: - // TODO - to support launchplans we will need to process the output interface variables here + // TODO - to support launchplans we will need to process the output interface variables here fallthrough default: logger.Warnf(ctx, "ArrayNode does not support pre-populating outputLiteral collections for node kind '%s'", arrayNode.GetSubNodeSpec().GetKind()) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index f0b91217e0..a785bcbe48 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -60,7 +60,7 @@ func createArrayNodeHandler(ctx context.Context, t *testing.T, nodeHandler inter // create node executor nodeExecutor, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, dataStore, enqueueWorkflowFunc, mockEventSink, adminClient, - adminClient, 10, "s3://bucket/", mockKubeClient, noopCatalogClient, mockRecoveryClient, eventConfig, "clusterID", mockSignalClient, mockHandlerFactory, scope) + adminClient, "s3://bucket/", mockKubeClient, noopCatalogClient, mockRecoveryClient, eventConfig, "clusterID", mockSignalClient, mockHandlerFactory, scope) assert.NoError(t, err) // return ArrayNodeHandler @@ -77,7 +77,6 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte inputLiteralMap *idlcore.LiteralMap, arrayNodeSpec *v1alpha1.NodeSpec, arrayNodeState *handler.ArrayNodeState) interfaces.NodeExecutionContext { nCtx := &mocks.NodeExecutionContext{} - nCtx.OnMaxDatasetSizeBytes().Return(9999999) nCtx.OnCurrentAttempt().Return(uint32(0)) // ContextualNodeLookup diff --git a/flytepropeller/pkg/controller/nodes/branch/handler_test.go b/flytepropeller/pkg/controller/nodes/branch/handler_test.go index a194452ff5..dfe0338fc1 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler_test.go @@ -118,7 +118,6 @@ func createNodeContext(phase v1alpha1.BranchNodePhase, childNodeID *v1alpha1.Nod tmpDataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) nCtx.OnDataStore().Return(tmpDataStore) nCtx.OnCurrentAttempt().Return(uint32(1)) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return("n1") diff --git a/flytepropeller/pkg/controller/nodes/cache.go b/flytepropeller/pkg/controller/nodes/cache.go index 5d4c8455a5..c0fc071640 100644 --- a/flytepropeller/pkg/controller/nodes/cache.go +++ b/flytepropeller/pkg/controller/nodes/cache.go @@ -211,7 +211,7 @@ func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.No catalogKey.Identifier.Domain, catalogKey.Identifier.Name, catalogKey.Identifier.Version) outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir()) - outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) + outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0) metadata := catalog.Metadata{ TaskExecutionIdentifier: task.GetTaskExecutionIdentifier(nCtx), } diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 4011ffb956..389ea0439b 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -94,7 +94,6 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeID().Return("n1") nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil }) nCtx.OnDataStore().Return(dataStore) diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index 21ef2da487..e23f145bb3 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -141,7 +141,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n // These outputPaths only reads the output metadata. So the sandbox is completely optional here and hence it is nil. // The sandbox creation as it uses hashing can be expensive and we skip that expense. outputPaths := ioutils.NewReadOnlyOutputFilePaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir()) - outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes()) + outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0) ee, err := d.TaskNodeHandler.ValidateOutput(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.ExecutionContext().GetExecutionConfig(), nCtx.TaskReader()) diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go index e66dadbebf..bab7b48f1b 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go @@ -135,7 +135,6 @@ func Test_dynamicNodeHandler_Handle_Parent(t *testing.T) { nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return("n1") nCtx.OnEnqueueOwnerFunc().Return(nil) @@ -467,7 +466,6 @@ func Test_dynamicNodeHandler_Handle_SubTaskV1(t *testing.T) { nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeID().Return(nodeID) nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil }) nCtx.OnDataStore().Return(dataStore) @@ -665,7 +663,6 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeID().Return(nodeID) nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil }) nCtx.OnDataStore().Return(dataStore) @@ -912,7 +909,6 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) { nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeID().Return(nodeID) nCtx.OnEnqueueOwnerFunc().Return(func() error { return nil }) nCtx.OnDataStore().Return(dataStore) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index cf8c62cfad..d59aa4e227 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -482,7 +482,6 @@ type nodeExecutor struct { enqueueWorkflow v1alpha1.EnqueueWorkflow eventConfig *config.EventConfig interruptibleFailureThreshold int32 - maxDatasetSizeBytes int64 maxNodeRetriesForSystemFailures uint32 metrics *nodeMetrics nodeRecorder events.NodeEventRecorder @@ -1389,7 +1388,7 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur } func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *storage.DataStore, enQWorkflow v1alpha1.EnqueueWorkflow, eventSink events.EventSink, - workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, maxDatasetSize int64, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, + workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, defaultRawOutputPrefix storage.DataReference, kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient, nodeHandlerFactory interfaces.HandlerFactory, scope promutils.Scope) (interfaces.Node, error) { @@ -1443,7 +1442,6 @@ func NewExecutor(ctx context.Context, nodeConfig config.NodeConfig, store *stora enqueueWorkflow: enQWorkflow, eventConfig: eventConfig, interruptibleFailureThreshold: nodeConfig.InterruptibleFailureThreshold, - maxDatasetSizeBytes: maxDatasetSize, maxNodeRetriesForSystemFailures: uint32(nodeConfig.MaxNodeRetriesOnSystemFailures), metrics: metrics, nodeRecorder: events.NewNodeEventRecorder(eventSink, nodeScope, store), diff --git a/flytepropeller/pkg/controller/nodes/executor_test.go b/flytepropeller/pkg/controller/nodes/executor_test.go index 237928a937..227f3e5f1b 100644 --- a/flytepropeller/pkg/controller/nodes/executor_test.go +++ b/flytepropeller/pkg/controller/nodes/executor_test.go @@ -69,7 +69,7 @@ func TestSetInputsForStartNode(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) exec, err := NewExecutor(ctx, config.GetConfig().NodeConfig, mockStorage, enQWf, eventMocks.NewMockEventSink(), adminClient, - adminClient, 10, "s3://bucket/", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + adminClient, "s3://bucket/", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) inputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -116,7 +116,7 @@ func TestSetInputsForStartNode(t *testing.T) { failStorage := createFailingDatastore(t, testScope.NewSubScope("failing")) execFail, err := NewExecutor(ctx, config.GetConfig().NodeConfig, failStorage, enQWf, eventMocks.NewMockEventSink(), adminClient, - adminClient, 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + adminClient, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) t.Run("StorageFailure", func(t *testing.T) { w := createDummyBaseWorkflow(mockStorage) @@ -145,7 +145,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -156,8 +156,7 @@ func TestNodeExecutor_Initialize(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("error")) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, memStore, enQWf, mockEventSink, adminClient, adminClient, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -177,7 +176,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseStartNodes(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -282,8 +281,7 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) - execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -696,7 +694,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { nodeConfig := config.GetConfig().NodeConfig nodeConfig.EnableCRDebugMetadata = test.enableCRDebugMetadata execIface, err := NewExecutor(ctx, nodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -771,7 +769,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -885,7 +883,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -952,7 +950,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -983,7 +981,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) { store := createInmemoryDataStore(t, promutils.NewTestScope()) adminClient := launchplan.NewFailFastLaunchPlanExecutor() execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -1018,7 +1016,7 @@ func TestNodeExecutor_RecursiveNodeHandler_NoDownstream(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -1131,7 +1129,7 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -1249,7 +1247,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) // Node not yet started @@ -1855,7 +1853,7 @@ func TestNodeExecutor_RecursiveNodeHandler_ParallelismLimit(t *testing.T) { hf := &nodemocks.HandlerFactory{} hf.On("Setup", mock.Anything, mock.Anything, mock.Anything).Return(nil) execIface, err := NewExecutor(ctx, config.GetConfig().NodeConfig, store, enQWf, mockEventSink, adminClient, adminClient, - 10, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, hf, promutils.NewTestScope()) assert.NoError(t, err) exec := execIface.(*recursiveNodeExecutor) @@ -2621,7 +2619,6 @@ func TestNodeExecutor_RecursiveNodeHandler_Cache(t *testing.T) { RawOutputPolicy: config.RawOutputPolicyReference, } fakeKubeClient := mocks4.NewFakeKubeClient() - maxDatasetSize := int64(10) mockEventSink := eventMocks.NewMockEventSink() nodeConfig := config.GetConfig().NodeConfig rawOutputPrefix := storage.DataReference("s3://bucket/") @@ -2632,7 +2629,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Cache(t *testing.T) { mockHandlerFactory := &nodemocks.HandlerFactory{} mockHandlerFactory.OnGetHandler(v1alpha1.NodeKindTask).Return(mockHandler, nil) nodeExecutor, err := NewExecutor(ctx, nodeConfig, dataStore, enqueueWorkflow, mockEventSink, - adminClient, adminClient, maxDatasetSize, rawOutputPrefix, fakeKubeClient, catalogClient, + adminClient, adminClient, rawOutputPrefix, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, mockHandlerFactory, testScope) assert.NoError(t, err) diff --git a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_context.go b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_context.go index 448b5aa13f..7d6b66c61d 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_context.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/mocks/node_execution_context.go @@ -258,38 +258,6 @@ func (_m *NodeExecutionContext) InputReader() io.InputReader { return r0 } -type NodeExecutionContext_MaxDatasetSizeBytes struct { - *mock.Call -} - -func (_m NodeExecutionContext_MaxDatasetSizeBytes) Return(_a0 int64) *NodeExecutionContext_MaxDatasetSizeBytes { - return &NodeExecutionContext_MaxDatasetSizeBytes{Call: _m.Call.Return(_a0)} -} - -func (_m *NodeExecutionContext) OnMaxDatasetSizeBytes() *NodeExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes") - return &NodeExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -func (_m *NodeExecutionContext) OnMaxDatasetSizeBytesMatch(matchers ...interface{}) *NodeExecutionContext_MaxDatasetSizeBytes { - c_call := _m.On("MaxDatasetSizeBytes", matchers...) - return &NodeExecutionContext_MaxDatasetSizeBytes{Call: c_call} -} - -// MaxDatasetSizeBytes provides a mock function with given fields: -func (_m *NodeExecutionContext) MaxDatasetSizeBytes() int64 { - ret := _m.Called() - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - return r0 -} - type NodeExecutionContext_Node struct { *mock.Call } diff --git a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go index 8f10becca4..b6a33a4e35 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/node_exec_context.go @@ -15,6 +15,8 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) +//go:generate mockery -all -case=underscore + type TaskReader interface { Read(ctx context.Context) (*core.TaskTemplate, error) GetTaskType() v1alpha1.TaskType @@ -61,7 +63,6 @@ type NodeExecutionContext interface { NodeStateWriter() NodeStateWriter NodeExecutionMetadata() NodeExecutionMetadata - MaxDatasetSizeBytes() int64 EnqueueOwnerFunc() func() error diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context.go b/flytepropeller/pkg/controller/nodes/node_exec_context.go index f42f8b0324..a579b241f3 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context.go @@ -119,20 +119,19 @@ func (e nodeExecMetadata) GetLabels() map[string]string { } type nodeExecContext struct { - store *storage.DataStore - tr interfaces.TaskReader - md interfaces.NodeExecutionMetadata - eventRecorder interfaces.EventRecorder - inputs io.InputReader - node v1alpha1.ExecutableNode - nodeStatus v1alpha1.ExecutableNodeStatus - maxDatasetSizeBytes int64 - nsm *nodeStateManager - enqueueOwner func() error - rawOutputPrefix storage.DataReference - shardSelector ioutils.ShardSelector - nl executors.NodeLookup - ic executors.ExecutionContext + store *storage.DataStore + tr interfaces.TaskReader + md interfaces.NodeExecutionMetadata + eventRecorder interfaces.EventRecorder + inputs io.InputReader + node v1alpha1.ExecutableNode + nodeStatus v1alpha1.ExecutableNodeStatus + nsm *nodeStateManager + enqueueOwner func() error + rawOutputPrefix storage.DataReference + shardSelector ioutils.ShardSelector + nl executors.NodeLookup + ic executors.ExecutionContext } func (e nodeExecContext) ExecutionContext() executors.ExecutionContext { @@ -199,13 +198,9 @@ func (e nodeExecContext) NodeExecutionMetadata() interfaces.NodeExecutionMetadat return e.md } -func (e nodeExecContext) MaxDatasetSizeBytes() int64 { - return e.maxDatasetSizeBytes -} - func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, interruptibleFailureThreshold int32, - maxDatasetSize int64, taskEventRecorder events.TaskEventRecorder, nodeEventRecorder events.NodeEventRecorder, tr interfaces.TaskReader, nsm *nodeStateManager, + taskEventRecorder events.TaskEventRecorder, nodeEventRecorder events.NodeEventRecorder, tr interfaces.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { md := nodeExecMetadata{ @@ -240,14 +235,13 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext taskEventRecorder: taskEventRecorder, nodeEventRecorder: nodeEventRecorder, }, - maxDatasetSizeBytes: maxDatasetSize, - tr: tr, - nsm: nsm, - enqueueOwner: enqueueOwner, - rawOutputPrefix: rawOutputPrefix, - shardSelector: outputShardSelector, - nl: nl, - ic: execContext, + tr: tr, + nsm: nsm, + enqueueOwner: enqueueOwner, + rawOutputPrefix: rawOutputPrefix, + shardSelector: outputShardSelector, + nl: nl, + ic: execContext, } } @@ -331,7 +325,6 @@ func (c *nodeExecutor) BuildNodeExecutionContext(ctx context.Context, executionC ), interruptible, c.interruptibleFailureThreshold, - c.maxDatasetSizeBytes, c.taskRecorder, c.nodeRecorder, tr, diff --git a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go index e64abb4c99..10e12d35e4 100644 --- a/flytepropeller/pkg/controller/nodes/node_exec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/node_exec_context_test.go @@ -107,7 +107,7 @@ func Test_NodeContext(t *testing.T) { s, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) p := parentInfo{} execContext := executors.NewExecutionContext(w1, nil, nil, p, nil) - nCtx := newNodeExecContext(context.TODO(), s, execContext, w1, getTestNodeSpec(nil), nil, nil, false, 0, 2, nil, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) + nCtx := newNodeExecContext(context.TODO(), s, execContext, w1, getTestNodeSpec(nil), nil, nil, false, 0, nil, nil, TaskReader{}, nil, nil, "s3://bucket", ioutils.NewConstantShardSelector([]string{"x"})) assert.Equal(t, "id", nCtx.NodeExecutionMetadata().GetLabels()["node-id"]) assert.Equal(t, "false", nCtx.NodeExecutionMetadata().GetLabels()["interruptible"]) assert.Equal(t, "task-name", nCtx.NodeExecutionMetadata().GetLabels()["task-name"]) @@ -127,7 +127,6 @@ func Test_NodeContextDefault(t *testing.T) { nodeExecutor := nodeExecutor{ interruptibleFailureThreshold: 0, - maxDatasetSizeBytes: 0, defaultDataSandbox: "s3://bucket-a", store: dataStore, shardSelector: ioutils.NewConstantShardSelector([]string{"x"}), @@ -152,7 +151,6 @@ func Test_NodeContextDefaultInterruptible(t *testing.T) { dataStore, _ := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, scope.NewSubScope("dataStore")) nodeExecutor := nodeExecutor{ interruptibleFailureThreshold: 10, - maxDatasetSizeBytes: 0, defaultDataSandbox: "s3://bucket-a", store: dataStore, shardSelector: ioutils.NewConstantShardSelector([]string{"x"}), @@ -414,7 +412,6 @@ func Test_NodeContext_IsInterruptible(t *testing.T) { nodeExecutor := nodeExecutor{ interruptibleFailureThreshold: tt.interruptibleFailureThreshold, maxNodeRetriesForSystemFailures: tt.maxSystemFailures, - maxDatasetSizeBytes: 0, defaultDataSandbox: "s3://bucket-a", store: dataStore, shardSelector: ioutils.NewConstantShardSelector([]string{"x"}), diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go index 6bf6781eac..c47c39cde8 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go @@ -100,7 +100,6 @@ func createNodeContextWithVersion(phase v1alpha1.WorkflowNodePhase, n v1alpha1.E nCtx.OnNodeExecutionMetadata().Return(nm) nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeID().Return(n.GetID()) nCtx.OnEnqueueOwnerFunc().Return(nil) nCtx.OnNodeStatus().Return(s) diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 91c0879759..f0e177838f 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -490,7 +490,6 @@ func Test_task_Handle_NoCatalog(t *testing.T) { nCtx.OnDataStore().Return(ds) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return(nodeID) nCtx.OnEventsRecorder().Return(recorder) @@ -806,7 +805,6 @@ func Test_task_Abort(t *testing.T) { nCtx.OnDataStore().Return(ds) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return("n1") nCtx.OnEnqueueOwnerFunc().Return(nil) @@ -968,7 +966,6 @@ func Test_task_Abort_v1(t *testing.T) { nCtx.OnDataStore().Return(ds) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return("n1") nCtx.OnEnqueueOwnerFunc().Return(nil) @@ -1150,7 +1147,6 @@ func Test_task_Finalize(t *testing.T) { nCtx.OnDataStore().Return(ds) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return("n1") nCtx.OnEventsRecorder().Return(nil) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index f32651caa4..95975da7b4 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -289,7 +289,7 @@ func (e *PluginManager) checkResourcePhase(ctx context.Context, tCtx pluginsCore var opReader io.OutputReader if pCtx.ow == nil { logger.Infof(ctx, "Plugin [%s] returned no outputReader, assuming file based outputs", e.id) - opReader = ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes()) + opReader = ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0) } else { logger.Infof(ctx, "Plugin [%s] returned outputReader", e.id) opReader = pCtx.ow.GetReader() diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 2c7fda0f6e..3a2b00508c 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -190,7 +190,6 @@ func getMockTaskContext(initPhase PluginPhase, wantPhase PluginPhase) pluginsCor taskExecutionContext.OnOutputWriter().Return(&dummyOutputWriter{}) taskExecutionContext.OnDataStore().Return(nil) - taskExecutionContext.OnMaxDatasetSizeBytes().Return(int64(0)) return taskExecutionContext } diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go index 84245cea8f..e798f82a04 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go @@ -87,7 +87,6 @@ func dummyNodeExecutionContext(t *testing.T, parentInfo executors.ImmutableParen nCtx.OnInputReader().Return(ir) nCtx.OnCurrentAttempt().Return(uint32(1)) nCtx.OnTaskReader().Return(tr) - nCtx.OnMaxDatasetSizeBytes().Return(int64(1)) nCtx.OnNodeStatus().Return(ns) nCtx.OnNodeID().Return(nodeID) nCtx.OnEventsRecorder().Return(nil) @@ -162,7 +161,6 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { assert.NotNil(t, got.psm.newState) assert.NotNil(t, got.TaskReader()) - assert.Equal(t, got.MaxDatasetSizeBytes(), int64(1)) assert.NotNil(t, got.SecretManager()) assert.NotNil(t, got.OutputWriter()) diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index cc9910abc3..a2fd3c62a8 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -58,7 +58,6 @@ var ( ) const ( - maxOutputSize = 10 * 1024 testClusterID = "C1" ) @@ -99,7 +98,7 @@ func (f fakeRemoteWritePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskE o.Literals[k] = l } assert.NoError(f.t, tCtx.DataStore().WriteProtobuf(ctx, tCtx.OutputWriter().GetOutputPath(), storage.Options{}, o)) - assert.NoError(f.t, tCtx.OutputWriter().Put(ctx, ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes()))) + assert.NoError(f.t, tCtx.OutputWriter().Put(ctx, ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0))) } return trns, err } @@ -246,7 +245,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) @@ -329,7 +328,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) @@ -393,7 +392,7 @@ func BenchmarkWorkflowExecutor(b *testing.B) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() handlerFactory := &nodemocks.HandlerFactory{} nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, scope) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, scope) assert.NoError(b, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) @@ -505,7 +504,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) { handlerFactory.OnGetHandlerMatch(mock.Anything).Return(h, nil) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) @@ -607,7 +606,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) executor, err := NewExecutor(ctx, store, enqueueWorkflow, eventSink, recorder, "metadata", nodeExec, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) @@ -673,7 +672,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_EventFailure(t *testing.T) { handlerFactory.OnSetupMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) handlerFactory.OnGetHandlerMatch(mock.Anything).Return(h, nil) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, nodeEventSink, adminClient, adminClient, - maxOutputSize, "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) + "s3://bucket", fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, handlerFactory, promutils.NewTestScope()) assert.NoError(t, err) t.Run("EventAlreadyInTerminalStateError", func(t *testing.T) { From b31f6386ebfed31a79af355bd2bba71d18452baa Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 22 Mar 2024 12:40:23 -0700 Subject: [PATCH 2/2] add note for clarity Signed-off-by: Paul Dittamo --- .../tasks/pluginmachinery/ioutils/remote_file_output_reader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index 823fb29fc5..27d7748701 100644 --- a/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -123,6 +123,8 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) { } func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader { + // Note: even though the data store retrieval checks against GetLimitMegabytes, there might be external + // storage implementations, so we keep this check here as well. maxPayloadSize := maxDatasetSize if maxPayloadSize == 0 { maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024