From 72818d563f0b5f54da9f19e42544b0ae836e411a Mon Sep 17 00:00:00 2001 From: Iaroslav Ciupin Date: Thu, 11 Jan 2024 21:14:49 +0200 Subject: [PATCH] add tests Signed-off-by: Iaroslav Ciupin --- .../pkg/manager/impl/workflow_manager_test.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/flyteadmin/pkg/manager/impl/workflow_manager_test.go b/flyteadmin/pkg/manager/impl/workflow_manager_test.go index dc046b8b0e..f4dbf6e50a 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager_test.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager_test.go @@ -407,6 +407,33 @@ func Test_GetDynamicNodeWorkflow_Success(t *testing.T) { assert.True(t, proto.Equal(expected, resp)) } +func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) { + repo := repositoryMocks.NewMockRepository() + nodeExecID := core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: name, + }, + } + expectedErr := errors.New("failure") + repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo). + SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) { + assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier) + return models.NodeExecution{}, expectedErr + }) + mockStorageClient := commonMocks.GetMockStorageClient() + ctx := context.TODO() + workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), + mockStorageClient, storagePrefix, mockScope.NewTestScope(), + artifacts.NewArtifactRegistry(ctx, nil)) + + resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID}) + + assert.Equal(t, expectedErr, err) + assert.Empty(t, resp) +} + func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) { repo := repositoryMocks.NewMockRepository() nodeExecID := core.NodeExecutionIdentifier{ @@ -434,6 +461,39 @@ func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) { assert.True(t, proto.Equal(expected, resp)) } +func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) { + repo := repositoryMocks.NewMockRepository() + nodeExecID := core.NodeExecutionIdentifier{ + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: project, + Domain: domain, + Name: name, + }, + } + repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo). + SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) { + assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier) + return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil + }) + mockStorageClient := commonMocks.GetMockStorageClient() + mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error { + assert.Equal(t, remoteClosureIdentifier, reference.String()) + return errors.New("failure") + } + ctx := context.TODO() + workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(), + mockStorageClient, storagePrefix, mockScope.NewTestScope(), + artifacts.NewArtifactRegistry(ctx, nil)) + + resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID}) + + st, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.Internal, st.Code()) + assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message()) + assert.Empty(t, resp) +} + func TestListWorkflows(t *testing.T) { repository := repositoryMocks.NewMockRepository() workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {