Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into feature/cache-reservation-api
Browse files Browse the repository at this point in the history
  • Loading branch information
hamersaw committed Sep 23, 2021
2 parents b9f2ce8 + 525271e commit b62adfe
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 8 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.21.0
github.com/flyteorg/flyteidl v0.21.1
github.com/flyteorg/flyteplugins v0.6.1
github.com/flyteorg/flytestdlib v0.3.34
github.com/ghodss/yaml v1.0.0
Expand Down
6 changes: 5 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,12 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.0 h1:AwHNusfxJMfRRSDk2QWfb3aIlyLJrFWVGtpXCbCtJ5A=
github.com/flyteorg/flyteidl v0.19.2/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.19.22 h1:e3M0Dob/r5n+AJfAByzad/svMAVes7XfZVxUNCi6AeQ=
github.com/flyteorg/flyteidl v0.19.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.1 h1:wINIuKv+0xtTD0kR2RF99C5uGYuwflhY78iw+DkiY8o=
github.com/flyteorg/flyteidl v0.21.1/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteplugins v0.6.1 h1:Mq9uM/IN6fXHo03NlXSa+to2GHEom2NAcRWlr+bVH6g=
github.com/flyteorg/flyteplugins v0.6.1/go.mod h1:rPzV/KS6h0BkgK0Z+CnO6JjY58tzUdYvDLMYS10IKG0=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime/debug"
"time"

eventsErr "github.com/flyteorg/flyteidl/clients/go/events/errors"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/contextutils"
Expand Down Expand Up @@ -227,6 +228,28 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
ResetFinalizers(mutatedWf)
}
}

// ExecutionNotFound error is returned when flyteadmin is missing the workflow. This is not
// a valid state unless we are experiencing a race condition where the workflow has not yet
// been inserted into the db (ie. workflow phase is WorkflowPhaseReady).
if err != nil && eventsErr.IsNotFound(err) && w.GetExecutionStatus().GetPhase() != v1alpha1.WorkflowPhaseReady {
t.Stop()
logger.Errorf(ctx, "Failed to process workflow, failing: %s", err)

// We set the workflow status to failing to abort any active tasks in the next round.
mutableW := w.DeepCopy()
mutableW.Status.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "Workflow execution is missing in flyteadmin, aborting", &core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "ExecutionNotFound",
Message: "Workflow execution not found in flyteadmin.",
})
if _, e := p.wfStore.Update(ctx, mutableW, workflowstore.PriorityClassCritical); e != nil {
logger.Errorf(ctx, "Failed to record an ExecutionNotFound workflow as failed, reason: %s. Retrying...", e)
return e
}
return nil
}

// TODO we will need to call updatestatus when it is supported. But to preserve metadata like (label/finalizer) we will need to use update

// update the GetExecutionStatus block of the FlyteWorkflow resource. UpdateStatus will not
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"

eventErrors "github.com/flyteorg/flyteidl/clients/go/events/errors"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flytepropeller/pkg/controller/config"
workflowErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/workflowstore"

"github.com/flyteorg/flytestdlib/promutils"
Expand Down Expand Up @@ -460,6 +462,37 @@ func TestPropeller_Handle(t *testing.T) {
assert.Equal(t, 0, len(r.Finalizers))
assert.True(t, HasCompletedLabel(r))
})

t.Run("failOnExecutionNotFoundError", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: namespace,
},
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "w1",
},
Status: v1alpha1.WorkflowStatus{
Phase: v1alpha1.WorkflowPhaseRunning,
FailedAttempts: 0,
},
}))
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
return workflowErrors.Wrapf(workflowErrors.EventRecordingError, "",
&eventErrors.EventError{
Code: eventErrors.ExecutionNotFound,
Cause: nil,
Message: "The execution that the event belongs to does not exist",
}, "failed to transition phase")
}
assert.NoError(t, p.Handle(ctx, namespace, name))

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Equal(t, v1alpha1.WorkflowPhaseFailing, r.GetExecutionStatus().GetPhase())
assert.Equal(t, 0, len(r.Finalizers))
assert.False(t, HasCompletedLabel(r))
})
}

func TestPropeller_Handle_TurboMode(t *testing.T) {
Expand Down
34 changes: 33 additions & 1 deletion pkg/controller/nodes/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,29 @@ func (n *NodeError) Error() string {
return fmt.Sprintf("failed at Node[%s]. %v: %v", n.Node, n.ErrCode, n.Message)
}

func (n *NodeError) Is(target error) bool {
t, ok := target.(*NodeError)
if !ok {
return false
}
if n == nil && t == nil {
return true
}
if n == nil || t == nil {
return false
}
return n.ErrCode == t.ErrCode && (n.Message == t.Message || t.Message == "") && (n.Node == t.Node || t.Node == "")
}

type NodeErrorWithCause struct {
NodeError error
cause error
}

func (n *NodeErrorWithCause) Cause() error {
return n.cause
}

func (n *NodeErrorWithCause) Code() ErrorCode {
if asNodeErr, casted := n.NodeError.(*NodeError); casted {
return asNodeErr.Code()
Expand All @@ -50,7 +68,21 @@ func (n *NodeErrorWithCause) Error() string {
return fmt.Sprintf("%v, caused by: %v", nodeError, cause)
}

func (n *NodeErrorWithCause) Cause() error {
func (n *NodeErrorWithCause) Is(target error) bool {
t, ok := target.(*NodeErrorWithCause)
if !ok {
return false
}
if n == nil && t == nil {
return true
}
if n == nil || t == nil {
return false
}
return n.Is(target) && (n.cause == t.cause || t.cause == nil)
}

func (n *NodeErrorWithCause) Unwrap() error {
return n.cause
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/workflow/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,36 @@ func (w *WorkflowError) Error() string {
return fmt.Sprintf("Workflow[%s] failed. %v: %v", w.Workflow, w.Code, w.Message)
}

func (w *WorkflowError) Is(target error) bool {
t, ok := target.(*WorkflowError)
if !ok {
return false
}
return w.Code == t.Code
}

type WorkflowErrorWithCause struct {
*WorkflowError
cause error
}

func (w *WorkflowErrorWithCause) Cause() error {
return w.cause
}

func (w *WorkflowErrorWithCause) Error() string {
return fmt.Sprintf("%v, caused by: %v", w.WorkflowError.Error(), w.cause)
}

func (w *WorkflowErrorWithCause) Cause() error {
func (w *WorkflowErrorWithCause) Is(target error) bool {
t, ok := target.(*WorkflowErrorWithCause)
if !ok {
return false
}
return w.Code == t.Code && (w.cause == t.cause || t.cause == nil) && (w.Message == t.Message || t.Message == "") && (w.Workflow == t.Workflow || t.Workflow == "")
}

func (w *WorkflowErrorWithCause) Unwrap() error {
return w.cause
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,10 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.
if err != nil {
return err
}
if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus); err != nil {
return err
failingErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus)
// Ignore ExecutionNotFound error to allow graceful failure
if failingErr != nil && !eventsErr.IsNotFound(failingErr) {
return failingErr
}
c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.")
return nil
Expand All @@ -414,8 +416,10 @@ func (c *workflowExecutor) HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.
if err != nil {
return err
}
if err := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus); err != nil {
return err
failureErr := c.TransitionToPhase(ctx, w.ExecutionID.WorkflowExecutionIdentifier, wStatus, newStatus)
// Ignore ExecutionNotFound error to allow graceful failure
if failureErr != nil && !eventsErr.IsNotFound(failureErr) {
return failureErr
}
c.k8sRecorder.Event(w, corev1.EventTypeWarning, v1alpha1.WorkflowPhaseFailed.String(), "Workflow failed.")
return nil
Expand Down

0 comments on commit b62adfe

Please sign in to comment.