Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Iaroslav Ciupin <[email protected]>
  • Loading branch information
iaroslav-ciupin committed Jan 11, 2024
1 parent d6d6911 commit 72818d5
Showing 1 changed file with 60 additions and 0 deletions.
60 changes: 60 additions & 0 deletions flyteadmin/pkg/manager/impl/workflow_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,33 @@ func Test_GetDynamicNodeWorkflow_Success(t *testing.T) {
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_DBError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
expectedErr := errors.New("failure")
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{}, expectedErr
})
mockStorageClient := commonMocks.GetMockStorageClient()
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

assert.Equal(t, expectedErr, err)
assert.Empty(t, resp)
}

func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
Expand Down Expand Up @@ -434,6 +461,39 @@ func Test_GetDynamicNodeWorkflow_NoRemoteReference(t *testing.T) {
assert.True(t, proto.Equal(expected, resp))
}

func Test_GetDynamicNodeWorkflow_StorageError(t *testing.T) {
repo := repositoryMocks.NewMockRepository()
nodeExecID := core.NodeExecutionIdentifier{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: project,
Domain: domain,
Name: name,
},
}
repo.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).
SetGetCallback(func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.Equal(t, nodeExecID, input.NodeExecutionIdentifier)
return models.NodeExecution{DynamicWorkflowRemoteClosureReference: remoteClosureIdentifier}, nil
})
mockStorageClient := commonMocks.GetMockStorageClient()
mockStorageClient.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
assert.Equal(t, remoteClosureIdentifier, reference.String())
return errors.New("failure")
}
ctx := context.TODO()
workflowManager := NewWorkflowManager(repo, getMockWorkflowConfigProvider(), getMockWorkflowCompiler(),
mockStorageClient, storagePrefix, mockScope.NewTestScope(),
artifacts.NewArtifactRegistry(ctx, nil))

resp, err := workflowManager.GetDynamicNodeWorkflow(ctx, admin.GetDynamicNodeWorkflowRequest{Id: &nodeExecID})

st, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, codes.Internal, st.Code())
assert.Equal(t, "Unable to read WorkflowClosure from location s3://flyte/metadata/admin/remote closure id : failure", st.Message())
assert.Empty(t, resp)
}

func TestListWorkflows(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
workflowListFunc := func(input interfaces.ListResourceInput) (interfaces.WorkflowCollectionOutput, error) {
Expand Down

0 comments on commit 72818d5

Please sign in to comment.