[House keeping] remove setting max size bytes in node context #5092

Merged: 2 commits, Apr 19, 2024
5 changes: 2 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
@@ -9,6 +9,8 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

+ //go:generate mockery -all -case=underscore

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
@@ -34,9 +36,6 @@ type TaskExecutionContext interface {
// Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates
TaskRefreshIndicator() SignalAsync

- // Returns the max allowed dataset size that the outputwriter will accept
- MaxDatasetSizeBytes() int64

// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore
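
Dropping `MaxDatasetSizeBytes()` from the interface affects callers and implementers differently. The sketch below uses hypothetical, trimmed-down types (`taskContext`, `legacyContext`, `DataStoreName`), not the real Flyte interfaces. Because Go interfaces are satisfied implicitly, an implementation that still defines the removed method keeps compiling; only code that calls the method through the interface has to change.

```go
package main

import "fmt"

// taskContext is a hypothetical stand-in for the interface after this PR:
// it no longer declares MaxDatasetSizeBytes.
type taskContext interface {
	DataStoreName() string
}

// legacyContext is a hypothetical implementation written before the change.
// It still carries the extra method, which Go simply ignores when checking
// whether the type satisfies taskContext.
type legacyContext struct{}

func (legacyContext) DataStoreName() string      { return "s3" }
func (legacyContext) MaxDatasetSizeBytes() int64 { return 10000 }

// describe only uses methods declared on the interface. Calling
// c.MaxDatasetSizeBytes() here would be a compile error.
func describe(c taskContext) string {
	return "store=" + c.DataStoreName()
}

func main() {
	fmt.Println(describe(legacyContext{}))
}
```

Generated mocks are a different case: they are regenerated from the interface, which is presumably why the mock files in this PR change as well.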


58 changes: 28 additions & 30 deletions flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go

Generated file; diff not rendered by default.

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go
@@ -123,9 +123,15 @@
}

func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
+ // Note: even though the data store retrieval checks against GetLimitMegabytes, there might be external
+ // storage implementations, so we keep this check here as well.
+ maxPayloadSize := maxDatasetSize
+ if maxPayloadSize == 0 {
+ maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024
+ }

Codecov (codecov/patch) warning: added lines #L126-L131 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go were not covered by tests.
return RemoteFileOutputReader{
outPath: outPaths,
store: store,
- maxPayloadSize: maxDatasetSize,
+ maxPayloadSize: maxPayloadSize,

Codecov (codecov/patch) warning: added line #L135 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go was not covered by tests.
Contributor commented:

Is there an issue with just removing maxPayloadSize completely? IIUC there are just the two checks in this file. IMO the maxDownloadMBs limit applied on the data store retrieval would catch these first.

@pvditt (Contributor, Author) commented on Mar 22, 2024:

I should've mentioned this in the PR. I was concerned about deployments that swap in an external datastore implementation, as mentioned in this comment. Such an external implementation might not check against maxDownloadMBs as we do in stow_store.

This is probably an unfounded concern, and we could just mention in the release notes that there may be payload size issues if you're using an external storage implementation.

}
}
32 changes: 0 additions & 32 deletions flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Generated file; diff not rendered by default.

3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
@@ -61,9 +61,6 @@ type PluginContext interface {
// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore

- // Returns the max allowed dataset size that the outputwriter will accept
- MaxDatasetSizeBytes() int64

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata


3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go
@@ -72,9 +72,6 @@ type TaskExecutionContext interface {

// Provides the raw datastore to enable persisting outputs.
DataStore() *storage.DataStore

- // Returns the max allowed dataset size that the outputwriter will accept
- MaxDatasetSizeBytes() int64
}

type GetContext interface {
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/outputs.go
@@ -225,7 +225,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC

switch w.Status() {
case workqueue.WorkStatusSucceeded:
- or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
+ or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return nil, err
}
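
Call sites like this one now pass `0`, relying on the fallback added to `NewRemoteFileOutputReader` in this PR. The sketch below isolates that fallback rule. `resolveMaxPayloadSize` and `bytesPerMegabyte` are hypothetical names introduced for illustration, and the megabyte limit is passed in as a plain argument rather than read from the storage config.

```go
package main

import "fmt"

// bytesPerMegabyte mirrors the `* 1024 * 1024` conversion used in the diff.
const bytesPerMegabyte int64 = 1024 * 1024

// resolveMaxPayloadSize is a hypothetical helper that applies the same rule
// as the constructor: an explicit non-zero size wins, and 0 means "use the
// configured limit in megabytes, converted to bytes".
func resolveMaxPayloadSize(maxDatasetSize, limitMegabytes int64) int64 {
	if maxDatasetSize == 0 {
		return limitMegabytes * bytesPerMegabyte
	}
	return maxDatasetSize
}

func main() {
	fmt.Println(resolveMaxPayloadSize(0, 2))     // 2097152: falls back to 2 MB in bytes
	fmt.Println(resolveMaxPayloadSize(10000, 2)) // 10000: explicit value is kept
}
```

One consequence of this rule is that `0` can no longer express "no payload allowed"; it always means "defer to the configured limit".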
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/plugins/array/outputs_test.go
@@ -244,7 +244,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx := &mocks3.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnOutputWriter().Return(ow)
- tCtx.OnMaxDatasetSizeBytes().Return(10000)
tCtx.OnDataStore().Return(d)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
@@ -368,7 +367,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx.OnTaskReader().Return(tReader)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnDataStore().Return(ds)
- tCtx.OnMaxDatasetSizeBytes().Return(10000)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
assert.NoError(t, err)
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/testing/echo.go
@@ -76,7 +76,7 @@
return core.UnknownTransition, err
}

- or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
+ or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)

Codecov (codecov/patch) warning: added line #L79 in flyteplugins/go/tasks/plugins/testing/echo.go was not covered by tests.
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return core.UnknownTransition, err
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
@@ -332,7 +332,7 @@
opReader = ioutils.NewInMemoryOutputReader(outputs, nil, nil)
} else {
logger.Debugf(ctx, "AgentDeployment didn't return any output, assuming file based outputs.")
- opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
+ opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)

Codecov (codecov/patch) warning: added line #L335 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go was not covered by tests.
}
return taskCtx.OutputWriter().Put(ctx, opReader)
}
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
@@ -292,7 +292,7 @@
return nil
}

- outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
+ outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)

Codecov (codecov/patch) warning: added line #L295 in flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go was not covered by tests.
return taskCtx.OutputWriter().Put(ctx, outputReader)
}

1 change: 0 additions & 1 deletion flyteplugins/tests/end_to_end.go
@@ -243,7 +243,6 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
tCtx.OnCatalog().Return(cat)
tCtx.OnEventsRecorder().Return(eRecorder)
tCtx.OnResourceManager().Return(resourceManager)
- tCtx.OnMaxDatasetSizeBytes().Return(1000000)
tCtx.OnSecretManager().Return(secretManager)

trns := pluginCore.DoTransition(pluginCore.PhaseInfoQueued(time.Now(), 0, ""))
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
@@ -439,7 +439,7 @@
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
- launchPlanActor, launchPlanActor, sCfg.Limits.GetLimitMegabytes*1024*1024, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
+ launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,

Codecov (codecov/patch) warning: added line #L442 in flytepropeller/pkg/controller/controller.go was not covered by tests.
catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/array/handler.go
@@ -469,7 +469,7 @@
// checkpoint paths are not computed here because this function is only called when writing
// existing cached outputs. if this functionality changes this will need to be revisited.
outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "")
- reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
+ reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0)

gatherOutputsRequest.reader = &reader
a.gatherOutputsRequestChannel <- gatherOutputsRequest
@@ -505,7 +505,7 @@
}
}
case v1alpha1.NodeKindWorkflow:
- // TODO - to support launchplans we will need to process the output interface variables here
+ // TODO - to support launchplans we will need to process the output interface variables here

Codecov (codecov/patch) warning: added line #L508 in flytepropeller/pkg/controller/nodes/array/handler.go was not covered by tests.
fallthrough
default:
logger.Warnf(ctx, "ArrayNode does not support pre-populating outputLiteral collections for node kind '%s'", arrayNode.GetSubNodeSpec().GetKind())