diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go index d941cc8309..7ed6b826b9 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go @@ -16,9 +16,8 @@ import ( "github.com/flyteorg/flyte/flytestdlib/storage" ) -var deletePropagationBackground = v1.DeletePropagationBackground - const defaultIdentifier = "DefaultK8sExecutor" +const AbortedWorkflowAnnotation = "workflow-aborted" // K8sWorkflowExecutor directly creates and delete Flyte workflow execution CRD objects using the configured execution // cluster interface. @@ -94,13 +93,27 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat if err != nil { return errors.NewFlyteAdminErrorf(codes.Internal, err.Error()) } - err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{ - PropagationPolicy: &deletePropagationBackground, - }) + + w, err := target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Get(ctx, data.ExecutionID.GetName(), v1.GetOptions{}) + if err != nil && !k8_api_err.IsNotFound(err) { + return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) + } else if k8_api_err.IsNotFound(err) { + logger.Infof(ctx, "Trying to abort an execution [%+v] that is not found in cluster: %s, skipping...", data.ExecutionID, target.ID) + return nil + } + + if w.ObjectMeta.Annotations == nil { + w.ObjectMeta.Annotations = make(map[string]string) + } + + w.ObjectMeta.Annotations[AbortedWorkflowAnnotation] = "true" + _, err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Update(ctx, w, v1.UpdateOptions{}) + // An IsNotFound error indicates the resource is already deleted. if err != nil && !k8_api_err.IsNotFound(err) { return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err) } + return nil } diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go index a2ecb51364..d01af72224 100644 --- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go +++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go @@ -31,11 +31,13 @@ import ( var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{} type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) -type deleteCallback func(name string, options *v1.DeleteOptions) error +type getCallback func(string, v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) +type updateCallback func(*v1alpha1.FlyteWorkflow, v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) type FakeFlyteWorkflow struct { v1alpha12.FlyteWorkflowInterface createCallback createCallback - deleteCallback deleteCallback + getCallback getCallback + updateCallback updateCallback } func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) { @@ -45,11 +47,18 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl return nil, nil } -func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error { - if b.deleteCallback != nil { - return b.deleteCallback(name, &options) +func (b *FakeFlyteWorkflow) Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) { + if b.getCallback != nil { + return b.getCallback(name, opts) } - return nil + return nil, nil +} + +func (b *FakeFlyteWorkflow) Update(ctx context.Context, w *v1alpha1.FlyteWorkflow, options v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) { + if b.updateCallback != nil { + return b.updateCallback(w, options) + } + return nil, nil } type flyteWorkflowsCallback func(string) v1alpha12.FlyteWorkflowInterface @@ -86,6 +95,9 @@ var execID = &core.WorkflowExecutionIdentifier{ } var flyteWf = &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: execID.Name, + }, ExecutionID: v1alpha1.ExecutionID{ WorkflowExecutionIdentifier: execID, }, @@ -280,10 +292,12 @@ func TestExecute_MiscError(t *testing.T) { func TestAbort(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { - assert.Equal(t, execID.Name, name) - assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground) - return nil + fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) { + return flyteWf, nil + } + fakeFlyteWorkflow.updateCallback = func(w *v1alpha1.FlyteWorkflow, options v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) { + assert.Equal(t, execID.Name, w.GetName()) + return nil, nil } fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface { assert.Equal(t, namespace, ns) @@ -302,8 +316,8 @@ func TestAbort(t *testing.T) { func TestAbort_Notfound(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { - return k8_api_err.NewNotFound(schema.GroupResource{ + fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) { + return nil, k8_api_err.NewNotFound(schema.GroupResource{ Group: "foo", Resource: "bar", }, execID.Name) @@ -325,8 +339,11 @@ func TestAbort_Notfound(t *testing.T) { func TestAbort_MiscError(t *testing.T) { fakeFlyteWorkflow := FakeFlyteWorkflow{} - fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error { - return errors.New("call failed") + fakeFlyteWorkflow.getCallback = func(name string, options v1.GetOptions) (*v1alpha1.FlyteWorkflow, error) { + return flyteWf, nil + } + fakeFlyteWorkflow.updateCallback = func(w *v1alpha1.FlyteWorkflow, options v1.UpdateOptions) (*v1alpha1.FlyteWorkflow, error) { + return nil, errors.New("call failed") } fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface { assert.Equal(t, namespace, ns) diff --git a/flytepropeller/pkg/controller/finalizer.go b/flytepropeller/pkg/controller/finalizer.go index f1a8ba8ebd..4bfc4a650a 100644 --- a/flytepropeller/pkg/controller/finalizer.go +++ b/flytepropeller/pkg/controller/finalizer.go @@ -1,6 +1,10 @@ package controller -import v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" +) const FinalizerKey = "flyte-finalizer" @@ -20,6 +24,12 @@ func IsDeleted(meta v1.Object) bool { return meta.GetDeletionTimestamp() != nil } +// Check if the workflow is annotated as aborted +func IsAborted(w *v1alpha1.FlyteWorkflow) bool { + annotations := w.GetObjectMeta().GetAnnotations() + return annotations != nil && annotations[AbortedWorkflowAnnotation] == "true" +} + // Reset all the finalizers on the object func ResetFinalizers(meta v1.Object) { meta.SetFinalizers([]string{}) diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 49c2c21549..86f1895c9a 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -42,6 +42,8 @@ type propellerMetrics struct { RoundTime labeled.StopWatch } +const AbortedWorkflowAnnotation = "workflow-aborted" + func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { roundScope := scope.NewSubScope("round") return &propellerMetrics{ @@ -103,7 +105,8 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion()) maxRetries := uint32(p.cfg.MaxWorkflowRetries) - if IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) { + + if IsAborted(mutableW) || IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) { var err error func() { defer func() { diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index 3469c1da80..07e2d554f8 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -356,14 +356,13 @@ func TestPropeller_Handle(t *testing.T) { assert.True(t, abortCalled) }) - t.Run("deletedShouldBeFinalized", func(t *testing.T) { - n := v1.Now() + t.Run("abortedShouldBeFinalized", func(t *testing.T) { assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ - Name: name, - Namespace: namespace, - Finalizers: []string{"f1"}, - DeletionTimestamp: &n, + Name: name, + Namespace: namespace, + Finalizers: []string{"f1"}, + Annotations: map[string]string{AbortedWorkflowAnnotation: "true"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1", @@ -393,6 +392,7 @@ func TestPropeller_Handle(t *testing.T) { Namespace: namespace, Finalizers: []string{"f1"}, DeletionTimestamp: &n, + Annotations: map[string]string{AbortedWorkflowAnnotation: "true"}, }, WorkflowSpec: &v1alpha1.WorkflowSpec{ ID: "w1",