diff --git a/go.mod b/go.mod index 48e4211ea..3df76b029 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.10.0 - github.com/flyteorg/flyteidl v0.21.0 + github.com/flyteorg/flyteidl v0.21.1 github.com/flyteorg/flyteplugins v0.6.1 github.com/flyteorg/flytestdlib v0.3.34 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index 723576d50..e2f5fea66 100644 --- a/go.sum +++ b/go.sum @@ -229,8 +229,12 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/flyteidl v0.21.0 h1:AwHNusfxJMfRRSDk2QWfb3aIlyLJrFWVGtpXCbCtJ5A= +github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ= +github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.21.1 h1:wINIuKv+0xtTD0kR2RF99C5uGYuwflhY78iw+DkiY8o= +github.com/flyteorg/flyteidl v0.21.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteplugins v0.6.1 h1:Mq9uM/IN6fXHo03NlXSa+to2GHEom2NAcRWlr+bVH6g= github.com/flyteorg/flyteplugins v0.6.1/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go index 24afc1115..18c4ae4a2 100644 --- a/pkg/controller/handler.go +++ b/pkg/controller/handler.go @@ -7,6 +7,7 @@ import ( "runtime/debug" "time" + eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/contextutils" @@ -227,6 +228,28 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { ResetFinalizers(mutatedWf) } } + + // ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not + // a valid state unless we are experiencing a race condition where the workflow has not yet + // been inserted into the db (ie. workflow phase is WorkflowPhaseReady). + if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady { + t.Stop() + logger.Errorf(ctx, "Failed to process workflow, failing: %s", err) + + // We set the workflow status to failing to abort any active tasks in the next round. + mutableW := w.DeepCopy() + mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{ + Kind: core.ExecutionError_SYSTEM, + Code: "ExecutionNotFound", + Message: "Workflow execution not found in flyteadmin.", + }) + if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil { + logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e) + return e + } + return nil + } + // TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update // update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index b42667340..dadfe8ddb 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -10,10 +10,12 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/mock" + eventErrors "github.com/flyteorg/flyteidl/clients/go/events/errors" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/flyteorg/flytepropeller/pkg/controller/config" + workflowErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors" "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore" "github.com/flyteorg/flytestdlib/promutils" @@ -460,6 +462,37 @@ func TestPropeller_Handle(t *testing.T) { assert.Equal(t, 0, len(r.Finalizers)) assert.True(t, HasCompletedLabel(r)) }) + + t.Run("failOnExecutionNotFoundError", func(t *testing.T) { + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowSpec: &v1alpha1.WorkflowSpec{ + ID: "w1", + }, + Status: v1alpha1.WorkflowStatus{ + Phase: v1alpha1.WorkflowPhaseRunning, + FailedAttempts: 0, + }, + })) + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + return workflowErrors.Wrapf(workflowErrors.EventRecordingError, "", + &eventErrors.EventError{ + Code: eventErrors.ExecutionNotFound, + Cause: nil, + Message: "The execution that the event belongs to does not exist", + }, "failed to transition phase") + } + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseFailing, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 0, len(r.Finalizers)) + assert.False(t, HasCompletedLabel(r)) + }) } func TestPropeller_Handle_TurboMode(t *testing.T) { diff --git a/pkg/controller/nodes/errors/errors.go b/pkg/controller/nodes/errors/errors.go index 5db8196aa..69a1d63f9 100644 --- a/pkg/controller/nodes/errors/errors.go +++ b/pkg/controller/nodes/errors/errors.go @@ -23,11 +23,29 @@ func (n *NodeError) Error() string { return fmt.Sprintf("failed at Node[%s]. %v: %v", n.Node, n.ErrCode, n.Message) } +func (n *NodeError) Is(target error) bool { + t, ok := target.(*NodeError) + if !ok { + return false + } + if n == nil && t == nil { + return true + } + if n == nil || t == nil { + return false + } + return n.ErrCode == t.ErrCode && (n.Message == t.Message || t.Message == "") && (n.Node == t.Node || t.Node == "") +} + type NodeErrorWithCause struct { NodeError error cause error } +func (n *NodeErrorWithCause) Cause() error { + return n.cause +} + func (n *NodeErrorWithCause) Code() ErrorCode { if asNodeErr, casted := n.NodeError.(*NodeError); casted { return asNodeErr.Code() @@ -50,7 +68,21 @@ func (n *NodeErrorWithCause) Error() string { return fmt.Sprintf("%v, caused by: %v", nodeError, cause) } -func (n *NodeErrorWithCause) Cause() error { +func (n *NodeErrorWithCause) Is(target error) bool { + t, ok := target.(*NodeErrorWithCause) + if !ok { + return false + } + if n == nil && t == nil { + return true + } + if n == nil || t == nil { + return false + } + return n.Is(target) && (n.cause == t.cause || t.cause == nil) +} + +func (n *NodeErrorWithCause) Unwrap() error { return n.cause } diff --git a/pkg/controller/workflow/errors/errors.go b/pkg/controller/workflow/errors/errors.go index 4c7b9e8ec..b6695a0fb 100644 --- a/pkg/controller/workflow/errors/errors.go +++ b/pkg/controller/workflow/errors/errors.go @@ -21,16 +21,36 @@ func (w *WorkflowError) Error() string { return fmt.Sprintf("Workflow[%s] failed. %v: %v", w.Workflow, w.Code, w.Message) } +func (w *WorkflowError) Is(target error) bool { + t, ok := target.(*WorkflowError) + if !ok { + return false + } + return w.Code == t.Code +} + type WorkflowErrorWithCause struct { *WorkflowError cause error } +func (w *WorkflowErrorWithCause) Cause() error { + return w.cause +} + func (w *WorkflowErrorWithCause) Error() string { return fmt.Sprintf("%v, caused by: %v", w.WorkflowError.Error(), w.cause) } -func (w *WorkflowErrorWithCause) Cause() error { +func (w *WorkflowErrorWithCause) Is(target error) bool { + t, ok := target.(*WorkflowErrorWithCause) + if !ok { + return false + } + return w.Code == t.Code && (w.cause == t.cause || t.cause == nil) && (w.Message == t.Message || t.Message == "") && (w.Workflow == t.Workflow || t.Workflow == "") +} + +func (w *WorkflowErrorWithCause) Unwrap() error { return w.cause } diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index ccb7e4bd3..ce9a015e0 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -404,8 +404,10 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. if err != nil { return err } - if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus); err != nil { - return err + failingErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus) + // Ignore ExecutionNotFound error to allow graceful failure + if failingErr != nil && !eventsErr.IsNotFound(failingErr) { + return failingErr } c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.") return nil @@ -414,8 +416,10 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1. if err != nil { return err } - if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus); err != nil { - return err + failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus) + // Ignore ExecutionNotFound error to allow graceful failure + if failureErr != nil && !eventsErr.IsNotFound(failureErr) { + return failureErr } c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.") return nil