Skip to content

Commit

Permalink
retry fetching subworkflow output data if fulloutputs are not set
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Jan 13, 2024
1 parent 361134b commit 1f161f9
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{
Id: &exec.WorkflowExecutionIdentifier,
})
if err != nil {
if err != nil || execData.GetFullOutputs() == nil || execData.GetFullOutputs().GetLiterals() == nil {
outputURI := res.GetClosure().GetOutputs().GetUri()
// attempt remote storage read on GetExecutionData failure
if outputURI != "" {
Expand Down
353 changes: 159 additions & 194 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,200 +50,6 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
assert.Equal(t, result, s)
})

t.Run("terminal-sync", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)
iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")
adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{
iwMock,
})
assert.NoError(t, err)
assert.NotNil(t, v)
assert.Len(t, v, 1)
assert.Equal(t, v[0].ID, "id")
assert.Equal(t, v[0].Item, i)
})

t.Run("fetch-execution-error", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError.Error())
})

t.Run("successful-execution-data-retrieval", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

fullOutputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo-1": coreutils.MustMakeLiteral("foo-value-1"),
},
}

mockExecData := &admin.WorkflowExecutionGetDataResponse{FullOutputs: fullOutputs}
mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(mockExecData, nil)

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.Equal(t, item.ExecutionOutputs, fullOutputs)
})

t.Run("failed-execution-data-retrieval", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError)
})

t.Run("read-proto-fails", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

pbStore := &storageMocks.ComposedProtobufStore{}

pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(status.Error(codes.Internal, ""))

storageClient := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "s3://foo/bar",
},
},
},
},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError)
})

t.Run("read-proto-succeeds", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

pbStore := &storageMocks.ComposedProtobufStore{}

pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(nil)

storageClient := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "s3://foo/bar",
},
},
},
},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.Nil(t, item.SyncError)
})

t.Run("notFound", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

Expand Down Expand Up @@ -612,6 +418,165 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {
})
}

type test struct {
name string
cacheItem executionCacheItem
getExecutionResp *admin.Execution
getExecutionError error
getExecutionDataResp *admin.WorkflowExecutionGetDataResponse
getExecutionDataError error
storageReadError error
expectSuccess bool
expectError bool
expectedOutputs *core.LiteralMap
expectedErrorContains string
}

func TestAdminLaunchPlanExecutorScenarios(t *testing.T) {
ctx := context.TODO()

mockExecutionRespWithOutputs := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "s3://foo/bar",
},
},
},
},
}
mockExecutionRespWithoutOutputs := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
},
}
outputLiteral := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo-1": coreutils.MustMakeLiteral("foo-value-1"),
},
}

tests := []test{
{
name: "terminal-sync",
expectSuccess: true,
cacheItem: executionCacheItem{
ExecutionClosure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
WorkflowId: &core.Identifier{Project: "p"}},
ExecutionOutputs: outputLiteral,
},
expectedOutputs: outputLiteral,
},
{
name: "GetExecution-fails",
expectError: true,
cacheItem: executionCacheItem{},
getExecutionError: status.Error(codes.NotFound, ""),
expectedErrorContains: RemoteErrorNotFound,
},
{
name: "GetExecution-fails-system",
expectError: true,
cacheItem: executionCacheItem{},
getExecutionError: status.Error(codes.Internal, ""),
expectedErrorContains: RemoteErrorSystem,
},
{
name: "GetExecutionData-succeeds",
expectSuccess: true,
cacheItem: executionCacheItem{},
expectedOutputs: outputLiteral,
getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{
FullOutputs: outputLiteral,
},
getExecutionDataError: nil,
getExecutionResp: mockExecutionRespWithOutputs,
},
{
name: "GetExecutionData-error-no-retry",
expectError: true,
cacheItem: executionCacheItem{},
getExecutionDataError: status.Error(codes.NotFound, ""),
expectedErrorContains: codes.NotFound.String(),
getExecutionResp: mockExecutionRespWithoutOutputs,
},
{
name: "GetExecutionData-error-retry-fails",
expectError: true,
cacheItem: executionCacheItem{},
getExecutionDataError: status.Error(codes.NotFound, ""),
storageReadError: status.Error(codes.Internal, ""),
expectedErrorContains: codes.Internal.String(),
getExecutionResp: mockExecutionRespWithOutputs,
},
{
name: "GetExecutionData-empty-retry-fails",
expectError: true,
cacheItem: executionCacheItem{},
getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{
FullOutputs: &core.LiteralMap{},
},
getExecutionDataError: nil,
storageReadError: status.Error(codes.Internal, ""),
expectedErrorContains: codes.Internal.String(),
getExecutionResp: mockExecutionRespWithOutputs,
},
{
name: "GetExecutionData-empty-retry-succeeds",
expectSuccess: true,
cacheItem: executionCacheItem{},
getExecutionDataResp: &admin.WorkflowExecutionGetDataResponse{
FullOutputs: &core.LiteralMap{},
},
getExecutionDataError: nil,
expectedOutputs: &core.LiteralMap{},
getExecutionResp: mockExecutionRespWithOutputs,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
pbStore := &storageMocks.ComposedProtobufStore{}
pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(tc.storageReadError)
storageClient := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := tc.cacheItem
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(tc.getExecutionResp, tc.getExecutionError)
mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(tc.getExecutionDataResp, tc.getExecutionDataError)

adminExec := exec.(*adminLaunchPlanExecutor)

v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)
item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)

if tc.expectSuccess {
assert.Nil(t, item.SyncError)
assert.Equal(t, tc.expectedOutputs, item.ExecutionOutputs)
}
if tc.expectError {
assert.NotNil(t, item.SyncError)
assert.Contains(t, item.SyncError.Error(), tc.expectedErrorContains)
}
})
}
}

func TestIsWorkflowTerminated(t *testing.T) {
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDED))
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_ABORTED))
Expand Down

0 comments on commit 1f161f9

Please sign in to comment.