Skip to content

Commit

Permalink
remove setting max size bytes in node context
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Mar 21, 2024
1 parent 2db8eed commit 5a3945f
Show file tree
Hide file tree
Showing 35 changed files with 94 additions and 292 deletions.
5 changes: 2 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/flyteorg/flyte/flytestdlib/storage"
)

//go:generate mockery -all -case=underscore

// An interface to access a remote/sharable location that contains the serialized TaskTemplate
type TaskTemplatePath interface {
// Returns the path
Expand All @@ -34,9 +36,6 @@ type TaskExecutionContext interface {
// Returns a method that allows a plugin to indicate that the task has a new update and can be invoked again to check for updates
TaskRefreshIndicator() SignalAsync

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64

// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 28 additions & 30 deletions flyteplugins/go/tasks/pluginmachinery/core/mocks/task_overrides.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,13 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) {
}

func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader {
maxPayloadSize := maxDatasetSize
if maxPayloadSize == 0 {
maxPayloadSize = storage.GetConfig().Limits.GetLimitMegabytes * 1024 * 1024
}

Check warning on line 129 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L126-L129

Added lines #L126 - L129 were not covered by tests
return RemoteFileOutputReader{
outPath: outPaths,
store: store,
maxPayloadSize: maxDatasetSize,
maxPayloadSize: maxPayloadSize,

Check warning on line 133 in flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L133

Added line #L133 was not covered by tests
}
}
32 changes: 0 additions & 32 deletions flyteplugins/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ type PluginContext interface {
// Returns a handle to the currently configured storage backend that can be used to communicate with the tasks or write metadata
DataStore() *storage.DataStore

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ type TaskExecutionContext interface {

// Provides the raw datastore to enable persisting outputs.
DataStore() *storage.DataStore

// Returns the max allowed dataset size that the outputwriter will accept
MaxDatasetSizeBytes() int64
}

type GetContext interface {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func AssembleFinalOutputs(ctx context.Context, assemblyQueue OutputAssembler, tC

switch w.Status() {
case workqueue.WorkStatusSucceeded:
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions flyteplugins/go/tasks/plugins/array/outputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx := &mocks3.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnMaxDatasetSizeBytes().Return(10000)
tCtx.OnDataStore().Return(d)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
Expand Down Expand Up @@ -368,7 +367,6 @@ func TestAssembleFinalOutputs(t *testing.T) {
tCtx.OnTaskReader().Return(tReader)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnDataStore().Return(ds)
tCtx.OnMaxDatasetSizeBytes().Return(10000)

_, err = AssembleFinalOutputs(ctx, assemblyQueue, tCtx, arrayCore.PhaseSuccess, 1, s)
assert.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/testing/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (e *EchoPlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext)
return core.UnknownTransition, err
}

or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), tCtx.MaxDatasetSizeBytes())
or := ioutils.NewRemoteFileOutputReader(ctx, tCtx.DataStore(), tCtx.OutputWriter(), 0)

Check warning on line 79 in flyteplugins/go/tasks/plugins/testing/echo.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/testing/echo.go#L79

Added line #L79 was not covered by tests
if err = tCtx.OutputWriter().Put(ctx, or); err != nil {
return core.UnknownTransition, err
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext, outputs *fly
opReader = ioutils.NewInMemoryOutputReader(outputs, nil, nil)
} else {
logger.Debugf(ctx, "AgentDeployment didn't return any output, assuming file based outputs.")
opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
opReader = ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)

Check warning on line 335 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L335

Added line #L335 was not covered by tests
}
return taskCtx.OutputWriter().Put(ctx, opReader)
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func writeOutput(ctx context.Context, taskCtx webapi.StatusContext) error {
return nil
}

outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), taskCtx.MaxDatasetSizeBytes())
outputReader := ioutils.NewRemoteFileOutputReader(ctx, taskCtx.DataStore(), taskCtx.OutputWriter(), 0)

Check warning on line 295 in flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/databricks/plugin.go#L295

Added line #L295 was not covered by tests
return taskCtx.OutputWriter().Put(ctx, outputReader)
}

Expand Down
1 change: 0 additions & 1 deletion flyteplugins/tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
tCtx.OnCatalog().Return(cat)
tCtx.OnEventsRecorder().Return(eRecorder)
tCtx.OnResourceManager().Return(resourceManager)
tCtx.OnMaxDatasetSizeBytes().Return(1000000)
tCtx.OnSecretManager().Return(secretManager)

trns := pluginCore.DoTransition(pluginCore.PhaseInfoQueued(time.Now(), 0, ""))
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
}

nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, sCfg.Limits.GetLimitMegabytes*1024*1024, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,

Check warning on line 442 in flytepropeller/pkg/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/controller.go#L442

Added line #L442 was not covered by tests
catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// checkpoint paths are not computed here because this function is only called when writing
// existing cached outputs. if this functionality changes this will need to be revisited.
outputPaths := ioutils.NewCheckpointRemoteFilePaths(ctx, nCtx.DataStore(), subOutputDir, ioutils.NewRawOutputPaths(ctx, subDataDir), "")
reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
reader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, 0)

gatherOutputsRequest.reader = &reader
a.gatherOutputsRequestChannel <- gatherOutputsRequest
Expand Down Expand Up @@ -505,7 +505,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
}
case v1alpha1.NodeKindWorkflow:
// TODO - to support launchplans we will need to process the output interface variables here
// TODO - to support launchplans we will need to process the output interface variables here

Check warning on line 508 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L508

Added line #L508 was not covered by tests
fallthrough
default:
logger.Warnf(ctx, "ArrayNode does not support pre-populating outputLiteral collections for node kind '%s'", arrayNode.GetSubNodeSpec().GetKind())
Expand Down
Loading

0 comments on commit 5a3945f

Please sign in to comment.