From 1f161f9bb72991e86a157b74c24b5101fa76bfa5 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 12 Jan 2024 17:49:38 -0800 Subject: [PATCH] retry fetching subworkflow output data if fulloutputs are not set Signed-off-by: Paul Dittamo --- .../nodes/subworkflow/launchplan/admin.go | 2 +- .../subworkflow/launchplan/admin_test.go | 353 ++++++++---------- 2 files changed, 160 insertions(+), 195 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go index 0b55077598..336f095e90 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -268,7 +268,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{ Id: &exec.WorkflowExecutionIdentifier, }) - if err != nil { + if err != nil || execData.GetFullOutputs() == nil || execData.GetFullOutputs().GetLiterals() == nil { outputURI := res.GetClosure().GetOutputs().GetUri() // attempt remote storage read on GetExecutionData failure if outputURI != "" { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index ba8510ec7a..1f359c1d6a 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -50,200 +50,6 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { assert.Equal(t, result, s) }) - t.Run("terminal-sync", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) - assert.NoError(t, err) - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{ - iwMock, - }) - assert.NoError(t, err) - assert.NotNil(t, v) - assert.Len(t, v, 1) - assert.Equal(t, v[0].ID, "id") - assert.Equal(t, v[0].Item, i) - }) - - t.Run("fetch-execution-error", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) - assert.NoError(t, err) - - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - - mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) - - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) - assert.NoError(t, err) - assert.Len(t, v, 1) - - item, ok := v[0].Item.(executionCacheItem) - assert.True(t, ok) - assert.NotNil(t, item.SyncError.Error()) - }) - - t.Run("successful-execution-data-retrieval", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) - assert.NoError(t, err) - - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - - mockExecutionResp := &admin.Execution{ - Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED}, - } - mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) - - fullOutputs := &core.LiteralMap{ - Literals: map[string]*core.Literal{ - "foo-1": coreutils.MustMakeLiteral("foo-value-1"), - }, - } - - mockExecData := &admin.WorkflowExecutionGetDataResponse{FullOutputs: fullOutputs} - mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(mockExecData, nil) - - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) - assert.NoError(t, err) - assert.Len(t, v, 1) - - item, ok := v[0].Item.(executionCacheItem) - assert.True(t, ok) - assert.Equal(t, item.ExecutionOutputs, fullOutputs) - }) - - t.Run("failed-execution-data-retrieval", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) - assert.NoError(t, err) - - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - - mockExecutionResp := &admin.Execution{ - Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED}, - } - mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) - - mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) - - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) - assert.NoError(t, err) - assert.Len(t, v, 1) - - item, ok := v[0].Item.(executionCacheItem) - assert.True(t, ok) - assert.NotNil(t, item.SyncError) - }) - - t.Run("read-proto-fails", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - - pbStore := &storageMocks.ComposedProtobufStore{} - - pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(status.Error(codes.Internal, "")) - - storageClient := &storage.DataStore{ - ComposedProtobufStore: pbStore, - ReferenceConstructor: &storageMocks.ReferenceConstructor{}, - } - - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) - assert.NoError(t, err) - - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - - mockExecutionResp := &admin.Execution{ - Closure: &admin.ExecutionClosure{ - Phase: core.WorkflowExecution_SUCCEEDED, - OutputResult: &admin.ExecutionClosure_Outputs{ - Outputs: &admin.LiteralMapBlob{ - Data: &admin.LiteralMapBlob_Uri{ - Uri: "s3://foo/bar", - }, - }, - }, - }, - } - mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) - - mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) - - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) - assert.NoError(t, err) - assert.Len(t, v, 1) - - item, ok := v[0].Item.(executionCacheItem) - assert.True(t, ok) - assert.NotNil(t, item.SyncError) - }) - - t.Run("read-proto-succeeds", func(t *testing.T) { - mockClient := &mocks.AdminServiceClient{} - - pbStore := &storageMocks.ComposedProtobufStore{} - - pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(nil) - - storageClient := &storage.DataStore{ - ComposedProtobufStore: pbStore, - ReferenceConstructor: &storageMocks.ReferenceConstructor{}, - } - - exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) - assert.NoError(t, err) - - iwMock := &mocks2.ItemWrapper{} - i := executionCacheItem{} - iwMock.OnGetItem().Return(i) - iwMock.OnGetID().Return("id") - - mockExecutionResp := &admin.Execution{ - Closure: &admin.ExecutionClosure{ - Phase: core.WorkflowExecution_SUCCEEDED, - OutputResult: &admin.ExecutionClosure_Outputs{ - Outputs: &admin.LiteralMapBlob{ - Data: &admin.LiteralMapBlob_Uri{ - Uri: "s3://foo/bar", - }, - }, - }, - }, - } - mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) - - mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) - - adminExec := exec.(*adminLaunchPlanExecutor) - v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) - assert.NoError(t, err) - assert.Len(t, v, 1) - - item, ok := v[0].Item.(executionCacheItem) - assert.True(t, ok) - assert.Nil(t, item.SyncError) - }) - t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{} @@ -612,6 +418,165 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) { }) } +type test struct { + name string + cacheItem executionCacheItem + getExecutionResp *admin.Execution + getExecutionError error + getExecutionDataResp *admin.WorkflowExecutionGetDataResponse + getExecutionDataError error + storageReadError error + expectSuccess bool + expectError bool + expectedOutputs *core.LiteralMap + expectedErrorContains string +} + +func TestAdminLaunchPlanExecutorScenarios(t *testing.T) { + ctx := context.TODO() + + mockExecutionRespWithOutputs := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + OutputResult: &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: "s3://foo/bar", + }, + }, + }, + }, + } + mockExecutionRespWithoutOutputs := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + }, + } + outputLiteral := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo-1": coreutils.MustMakeLiteral("foo-value-1"), + }, + } + + tests := []test{ + { + name: "terminal-sync", + expectSuccess: true, + cacheItem: executionCacheItem{ + ExecutionClosure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + WorkflowId: &core.Identifier{Project: "p"}}, + ExecutionOutputs: outputLiteral, + }, + expectedOutputs: outputLiteral, + }, + { + name: "GetExecution-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionError: status.Error(codes.NotFound, ""), + expectedErrorContains: RemoteErrorNotFound, + }, + { + name: "GetExecution-fails-system", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionError: status.Error(codes.Internal, ""), + expectedErrorContains: RemoteErrorSystem, + }, + { + name: "GetExecutionData-succeeds", + expectSuccess: true, + cacheItem: executionCacheItem{}, + expectedOutputs: outputLiteral, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: outputLiteral, + }, + getExecutionDataError: nil, + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-error-no-retry", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataError: status.Error(codes.NotFound, ""), + expectedErrorContains: codes.NotFound.String(), + getExecutionResp: mockExecutionRespWithoutOutputs, + }, + { + name: "GetExecutionData-error-retry-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataError: status.Error(codes.NotFound, ""), + storageReadError: status.Error(codes.Internal, ""), + expectedErrorContains: codes.Internal.String(), + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-empty-retry-fails", + expectError: true, + cacheItem: executionCacheItem{}, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: &core.LiteralMap{}, + }, + getExecutionDataError: nil, + storageReadError: status.Error(codes.Internal, ""), + expectedErrorContains: codes.Internal.String(), + getExecutionResp: mockExecutionRespWithOutputs, + }, + { + name: "GetExecutionData-empty-retry-succeeds", + expectSuccess: true, + cacheItem: executionCacheItem{}, + getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{ + FullOutputs: &core.LiteralMap{}, + }, + getExecutionDataError: nil, + expectedOutputs: &core.LiteralMap{}, + getExecutionResp: mockExecutionRespWithOutputs, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + pbStore := &storageMocks.ComposedProtobufStore{} + pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(tc.storageReadError) + storageClient := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := tc.cacheItem + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(tc.getExecutionResp, tc.getExecutionError) + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(tc.getExecutionDataResp, tc.getExecutionDataError) + + adminExec := exec.(*adminLaunchPlanExecutor) + + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + + if tc.expectSuccess { + assert.Nil(t, item.SyncError) + assert.Equal(t, tc.expectedOutputs, item.ExecutionOutputs) + } + if tc.expectError { + assert.NotNil(t, item.SyncError) + assert.Contains(t, item.SyncError.Error(), tc.expectedErrorContains) + } + }) + } +} + func TestIsWorkflowTerminated(t *testing.T) { assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDED)) assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_ABORTED))