Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Raposo <[email protected]>
  • Loading branch information
RRap0so committed Oct 6, 2024
1 parent 881d7a2 commit 51a325a
Show file tree
Hide file tree
Showing 14 changed files with 405 additions and 110 deletions.
1 change: 1 addition & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,7 @@ func (m *ExecutionManager) TerminateExecution(

ExecutionID: request.Id,
Cluster: executionModel.Cluster,
Force: request.Force,
})
if err != nil {
m.systemMetrics.TerminateExecutionFailures.Inc()
Expand Down
64 changes: 64 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3450,6 +3450,7 @@ func TestTerminateExecution(t *testing.T) {
Domain: "domain",
Name: "name",
}, data.ExecutionID))
assert.True(t, data.Force)
return true
})).Return(nil)
mockExecutor.OnID().Return("customMockExecutor")
Expand All @@ -3473,6 +3474,69 @@ func TestTerminateExecution(t *testing.T) {
assert.NotNil(t, resp)
}

// TestForceTerminateExecution verifies that TerminateExecution, when called with
// Force set on the request, forwards Force=true to the workflow executor's Abort
// call and persists the execution in the ABORTING phase with the abort cause and
// principal recorded in its closure.
func TestForceTerminateExecution(t *testing.T) {
	repository := repositoryMocks.NewMockRepository()
	startTime := time.Now()
	executionGetFunc := makeExecutionGetFunc(t, []byte{}, &startTime)
	repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)

	abortCause := "abort cause"
	// The parameter is named ctx (not context) so it does not shadow the
	// imported context package inside the callback.
	updateExecutionFunc := func(
		ctx context.Context, execution models.Execution) error {
		assert.Equal(t, "project", execution.Project)
		assert.Equal(t, "domain", execution.Domain)
		assert.Equal(t, "name", execution.Name)
		assert.Equal(t, uint(1), execution.LaunchPlanID)
		assert.Equal(t, uint(2), execution.WorkflowID)
		assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
		assert.Equal(t, execution.ExecutionCreatedAt, execution.ExecutionUpdatedAt,
			"an abort call should not change ExecutionUpdatedAt until a corresponding execution event is received")
		assert.Equal(t, abortCause, execution.AbortCause)
		assert.Equal(t, testCluster, execution.Cluster)

		// The abort metadata must be recorded in the serialized closure as well.
		var unmarshaledClosure admin.ExecutionClosure
		err := proto.Unmarshal(execution.Closure, &unmarshaledClosure)
		assert.NoError(t, err)
		assert.True(t, proto.Equal(&admin.AbortMetadata{
			Cause:     abortCause,
			Principal: principal,
		}, unmarshaledClosure.GetAbortMetadata()))
		return nil
	}
	repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)

	mockExecutor := workflowengineMocks.WorkflowExecutor{}
	mockExecutor.OnAbortMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.AbortData) bool {
		assert.True(t, proto.Equal(&core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		}, data.ExecutionID))
		// The Force flag from the request must reach the executor.
		assert.True(t, data.Force)
		return true
	})).Return(nil)
	mockExecutor.OnID().Return("customMockExecutor")
	r := plugins.NewRegistry()
	r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &mockExecutor)
	execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})

	identity, err := auth.NewIdentityContext("", principal, "", time.Now(), sets.NewString(), nil, nil)
	assert.NoError(t, err)
	ctx := identity.WithContext(context.Background())
	resp, err := execManager.TerminateExecution(ctx, &admin.ExecutionTerminateRequest{
		Id: &core.WorkflowExecutionIdentifier{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Cause: abortCause,
		Force: true,
	})

	assert.NoError(t, err)
	assert.NotNil(t, resp)
}

func TestTerminateExecution_PropellerError(t *testing.T) {
var expectedError = errors.New("expected error")

Expand Down
52 changes: 48 additions & 4 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// deletePropagationBackground is the propagation policy passed in DeleteOptions
// when Abort deletes a FlyteWorkflow object.
var deletePropagationBackground = v1.DeletePropagationBackground

// AbortedWorkflowAnnotation is the annotation key that Abort sets to "true" on a
// FlyteWorkflow object for a non-forced abort.
const AbortedWorkflowAnnotation = "workflow-aborted"
// defaultIdentifier is presumably the ID reported by this executor — TODO confirm
// against the ID() implementation, which is not visible here.
const defaultIdentifier = "DefaultK8sExecutor"

// K8sWorkflowExecutor directly creates and deletes Flyte workflow execution CRD objects using the configured execution
Expand Down Expand Up @@ -94,9 +94,53 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
})

// Fetch the workflow
w, err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Get(ctx, data.ExecutionID.GetName(), v1.GetOptions{})
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)

} else if k8_api_err.IsNotFound(err) {
logger.Infof(ctx, "Trying to abort an execution [%+v] that is not found in cluster: %s, skipping...", data.ExecutionID, target.ID)
return nil
}

if data.Force {
logger.Debugf(ctx, "Force deleting execution [%+v] in cluster: %s", data.ExecutionID, target.ID)

// Remove finalizers if any
if w.Finalizers != nil || w.ObjectMeta.Finalizers != nil {
w.Finalizers = []string{}
}

// Write the updated workflow
if _, err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Update(ctx, w, v1.UpdateOptions{}); err != nil {
if k8_api_err.IsNotFound(err) {
logger.Debugf(ctx, "Trying to force delete an execution [%+v] that is not found in cluster: %s, skipping...", data.ExecutionID, target.ID)
return nil
}
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to remove finalizer for execution [%+v] in cluster: %s with err %v", data.ExecutionID, target.ID, err)
}

// Finally delete the workflow
if err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
PropagationPolicy: &deletePropagationBackground,
}); err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
}

return nil

}

if w.ObjectMeta.Annotations == nil {
w.ObjectMeta.Annotations = make(map[string]string)
}

w.ObjectMeta.Annotations[AbortedWorkflowAnnotation] = "true"

_, err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Update(ctx, w, v1.UpdateOptions{})

// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
Expand Down
139 changes: 135 additions & 4 deletions flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}

// Hook signatures that tests install on FakeFlyteWorkflow to observe and stub
// the corresponding client calls.
type (
	createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
	deleteCallback func(name string, options *v1.DeleteOptions) error
	updateCallback func(*v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error)
	getCallback    func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error)
)

// FakeFlyteWorkflow is a test double for the FlyteWorkflow client interface.
// It embeds the interface and holds one optional hook per overridden method.
type FakeFlyteWorkflow struct {
	v1alpha12.FlyteWorkflowInterface
	// createCallback, when set, handles Create calls.
	createCallback createCallback
	// deleteCallback, when set, handles Delete calls.
	deleteCallback deleteCallback
	// getCallback, when set, handles Get calls.
	getCallback getCallback
	// updateCallback, when set, handles Update calls.
	updateCallback updateCallback
}

func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
Expand All @@ -52,6 +56,20 @@ func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.
return nil
}

// Get delegates to getCallback when one is installed. Without a callback it
// returns an empty, non-nil workflow so that callers which dereference the
// result after a nil error (as Abort does) do not hit a nil pointer.
func (b *FakeFlyteWorkflow) Get(ctx context.Context, name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
	if b.getCallback != nil {
		return b.getCallback(name, options)
	}
	// A nil error paired with a nil object would violate what callers of a
	// client Get expect; hand back a zero-value workflow instead.
	return &v1alpha1.FlyteWorkflow{}, nil
}

// Update forwards the workflow to updateCallback when a test has installed
// one; with no callback it reports success and returns no object.
func (b *FakeFlyteWorkflow) Update(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) {
	if b.updateCallback == nil {
		return nil, nil
	}
	return b.updateCallback(wf)
}

type flyteWorkflowsCallback func(string) v1alpha12.FlyteWorkflowInterface

type FakeFlyteWorkflowV1alpha1 struct {
Expand Down Expand Up @@ -86,9 +104,10 @@ var execID = &core.WorkflowExecutionIdentifier{
}

var flyteWf = &v1alpha1.FlyteWorkflow{
ExecutionID: v1alpha1.ExecutionID{
WorkflowExecutionIdentifier: execID,
ObjectMeta: v1.ObjectMeta{
Finalizers: []string{"mock-finalizer"},
},
ExecutionID: v1alpha1.ExecutionID{WorkflowExecutionIdentifier: execID},
}

var testInputs = &core.LiteralMap{
Expand Down Expand Up @@ -278,8 +297,20 @@ func TestExecute_MiscError(t *testing.T) {
assert.EqualError(t, err, "failed to create workflow in propeller call failed")
}

func TestAbort(t *testing.T) {
func TestAbort_ForceTrue(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, execID.Name, name)
return flyteWf, nil
}

fakeFlyteWorkflow.updateCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, flyteWf, flyteWorkflow)
assert.Empty(t, flyteWorkflow.Finalizers)
return nil, nil
}


fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
assert.Equal(t, execID.Name, name)
assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
Expand All @@ -296,18 +327,116 @@ func TestAbort(t *testing.T) {
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
Force: true,
})
assert.NoError(t, err)
}

// TestAbort_ForceFalse verifies the non-forced abort path: Abort fetches the
// workflow, sets AbortedWorkflowAnnotation to "true" on it, and writes it back
// through Update.
func TestAbort_ForceFalse(t *testing.T) {
	fakeFlyteWorkflow := FakeFlyteWorkflow{}
	fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
		assert.Equal(t, execID.Name, name)
		return flyteWf, nil
	}

	// Track the Update call so the test cannot pass without the annotation
	// assertions below ever running.
	updateCalled := false
	fakeFlyteWorkflow.updateCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
		updateCalled = true
		// NOTE(review): getCallback returns the shared flyteWf pointer, so this
		// only confirms the same object is passed through.
		assert.Equal(t, flyteWf, flyteWorkflow)
		// Expected value first, actual second, so failure output is labelled correctly.
		assert.Equal(t, "true", flyteWorkflow.ObjectMeta.Annotations[AbortedWorkflowAnnotation])
		return nil, nil
	}

	fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
		assert.Equal(t, namespace, ns)
		return &fakeFlyteWorkflow
	}
	executor := K8sWorkflowExecutor{
		executionCluster: getFakeExecutionCluster(),
	}
	err := executor.Abort(context.TODO(), interfaces.AbortData{
		Namespace:   namespace,
		ExecutionID: execID,
		Cluster:     clusterID,
	})
	assert.NoError(t, err)
	assert.True(t, updateCalled, "a non-forced abort should update the workflow with the aborted annotation")
}

// TestAbort_NotfoundWhenGet checks that Abort reports success when the initial
// Get of the workflow returns a NotFound error.
func TestAbort_NotfoundWhenGet(t *testing.T) {
	workflows := FakeFlyteWorkflow{
		getCallback: func(string, v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
			missing := schema.GroupResource{Group: "foo", Resource: "bar"}
			return nil, k8_api_err.NewNotFound(missing, execID.Name)
		},
	}
	fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
		assert.Equal(t, namespace, ns)
		return &workflows
	}

	executor := K8sWorkflowExecutor{executionCluster: getFakeExecutionCluster()}
	abortData := interfaces.AbortData{
		Namespace:   namespace,
		ExecutionID: execID,
		Cluster:     clusterID,
	}
	assert.NoError(t, executor.Abort(context.TODO(), abortData))
}

// TestAbort_NotfoundWhenForceUpdate checks that a forced Abort reports success
// when the workflow is found by Get but the following Update returns NotFound.
func TestAbort_NotfoundWhenForceUpdate(t *testing.T) {
	workflows := FakeFlyteWorkflow{
		getCallback: func(name string, _ v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
			assert.Equal(t, execID.Name, name)
			return flyteWf, nil
		},
		updateCallback: func(*v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
			missing := schema.GroupResource{Group: "foo", Resource: "bar"}
			return nil, k8_api_err.NewNotFound(missing, execID.Name)
		},
	}
	fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
		assert.Equal(t, namespace, ns)
		return &workflows
	}

	executor := K8sWorkflowExecutor{executionCluster: getFakeExecutionCluster()}
	abortData := interfaces.AbortData{
		Namespace:   namespace,
		ExecutionID: execID,
		Cluster:     clusterID,
		Force:       true,
	}
	assert.NoError(t, executor.Abort(context.TODO(), abortData))
}

func TestAbort_Notfound(t *testing.T) {
func TestAbort_NotfoundWhenForceDelete(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}

fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, execID.Name, name)
return flyteWf, nil
}

fakeFlyteWorkflow.updateCallback = func(flyteWorkflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
assert.Equal(t, flyteWf, flyteWorkflow)
return nil, nil
}

fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
}, execID.Name)
}


fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
assert.Equal(t, namespace, ns)
return &fakeFlyteWorkflow
Expand All @@ -319,10 +448,12 @@ func TestAbort_Notfound(t *testing.T) {
Namespace: namespace,
ExecutionID: execID,
Cluster: clusterID,
Force: true,
})
assert.NoError(t, err)
}


func TestAbort_MiscError(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/workflowengine/interfaces/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type AbortData struct {
ExecutionID *core.WorkflowExecutionIdentifier
// Cluster identifier where the execution was created
Cluster string
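// Force requests a forced abort; the K8s executor then removes finalizers and deletes the workflow CRD instead of only annotating it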
Force bool
}

// WorkflowExecutor is a client interface used to create and delete Flyte workflow CRD objects.
Expand Down
8 changes: 8 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/admin/execution_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 51a325a

Please sign in to comment.