Skip to content

Commit

Permalink
Add testing and fix warnings
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so committed Sep 30, 2024
1 parent c7c6301 commit 7f16fd2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 18 deletions.
10 changes: 5 additions & 5 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
event "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/timestamppb"
k8_api_err "k8s.io/apimachinery/pkg/api/errors"
k8apierr "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -82,7 +82,7 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}
_, err = targetCluster.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Create(ctx, flyteWf, v1.CreateOptions{})
if err != nil {
if !k8_api_err.IsAlreadyExists(err) {
if !k8apierr.IsAlreadyExists(err) {
logger.Debugf(context.TODO(), "Failed to create execution [%+v] in cluster: %s", data.ExecutionID, targetCluster.ID)
return interfaces.ExecutionResponse{}, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create workflow in propeller %v", err)
}
Expand All @@ -103,11 +103,11 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
PropagationPolicy: &deletePropagationBackground,
})
// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
if err != nil && !k8apierr.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
}

e.executionEventWriter.Write(admin.WorkflowExecutionEventRequest{
e.executionEventWriter.Write(&admin.WorkflowExecutionEventRequest{
Event: &event.WorkflowExecutionEvent{
ExecutionId: &core.WorkflowExecutionIdentifier{
Project: data.ExecutionID.Project,
Expand Down
40 changes: 27 additions & 13 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,7 @@ package impl
import (
"context"
"errors"
"regexp"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
k8_api_err "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

eventMock "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster"
execClusterIfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
clusterMock "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/mocks"
Expand All @@ -22,10 +13,19 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/mocks"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
flyteclient "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned"
v1alpha12 "github.com/flyteorg/flyte/flytepropeller/pkg/client/clientset/versioned/typed/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
k8apierr "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"regexp"
"testing"
)

var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestExecute(t *testing.T) {
func TestExecute_AlreadyExists(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.createCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
return nil, k8_api_err.NewAlreadyExists(schema.GroupResource{}, " ")
return nil, k8apierr.NewAlreadyExists(schema.GroupResource{}, " ")
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
Expand Down Expand Up @@ -279,6 +279,7 @@ func TestExecute_MiscError(t *testing.T) {
}

func TestAbort(t *testing.T) {
mockEventWriter := &eventMock.WorkflowExecutionEventWriter{}
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
assert.Equal(t, execID.Name, name)
Expand All @@ -290,20 +291,33 @@ func TestAbort(t *testing.T) {
return &fakeFlyteWorkflow
}
executor := K8sWorkflowExecutor{
executionCluster: getFakeExecutionCluster(),
executionCluster: getFakeExecutionCluster(),
executionEventWriter: mockEventWriter,
}

mockEventWriter.On("Write", mock.MatchedBy(func(req *admin.WorkflowExecutionEventRequest) bool {
return req.Event.ExecutionId.Project == execID.Project &&
req.Event.ExecutionId.Domain == execID.Domain &&
req.Event.ExecutionId.Name == execID.Name &&
req.Event.Phase == core.WorkflowExecution_ABORTED &&
req.Event.ProducerId == "k8s_executor" &&
req.Event.OutputResult.(*event.WorkflowExecutionEvent_Error).Error.Code == "ExecutionAborted" &&
req.Event.OutputResult.(*event.WorkflowExecutionEvent_Error).Error.Message == "Execution aborted"
})).Return(nil)

err := executor.Abort(context.TODO(), interfaces.AbortData{
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
})
assert.NoError(t, err)
mockEventWriter.AssertExpectations(t)
}

func TestAbort_Notfound(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
return k8apierr.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
}, execID.Name)
Expand Down

0 comments on commit 7f16fd2

Please sign in to comment.