diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index dc36a308c4..49fa13d153 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1724,27 +1724,21 @@ func (m *ExecutionManager) GetExecutionData( return nil, err } } - inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), - m.storageClient, executionModel.InputsURI.String()) + inputs, err := util.GetInputs(ctx, m.storageClient, executionModel.InputsURI.String()) if err != nil { return nil, err } - outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), + outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(), m.storageClient, util.ToExecutionClosureInterface(execution.Closure)) if err != nil { return nil, err } response := &admin.WorkflowExecutionGetDataResponse{ - Inputs: inputURLBlob, - Outputs: outputURLBlob, FullInputs: inputs, FullOutputs: outputs, } - m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) - if response.Outputs.Bytes > 0 { - m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) - } else if response.FullOutputs != nil { + if response.FullOutputs != nil { m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs))) } return response, nil diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index e1f59cdf43..7e58483f8e 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" @@ -3595,22 +3594,7 @@ func TestGetExecutionData(t *testing.T) { }, nil } mockExecutionRemoteURL := dataMocks.NewMockRemoteURL() - mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func( - ctx context.Context, uri string) (admin.UrlBlob, error) { - if uri == outputURI { - return admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, nil - } else if strings.HasSuffix(uri, shared.Inputs) { - return admin.UrlBlob{ - Url: "inputs", - Bytes: 200, - }, nil - } - return admin.UrlBlob{}, errors.New("unexpected input") - } mockStorage := commonMocks.GetMockStorageClient() fullInputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -3645,14 +3629,6 @@ func TestGetExecutionData(t *testing.T) { }) assert.Nil(t, err) assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{ - Outputs: &admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, - Inputs: &admin.UrlBlob{ - Url: "inputs", - Bytes: 200, - }, FullInputs: fullInputs, FullOutputs: fullOutputs, }, dataResponse)) @@ -3781,86 +3757,6 @@ func TestGetExecution_Legacy(t *testing.T) { assert.True(t, proto.Equal(getLegacyClosure(), execution.Closure)) } -func TestGetExecutionData_LegacyModel(t *testing.T) { - repository := repositoryMocks.NewMockRepository() - startedAt := time.Date(2018, 8, 30, 0, 0, 0, 0, time.UTC) - closure := getLegacyClosure() - closure.OutputResult = &admin.ExecutionClosure_Outputs{ - Outputs: &admin.LiteralMapBlob{ - Data: &admin.LiteralMapBlob_Uri{ - Uri: outputURI, - }, - }, - } - var closureBytes, _ = proto.Marshal(closure) - - executionGetFunc := func(ctx context.Context, input interfaces.Identifier) (models.Execution, error) { - return models.Execution{ - BaseModel: models.BaseModel{ - CreatedAt: testutils.MockCreatedAtValue, - }, - ExecutionKey: models.ExecutionKey{ - Project: "project", - Domain: "domain", - Name: "name", - }, - Spec: getLegacySpecBytes(), - Phase: phase, - Closure: closureBytes, - LaunchPlanID: uint(1), - WorkflowID: uint(2), - StartedAt: &startedAt, - }, nil - } - mockExecutionRemoteURL := dataMocks.NewMockRemoteURL() - mockExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func( - ctx context.Context, uri string) (admin.UrlBlob, error) { - if uri == outputURI { - return admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, nil - } else if strings.HasSuffix(uri, shared.Inputs) { - return admin.UrlBlob{ - Url: "inputs", - Bytes: 200, - }, nil - } - - return admin.UrlBlob{}, errors.New("unexpected input") - } - - repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc) - storageClient := getMockStorageForExecTest(context.Background()) - r := plugins.NewRegistry() - r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) - execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)) - dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{ - Id: &executionIdentifier, - }) - assert.Nil(t, err) - assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{ - Outputs: &admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, - Inputs: &admin.UrlBlob{ - Url: "inputs", - Bytes: 200, - }, - FullInputs: &core.LiteralMap{ - Literals: map[string]*core.Literal{ - "foo": testutils.MakeStringLiteral("foo-value-1"), - }, - }, - FullOutputs: &core.LiteralMap{}, - }, dataResponse)) - var inputs core.LiteralMap - err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs) - assert.Nil(t, err) - assert.True(t, proto.Equal(&inputs, closure.ComputedInputs)) -} - func TestCreateExecution_LegacyClient(t *testing.T) { repository := getMockRepositoryForExecTest() setDefaultLpCallbackForExecTest(repository) diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager.go b/flyteadmin/pkg/manager/impl/node_execution_manager.go index afe17b6ac6..dbe6f30679 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager.go @@ -495,21 +495,18 @@ func (m *NodeExecutionManager) GetNodeExecutionData( return nil, err } - inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), - m.storageClient, nodeExecution.InputUri) + inputs, err := util.GetInputs(ctx, m.storageClient, nodeExecution.InputUri) if err != nil { return nil, err } - outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), + outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(), m.storageClient, nodeExecution.Closure) if err != nil { return nil, err } response := &admin.NodeExecutionGetDataResponse{ - Inputs: inputURLBlob, - Outputs: outputURLBlob, FullInputs: inputs, FullOutputs: outputs, FlyteUrls: common.FlyteURLsFromNodeExecutionID(*request.Id, nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""), @@ -536,10 +533,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData( } } - m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) - if response.Outputs.Bytes > 0 { - m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) - } else if response.FullOutputs != nil { + if response.FullOutputs != nil { m.metrics.NodeExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs))) } diff --git a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go index 1f36d28afc..36d062e1a2 100644 --- a/flyteadmin/pkg/manager/impl/node_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/node_execution_manager_test.go @@ -1254,11 +1254,6 @@ func TestGetNodeExecutionData(t *testing.T) { Url: "inputs", Bytes: 100, }, nil - } else if uri == util.OutputsFile { - return admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, nil } return admin.UrlBlob{}, errors.New("unexpected input") @@ -1298,14 +1293,6 @@ func TestGetNodeExecutionData(t *testing.T) { }) assert.NoError(t, err) assert.True(t, proto.Equal(&admin.NodeExecutionGetDataResponse{ - Inputs: &admin.UrlBlob{ - Url: "inputs", - Bytes: 100, - }, - Outputs: &admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, FullInputs: fullInputs, FullOutputs: fullOutputs, DynamicWorkflow: &admin.DynamicWorkflowNodeMetadata{ diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index 61d94a086f..142c4047ae 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -310,29 +310,23 @@ func (m *TaskExecutionManager) GetTaskExecutionData( return nil, err } - inputs, inputURLBlob, err := util.GetInputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), - m.storageClient, taskExecution.InputUri) + inputs, err := util.GetInputs(ctx, m.storageClient, taskExecution.InputUri) if err != nil { return nil, err } - outputs, outputURLBlob, err := util.GetOutputs(ctx, m.urlData, m.config.ApplicationConfiguration().GetRemoteDataConfig(), + outputs, err := util.GetOutputs(ctx, m.config.ApplicationConfiguration().GetRemoteDataConfig(), m.storageClient, taskExecution.Closure) if err != nil { return nil, err } response := &admin.TaskExecutionGetDataResponse{ - Inputs: inputURLBlob, - Outputs: outputURLBlob, FullInputs: inputs, FullOutputs: outputs, FlyteUrls: common.FlyteURLsFromTaskExecutionID(*request.Id, false), } - m.metrics.TaskExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) - if response.Outputs.Bytes > 0 { - m.metrics.TaskExecutionOutputBytes.Observe(float64(response.Outputs.Bytes)) - } else if response.FullOutputs != nil { + if response.FullOutputs != nil { m.metrics.TaskExecutionOutputBytes.Observe(float64(proto.Size(response.FullOutputs))) } return response, nil diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go index 8fd8019647..828a1e7862 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager_test.go @@ -897,21 +897,7 @@ func TestGetTaskExecutionData(t *testing.T) { }, nil }) mockTaskExecutionRemoteURL = dataMocks.NewMockRemoteURL() - mockTaskExecutionRemoteURL.(*dataMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) { - if uri == "input-uri.pb" { - return admin.UrlBlob{ - Url: "inputs", - Bytes: 100, - }, nil - } else if uri == "test-output.pb" { - return admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, nil - } - return admin.UrlBlob{}, errors.New("unexpected input") - } mockStorage := commonMocks.GetMockStorageClient() fullInputs := &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -947,14 +933,6 @@ func TestGetTaskExecutionData(t *testing.T) { assert.Nil(t, err) assert.True(t, getTaskCalled) assert.True(t, proto.Equal(&admin.TaskExecutionGetDataResponse{ - Inputs: &admin.UrlBlob{ - Url: "inputs", - Bytes: 100, - }, - Outputs: &admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, FullInputs: fullInputs, FullOutputs: fullOutputs, FlyteUrls: &admin.FlyteURLs{ diff --git a/flyteadmin/pkg/manager/impl/util/data.go b/flyteadmin/pkg/manager/impl/util/data.go index 5921cc83ac..d8ea440b42 100644 --- a/flyteadmin/pkg/manager/impl/util/data.go +++ b/flyteadmin/pkg/manager/impl/util/data.go @@ -5,11 +5,10 @@ import ( "github.com/golang/protobuf/proto" - "github.com/flyteorg/flyte/flyteadmin/pkg/common" - dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -19,43 +18,22 @@ const ( DeckFile = "deck.html" ) -func shouldFetchData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool { - return config.Scheme == common.Local || config.Scheme == common.None || config.MaxSizeInBytes == 0 || - urlBlob.Bytes < config.MaxSizeInBytes -} - -func shouldFetchOutputData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool { - return len(outputURI) > 0 && shouldFetchData(config, urlBlob) -} - -// GetInputs returns an inputs URL blob and if config settings permit, inline inputs data for an execution. -func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface, - remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, inputURI string) ( - *core.LiteralMap, *admin.UrlBlob, error) { - var inputsURLBlob admin.UrlBlob +// GetInputs returns input data for an execution +func GetInputs(ctx context.Context, storageClient *storage.DataStore, inputURI string) ( + *core.LiteralMap, error) { var fullInputs core.LiteralMap if len(inputURI) == 0 { - return &fullInputs, &inputsURLBlob, nil + return &fullInputs, nil } - var err error - if remoteDataConfig.SignedURL.Enabled { - inputsURLBlob, err = urlData.Get(ctx, inputURI) - if err != nil { - return nil, nil, err - } + err := storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &fullInputs) + if err != nil { + logger.Errorf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err) + return nil, err } - if shouldFetchData(remoteDataConfig, inputsURLBlob) { - err = storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &fullInputs) - if err != nil { - // If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether. - // Instead we return the signed URL blob so that the client can use that to fetch the input data. - logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err) - } - } - return &fullInputs, &inputsURLBlob, nil + return &fullInputs, nil } // ExecutionClosure defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data. @@ -94,38 +72,29 @@ func ToExecutionClosureInterface(closure *admin.ExecutionClosure) ExecutionClosu } } -// GetOutputs returns an outputs URL blob and if config settings permit, inline outputs data for an execution. -func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface, - remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, closure ExecutionClosure) ( - *core.LiteralMap, *admin.UrlBlob, error) { - var outputsURLBlob admin.UrlBlob +// GetOutputs returns outputs data for an execution +func GetOutputs(ctx context.Context, remoteDataConfig *runtimeInterfaces.RemoteDataConfig, + storageClient *storage.DataStore, closure ExecutionClosure) ( + *core.LiteralMap, error) { var fullOutputs = &core.LiteralMap{} if closure == nil { - return fullOutputs, &outputsURLBlob, nil - } - - if len(closure.GetOutputUri()) > 0 && remoteDataConfig.SignedURL.Enabled { - var err error - outputsURLBlob, err = urlData.Get(ctx, closure.GetOutputUri()) - if err != nil { - return nil, nil, err - } + return fullOutputs, nil } if closure.GetOutputData() != nil { if int64(proto.Size(closure.GetOutputData())) < remoteDataConfig.MaxSizeInBytes { fullOutputs = closure.GetOutputData() } else { - logger.Debugf(ctx, "execution closure contains output data that exceeds max data size for responses") + logger.Errorf(ctx, "execution closure contains output data that exceeds max data size for responses") + return nil, errors.Errorf(storage.ErrExceedsLimit, "limit exceeded. %.6vb > %vb.", int64(proto.Size(closure.GetOutputData())), remoteDataConfig.MaxSizeInBytes) } - } else if shouldFetchOutputData(remoteDataConfig, outputsURLBlob, closure.GetOutputUri()) { + } else if len(closure.GetOutputUri()) > 0 { err := storageClient.ReadProtobuf(ctx, storage.DataReference(closure.GetOutputUri()), fullOutputs) if err != nil { - // If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether. - // Instead we return the signed URL blob so that the client can use that to fetch the output data. - logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", closure.GetOutputUri(), err) + logger.Errorf(ctx, "Failed to read outputs from URI [%s] with err: %v", closure.GetOutputUri(), err) + return nil, err } } - return fullOutputs, &outputsURLBlob, nil + return fullOutputs, nil } diff --git a/flyteadmin/pkg/manager/impl/util/data_test.go b/flyteadmin/pkg/manager/impl/util/data_test.go index e447b1883c..b0077fd374 100644 --- a/flyteadmin/pkg/manager/impl/util/data_test.go +++ b/flyteadmin/pkg/manager/impl/util/data_test.go @@ -7,13 +7,12 @@ import ( "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" - "github.com/flyteorg/flyte/flyteadmin/pkg/common" commonMocks "github.com/flyteorg/flyte/flyteadmin/pkg/common/mocks" - urlMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks" "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytestdlib/errors" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -25,104 +24,9 @@ var testLiteralMap = &core.LiteralMap{ const testOutputsURI = "s3://foo/bar/outputs.pb" -func TestShouldFetchData(t *testing.T) { - t.Run("local config", func(t *testing.T) { - assert.True(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.Local, - MaxSizeInBytes: 100, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - })) - }) - t.Run("no config", func(t *testing.T) { - assert.True(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.None, - MaxSizeInBytes: 100, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - })) - }) - t.Run("no config", func(t *testing.T) { - assert.True(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.None, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - })) - }) - t.Run("max size under limit", func(t *testing.T) { - assert.True(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.AWS, - MaxSizeInBytes: 1000, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - })) - }) - t.Run("max size over limit", func(t *testing.T) { - assert.False(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.AWS, - MaxSizeInBytes: 100, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - })) - }) - t.Run("empty url config", func(t *testing.T) { - assert.False(t, shouldFetchData(&interfaces.RemoteDataConfig{ - Scheme: common.AWS, - MaxSizeInBytes: 100, - }, admin.UrlBlob{ - Bytes: 200, - })) - }) -} - -func TestShouldFetchOutputData(t *testing.T) { - t.Run("local config", func(t *testing.T) { - assert.True(t, shouldFetchOutputData(&interfaces.RemoteDataConfig{ - Scheme: common.Local, - MaxSizeInBytes: 100, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - }, "s3://foo/bar.txt")) - }) - t.Run("max size under limit", func(t *testing.T) { - assert.True(t, shouldFetchOutputData(&interfaces.RemoteDataConfig{ - Scheme: common.AWS, - MaxSizeInBytes: 1000, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - }, "s3://foo/bar.txt")) - }) - t.Run("output uri empty", func(t *testing.T) { - assert.False(t, shouldFetchOutputData(&interfaces.RemoteDataConfig{ - Scheme: common.AWS, - MaxSizeInBytes: 1000, - }, admin.UrlBlob{ - Url: "s3://data", - Bytes: 200, - }, "")) - }) -} - func TestGetInputs(t *testing.T) { inputsURI := "s3://foo/bar/inputs.pb" - expectedURLBlob := admin.UrlBlob{ - Url: "s3://foo/signed/inputs.pb", - Bytes: 1000, - } - - mockRemoteURL := urlMocks.NewMockRemoteURL() - mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) { - assert.Equal(t, inputsURI, uri) - return expectedURLBlob, nil - } remoteDataConfig := interfaces.RemoteDataConfig{ MaxSizeInBytes: 2000, } @@ -140,33 +44,21 @@ func TestGetInputs(t *testing.T) { remoteDataConfig.SignedURL = interfaces.SignedURL{ Enabled: true, } - fullInputs, inputURLBlob, err := GetInputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, inputsURI) + fullInputs, err := GetInputs(context.TODO(), mockStorage, inputsURI) assert.NoError(t, err) assert.True(t, proto.Equal(fullInputs, testLiteralMap)) - assert.True(t, proto.Equal(inputURLBlob, &expectedURLBlob)) }) t.Run("should not sign URL", func(t *testing.T) { remoteDataConfig.SignedURL = interfaces.SignedURL{ Enabled: false, } - fullInputs, inputURLBlob, err := GetInputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, inputsURI) + fullInputs, err := GetInputs(context.TODO(), mockStorage, inputsURI) assert.NoError(t, err) assert.True(t, proto.Equal(fullInputs, testLiteralMap)) - assert.Empty(t, inputURLBlob) }) } func TestGetOutputs(t *testing.T) { - expectedURLBlob := admin.UrlBlob{ - Url: "s3://foo/signed/outputs.pb", - Bytes: 1000, - } - - mockRemoteURL := urlMocks.NewMockRemoteURL() - mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) { - assert.Equal(t, testOutputsURI, uri) - return expectedURLBlob, nil - } remoteDataConfig := interfaces.RemoteDataConfig{ MaxSizeInBytes: 2000, @@ -179,37 +71,27 @@ func TestGetOutputs(t *testing.T) { _ = proto.Unmarshal(marshalled, msg) return nil } + mockStorageReadFailure := commonMocks.GetMockStorageClient() + mockStorageReadFailure.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( + ctx context.Context, reference storage.DataReference, msg proto.Message) error { + return errors.Errorf("error", "error") + } closure := &admin.NodeExecutionClosure{ OutputResult: &admin.NodeExecutionClosure_OutputUri{ OutputUri: testOutputsURI, }, } - t.Run("offloaded outputs with signed URL", func(t *testing.T) { - remoteDataConfig.SignedURL = interfaces.SignedURL{ - Enabled: true, - } - - fullOutputs, outputURLBlob, err := GetOutputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, closure) + t.Run("offloaded outputs", func(t *testing.T) { + fullOutputs, err := GetOutputs(context.TODO(), &remoteDataConfig, mockStorage, closure) assert.NoError(t, err) assert.True(t, proto.Equal(fullOutputs, testLiteralMap)) - assert.True(t, proto.Equal(outputURLBlob, &expectedURLBlob)) }) - t.Run("offloaded outputs without signed URL", func(t *testing.T) { - remoteDataConfig.SignedURL = interfaces.SignedURL{ - Enabled: false, - } - - fullOutputs, outputURLBlob, err := GetOutputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, closure) - assert.NoError(t, err) - assert.True(t, proto.Equal(fullOutputs, testLiteralMap)) - assert.Empty(t, outputURLBlob) + t.Run("offloaded outputs storage read fails", func(t *testing.T) { + fullOutputs, err := GetOutputs(context.TODO(), &remoteDataConfig, mockStorageReadFailure, closure) + assert.Error(t, err) + assert.Nil(t, fullOutputs) }) t.Run("inline outputs", func(t *testing.T) { - mockRemoteURL := urlMocks.NewMockRemoteURL() - mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) { - t.Fatal("Should not fetch a remote URL for outputs stored inline for an execution model") - return admin.UrlBlob{}, nil - } remoteDataConfig := interfaces.RemoteDataConfig{} remoteDataConfig.MaxSizeInBytes = 2000 @@ -225,10 +107,30 @@ func TestGetOutputs(t *testing.T) { }, } - fullOutputs, outputURLBlob, err := GetOutputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, closure) + fullOutputs, err := GetOutputs(context.TODO(), &remoteDataConfig, mockStorage, closure) assert.NoError(t, err) assert.True(t, proto.Equal(fullOutputs, testLiteralMap)) - assert.Empty(t, outputURLBlob) + }) + t.Run("inline outputs over limit", func(t *testing.T) { + remoteDataConfig := interfaces.RemoteDataConfig{} + remoteDataConfig.MaxSizeInBytes = 0 + + mockStorage := commonMocks.GetMockStorageClient() + mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func( + ctx context.Context, reference storage.DataReference, msg proto.Message) error { + t.Fatal("Should not fetch when outputs stored inline for an execution model") + return nil + } + closure := &admin.NodeExecutionClosure{ + OutputResult: &admin.NodeExecutionClosure_OutputData{ + OutputData: testLiteralMap, + }, + } + + fullOutputs, err := GetOutputs(context.TODO(), &remoteDataConfig, mockStorage, closure) + assert.Nil(t, fullOutputs) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "LIMIT_EXCEEDED") }) }