From e5dafff7c129aecdc5ed3a7d71fc5f9e0ecfc8bb Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Fri, 12 Jan 2024 00:19:27 -0800 Subject: [PATCH] add unit tests Signed-off-by: Paul Dittamo --- .../subworkflow/launchplan/admin_test.go | 180 ++++++++++++++++++ 1 file changed, 180 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go index e21013b3cd..1c20cf3b36 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + "github.com/flyteorg/flyte/flyteidl/clients/go/coreutils" + storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -12,6 +15,7 @@ import ( "google.golang.org/grpc/status" "github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks" + "github.com/flyteorg/flyte/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/cache" @@ -20,6 +24,7 @@ import ( "github.com/flyteorg/flyte/flytestdlib/promutils" "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" "github.com/flyteorg/flyte/flytestdlib/storage" + storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks" ) func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { @@ -67,6 +72,181 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) { assert.Equal(t, v[0].Item, i) }) + t.Run("fetch-execution-error", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) + + adminExec := exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + assert.NotNil(t, item.SyncError.Error()) + }) + + t.Run("successful-execution-data-retrieval", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockExecutionResp := &admin.Execution{ + Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED}, + } + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) + + fullOutputs := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "foo-1": coreutils.MustMakeLiteral("foo-value-1"), + }, + } + + mockExecData := &admin.WorkflowExecutionGetDataResponse{FullOutputs: fullOutputs} + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(mockExecData, nil) + + adminExec := exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + assert.Equal(t, item.ExecutionOutputs, fullOutputs) + }) + + t.Run("failed-execution-data-retrieval", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockExecutionResp := &admin.Execution{ + Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED}, + } + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) + + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) + + adminExec := exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + assert.NotNil(t, item.SyncError) + }) + + t.Run("read-proto-fails", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + + pbStore := &storageMocks.ComposedProtobufStore{} + + pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(status.Error(codes.Internal, "")) + + storageClient := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockExecutionResp := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + OutputResult: &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: "s3://foo/bar", + }, + }, + }, + }, + } + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) + + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) + + adminExec := exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + assert.NotNil(t, item.SyncError) + }) + + t.Run("read-proto-succeeds", func(t *testing.T) { + mockClient := &mocks.AdminServiceClient{} + + pbStore := &storageMocks.ComposedProtobufStore{} + + pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + storageClient := &storage.DataStore{ + ComposedProtobufStore: pbStore, + ReferenceConstructor: &storageMocks.ReferenceConstructor{}, + } + + exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient) + assert.NoError(t, err) + + iwMock := &mocks2.ItemWrapper{} + i := executionCacheItem{} + iwMock.OnGetItem().Return(i) + iwMock.OnGetID().Return("id") + + mockExecutionResp := &admin.Execution{ + Closure: &admin.ExecutionClosure{ + Phase: core.WorkflowExecution_SUCCEEDED, + OutputResult: &admin.ExecutionClosure_Outputs{ + Outputs: &admin.LiteralMapBlob{ + Data: &admin.LiteralMapBlob_Uri{ + Uri: "s3://foo/bar", + }, + }, + }, + }, + } + mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil) + + mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, "")) + + adminExec := exec.(*adminLaunchPlanExecutor) + v, err := adminExec.syncItem(ctx, cache.Batch{iwMock}) + assert.NoError(t, err) + assert.Len(t, v, 1) + + item, ok := v[0].Item.(executionCacheItem) + assert.True(t, ok) + assert.Nil(t, item.SyncError) + }) + t.Run("notFound", func(t *testing.T) { mockClient := &mocks.AdminServiceClient{}