Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt committed Jan 12, 2024
1 parent b85c172 commit e5dafff
Showing 1 changed file with 180 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ import (
"testing"
"time"

"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flyte/flyteidl/clients/go/admin/mocks"
"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/cache"
Expand All @@ -20,6 +24,7 @@ import (
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
)

func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
Expand Down Expand Up @@ -67,6 +72,181 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
assert.Equal(t, v[0].Item, i)
})

t.Run("fetch-execution-error", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError.Error())
})

t.Run("successful-execution-data-retrieval", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

fullOutputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo-1": coreutils.MustMakeLiteral("foo-value-1"),
},
}

mockExecData := &admin.WorkflowExecutionGetDataResponse{FullOutputs: fullOutputs}
mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(mockExecData, nil)

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.Equal(t, item.ExecutionOutputs, fullOutputs)
})

t.Run("failed-execution-data-retrieval", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError)
})

t.Run("read-proto-fails", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

pbStore := &storageMocks.ComposedProtobufStore{}

pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(status.Error(codes.Internal, ""))

storageClient := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "s3://foo/bar",
},
},
},
},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.NotNil(t, item.SyncError)
})

t.Run("read-proto-succeeds", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

pbStore := &storageMocks.ComposedProtobufStore{}

pbStore.On("ReadProtobuf", mock.Anything, mock.Anything, mock.Anything).Return(nil)

storageClient := &storage.DataStore{
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")

mockExecutionResp := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
OutputResult: &admin.ExecutionClosure_Outputs{
Outputs: &admin.LiteralMapBlob{
Data: &admin.LiteralMapBlob_Uri{
Uri: "s3://foo/bar",
},
},
},
},
}
mockClient.On("GetExecution", mock.Anything, mock.Anything).Return(mockExecutionResp, nil)

mockClient.On("GetExecutionData", mock.Anything, mock.Anything).Return(nil, status.Error(codes.NotFound, ""))

adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{iwMock})
assert.NoError(t, err)
assert.Len(t, v, 1)

item, ok := v[0].Item.(executionCacheItem)
assert.True(t, ok)
assert.Nil(t, item.SyncError)
})

t.Run("notFound", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

Expand Down

0 comments on commit e5dafff

Please sign in to comment.