diff --git a/flyteplugins/go/tasks/pluginmachinery/core/phase.go b/flyteplugins/go/tasks/pluginmachinery/core/phase.go index 6c80cc4d24..df4f5dc402 100644 --- a/flyteplugins/go/tasks/pluginmachinery/core/phase.go +++ b/flyteplugins/go/tasks/pluginmachinery/core/phase.go @@ -292,6 +292,10 @@ func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskI return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, true) } +func PhaseInfoAborted(code, reason string, info *TaskInfo) PhaseInfo { + return phaseInfoFailed(PhaseAborted, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false) +} + // Creates a new PhaseInfo with phase set to PhaseWaitingForCache func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo { return phaseInfo(PhaseWaitingForCache, version, nil, info, false) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index 7569abd90e..bb879ff230 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -157,6 +157,8 @@ func ToPluginPhase(s core.Phase) (Phase, error) { return PhaseResourcesCreated, nil case core.PhaseSuccess: return PhaseSucceeded, nil + case core.PhaseAborted: + fallthrough case core.PhasePermanentFailure: fallthrough case core.PhaseRetryableFailure: diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index 99a83aeb3b..add5a65e70 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -280,7 +280,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase } return core.PhaseInfoSuccess(taskInfo), nil case flyteIdl.TaskExecution_ABORTED: - return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job with aborted phase.\n"+resource.Message, taskInfo), nil + return core.PhaseInfoAborted(pluginErrors.TaskFailedWithError, "failed to run the job with aborted phase.\n"+resource.Message, taskInfo), nil case flyteIdl.TaskExecution_FAILED: return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job.\n"+resource.Message, taskInfo), nil } diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go index 19af85eed3..4082293791 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go @@ -250,7 +250,7 @@ func TestPlugin(t *testing.T) { phase, err := plugin.Status(context.Background(), taskContext) assert.NoError(t, err) - assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase()) + assert.Equal(t, pluginsCore.PhaseAborted, phase.Phase()) }) t.Run("test TaskExecution_FAILED Status", func(t *testing.T) { diff --git a/flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go b/flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go index aaaad8282b..7367ebdeb1 100644 --- a/flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go +++ b/flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go @@ -30,6 +30,8 @@ func ColorizeNodePhase(p v1alpha1.NodePhase) string { return color.HiRedString("%s", p.String()) case v1alpha1.NodePhaseFailed: return color.HiRedString("%s", p.String()) + case v1alpha1.NodePhaseAborted: + return color.HiRedString("%s", p.String()) } return color.CyanString("%s", p.String()) } diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go index f92cca4a5a..f64879d69d 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -67,6 +67,8 @@ const ( NodePhaseTimedOut NodePhaseDynamicRunning NodePhaseRecovered + NodePhaseAborting + NodePhaseAborted ) func (p NodePhase) String() string { @@ -97,6 +99,10 @@ func (p NodePhase) String() string { return "DynamicRunning" case NodePhaseRecovered: return "NodePhaseRecovered" + case NodePhaseAborting: + return "Aborting" + case NodePhaseAborted: + return "Aborted" } return "Unknown" diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index aab034224d..760fa1317c 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -558,7 +558,7 @@ func (in *NodeStatus) GetMessage() string { } func IsPhaseTerminal(phase NodePhase) bool { - return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered + return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered || phase == NodePhaseAborted } func (in *NodeStatus) GetOrCreateTaskStatus() MutableTaskNodeStatus { @@ -765,7 +765,7 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe } func (in *NodeStatus) IsTerminated() bool { - return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered + return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered || in.GetPhase() == NodePhaseAborted } func (in *NodeStatus) GetDataDir() DataReference { @@ -795,7 +795,7 @@ func (in *NodeStatus) Equals(other *NodeStatus) bool { } if in.Phase == other.Phase { - if in.Phase == NodePhaseSucceeded || in.Phase == NodePhaseFailed { + if in.Phase == NodePhaseSucceeded || in.Phase == NodePhaseFailed || in.Phase == NodePhaseAborted { return true } } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0f9e95f19b..86db1de4aa 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -375,9 +375,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu switch nodePhase { case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRecovered, v1alpha1.NodePhaseSkipped: successCount++ - case v1alpha1.NodePhaseFailing: + case v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseAborting: failingCount++ - case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseTimedOut: + case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseAborted, v1alpha1.NodePhaseTimedOut: failedCount++ default: runningCount++ diff --git a/flytepropeller/pkg/controller/nodes/array/utils.go b/flytepropeller/pkg/controller/nodes/array/utils.go index 34e304b662..be86cfad91 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils.go +++ b/flytepropeller/pkg/controller/nodes/array/utils.go @@ -84,5 +84,5 @@ func inferParallelism(ctx context.Context, parallelism *uint32, parallelismBehav func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool { return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut || - nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered + nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered || nodePhase == v1alpha1.NodePhaseAborted } diff --git a/flytepropeller/pkg/controller/nodes/branch/handler.go b/flytepropeller/pkg/controller/nodes/branch/handler.go index d2a4fcfa68..01fb718831 100644 --- a/flytepropeller/pkg/controller/nodes/branch/handler.go +++ b/flytepropeller/pkg/controller/nodes/branch/handler.go @@ -163,6 +163,10 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx interfaces.N return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(downstreamStatus.Err, nil)), nil } + if downstreamStatus.HasAborted() { + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoAbortErr(downstreamStatus.Err, nil)), nil + } + phase := handler.PhaseInfoRunning(nil) return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 1c42357623..248a0f835d 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -135,6 +135,7 @@ func canHandleNode(phase v1alpha1.NodePhase) bool { phase == v1alpha1.NodePhaseQueued || phase == v1alpha1.NodePhaseRunning || phase == v1alpha1.NodePhaseFailing || + phase == v1alpha1.NodePhaseAborting || phase == v1alpha1.NodePhaseTimingOut || phase == v1alpha1.NodePhaseRetryableFailure || phase == v1alpha1.NodePhaseSucceeding || @@ -247,6 +248,14 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo } return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil + } else if nodePhase == v1alpha1.NodePhaseAborted { + logger.Debugf(currentNodeCtx, "Node has aborted, traversing downstream.") + _, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode) + if err != nil { + return interfaces.NodeStatusUndefined, err + } + + return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil } else if nodePhase == v1alpha1.NodePhaseTimedOut { logger.Debugf(currentNodeCtx, "Node has timed out, traversing downstream.") _, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode) @@ -312,6 +321,10 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex } else { return state, nil } + } else if state.HasAborted() { + logger.Debugf(ctx, "Some downstream node has aborted. Error: [%s]", state.Err) + // Ignore WorkflowOnFailurePolicy if a downstream node has aborted + return state, nil } else if !state.IsComplete() { // A Failed/Timedout node is implicitly considered "complete" this means none of the downstream nodes from // that node will ever be allowed to run. @@ -1190,7 +1203,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter } // When a node fails, we fail the workflow. Independent of number of nodes succeeding/failing, whenever a first node fails, // the entire workflow is failed. - if np == v1alpha1.NodePhaseFailing { + if np == v1alpha1.NodePhaseFailing || np == v1alpha1.NodePhaseAborting { if execErr.GetKind() == core.ExecutionError_SYSTEM { nodeStatus.IncrementSystemFailures() c.metrics.PermanentSystemErrorDuration.Observe(ctx, startTime, endTime) @@ -1208,6 +1221,12 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter finalStatus = interfaces.NodeStatusFailed(p.GetErr()) } + if np == v1alpha1.NodePhaseAborting && !h.FinalizeRequired() { + logger.Infof(ctx, "Finalize not required, moving node to Aborted") + np = v1alpha1.NodePhaseAborted + finalStatus = interfaces.NodeStatusAborted(p.GetErr()) + } + if np == v1alpha1.NodePhaseTimingOut && !h.FinalizeRequired() { logger.Infof(ctx, "Finalize not required, moving node to TimedOut") np = v1alpha1.NodePhaseTimedOut @@ -1348,6 +1367,25 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil } + if currentPhase == v1alpha1.NodePhaseAborting { + logger.Debugf(ctx, "node aborting") + if err := c.Abort(ctx, h, nCtx, "node aborting", false); err != nil { + return interfaces.NodeStatusUndefined, err + } + t := metav1.Now() + + startedAt := nodeStatus.GetStartedAt() + if startedAt == nil { + startedAt = &t + } + nodeStatus.UpdatePhase(v1alpha1.NodePhaseAborted, t, nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError()) + c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time) + if nCtx.NodeExecutionMetadata().IsInterruptible() { + c.metrics.InterruptibleNodesTerminated.Inc(ctx) + } + return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil + } + if currentPhase == v1alpha1.NodePhaseTimingOut { logger.Debugf(ctx, "node timing out") if err := c.Abort(ctx, h, nCtx, "node timed out", false); err != nil { @@ -1394,6 +1432,11 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil } + if currentPhase == v1alpha1.NodePhaseAborted { + // This should never happen + return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil + } + return c.handleQueuedOrRunningNode(ctx, nCtx, h) } diff --git a/flytepropeller/pkg/controller/nodes/handler/transition_info.go b/flytepropeller/pkg/controller/nodes/handler/transition_info.go index c9af525cca..fb07769545 100644 --- a/flytepropeller/pkg/controller/nodes/handler/transition_info.go +++ b/flytepropeller/pkg/controller/nodes/handler/transition_info.go @@ -25,10 +25,11 @@ const ( EPhaseFailing EPhaseDynamicRunning EPhaseRecovered + EPhaseAborted ) func (p EPhase) IsTerminal() bool { - if p == EPhaseFailed || p == EPhaseSuccess || p == EPhaseSkip || p == EPhaseTimedout || p == EPhaseRecovered { + if p == EPhaseFailed || p == EPhaseSuccess || p == EPhaseSkip || p == EPhaseTimedout || p == EPhaseRecovered || p == EPhaseAborted { return true } return false @@ -184,6 +185,10 @@ func PhaseInfoFailureErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInf return phaseInfoFailed(EPhaseFailed, err, info) } +func PhaseInfoAbortErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInfo { + return phaseInfoFailed(EPhaseAborted, err, info) +} + func PhaseInfoFailingErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInfo { return phaseInfoFailed(EPhaseFailing, err, info) } diff --git a/flytepropeller/pkg/controller/nodes/interfaces/node.go b/flytepropeller/pkg/controller/nodes/interfaces/node.go index fd9c12ebb7..27f59a7b69 100644 --- a/flytepropeller/pkg/controller/nodes/interfaces/node.go +++ b/flytepropeller/pkg/controller/nodes/interfaces/node.go @@ -37,6 +37,7 @@ const ( NodePhaseTimedOut // Node recovered from a prior execution. NodePhaseRecovered + NodePhaseAborted ) func (p NodePhase) String() string { @@ -115,6 +116,10 @@ func (n *NodeStatus) HasFailed() bool { return n.NodePhase == NodePhaseFailed } +func (n *NodeStatus) HasAborted() bool { + return n.NodePhase == NodePhaseAborted +} + func (n *NodeStatus) HasTimedOut() bool { return n.NodePhase == NodePhaseTimedOut } @@ -135,3 +140,7 @@ var NodeStatusRecovered = NodeStatus{NodePhase: NodePhaseRecovered} func NodeStatusFailed(err *core.ExecutionError) NodeStatus { return NodeStatus{NodePhase: NodePhaseFailed, Err: err} } + +func NodeStatusAborted(err *core.ExecutionError) NodeStatus { + return NodeStatus{NodePhase: NodePhaseAborted, Err: err} +} diff --git a/flytepropeller/pkg/controller/nodes/predicate.go b/flytepropeller/pkg/controller/nodes/predicate.go index 5ec6c26d41..938233b90b 100644 --- a/flytepropeller/pkg/controller/nodes/predicate.go +++ b/flytepropeller/pkg/controller/nodes/predicate.go @@ -94,6 +94,7 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped || upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseFailed || + upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseAborted || upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseTimedOut { skipped = true } else if !(upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSucceeded || diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go index d71f0b7e20..9e24cc97ed 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go @@ -57,6 +57,10 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(startStatus.Err, nil)), nil } + if startStatus.HasAborted() { + return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoAbortErr(startStatus.Err, nil)), nil + } + return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl) } diff --git a/flytepropeller/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go b/flytepropeller/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go index 1bf2c5bb41..6e7fbd67ed 100644 --- a/flytepropeller/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go +++ b/flytepropeller/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go @@ -64,6 +64,8 @@ func (n NextPhaseStatePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskEx return pluginCore.DoTransition(pluginCore.PhaseInfoFailure("failed", "message", s.TaskInfo)), nil case pluginCore.PhaseRetryableFailure: return pluginCore.DoTransition(pluginCore.PhaseInfoRetryableFailure("failed", "message", s.TaskInfo)), nil + case pluginCore.PhaseAborted: + return pluginCore.DoTransition(pluginCore.PhaseInfoAborted("aborted", "message", s.TaskInfo)), nil case pluginCore.PhaseNotReady: return pluginCore.DoTransition(pluginCore.PhaseInfoNotReady(time.Now(), s.PhaseVersion, "not-ready")), nil case pluginCore.PhaseInitializing: diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 5e4139296f..0c1c944563 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -166,6 +166,9 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle case pluginCore.PhasePermanentFailure: logger.Debugf(ctx, "Transitioning to Failure") return handler.DoTransition(p.ttype, handler.PhaseInfoFailureErr(p.pInfo.Err(), &p.execInfo)), nil + case pluginCore.PhaseAborted: + logger.Debugf(ctx, "Transitioning to Abort") + return handler.DoTransition(p.ttype, handler.PhaseInfoAbortErr(p.pInfo.Err(), &p.execInfo)), nil case pluginCore.PhaseUndefined: return handler.UnknownTransition, fmt.Errorf("error converting plugin phase, received [Undefined]") } @@ -504,6 +507,8 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } + case pluginCore.PhaseAborted: + fallthrough case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index 1c911b44f3..97e4f51e1f 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -67,6 +67,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase { return core.NodeExecution_SUCCEEDED case handler.EPhaseFailed: return core.NodeExecution_FAILED + case handler.EPhaseAborted: + return core.NodeExecution_ABORTED case handler.EPhaseRecovered: return core.NodeExecution_RECOVERED case handler.EPhaseTimedout: @@ -220,6 +222,8 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) { return v1alpha1.NodePhaseSucceeding, nil case handler.EPhaseFailed: return v1alpha1.NodePhaseFailing, nil + case handler.EPhaseAborted: + return v1alpha1.NodePhaseAborting, nil case handler.EPhaseTimedout: return v1alpha1.NodePhaseTimingOut, nil case handler.EPhaseRecovered: diff --git a/flytepropeller/pkg/controller/workflow/executor.go b/flytepropeller/pkg/controller/workflow/executor.go index c2f7a35ebe..05f414f837 100644 --- a/flytepropeller/pkg/controller/workflow/executor.go +++ b/flytepropeller/pkg/controller/workflow/executor.go @@ -59,6 +59,10 @@ func StatusFailed(err *core.ExecutionError) Status { return Status{TransitionToPhase: v1alpha1.WorkflowPhaseFailed, Err: err} } +func StatusAborted(err *core.ExecutionError) Status { + return Status{TransitionToPhase: v1alpha1.WorkflowPhaseAborted, Err: err} +} + type workflowExecutor struct { enqueueWorkflow v1alpha1.EnqueueWorkflow store *storage.DataStore @@ -134,6 +138,9 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1. if s.HasFailed() { return StatusFailing(s.Err), nil } + if s.HasAborted() { + return StatusAborted(s.Err), nil + } return StatusRunning, nil } @@ -168,6 +175,10 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String()) return StatusFailing(state.Err), nil } + if state.HasAborted() { + logger.Infof(ctx, "Workflow has aborted. Error [%s]", state.Err.String()) + return StatusAborted(state.Err), nil + } if state.HasTimedOut() { return StatusFailing(&core.ExecutionError{ Kind: core.ExecutionError_USER, @@ -200,6 +211,8 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl switch state.NodePhase { case interfaces.NodePhaseFailed: return StatusFailed(state.Err), nil + case interfaces.NodePhaseAborted: + return StatusAborted(state.Err), nil case interfaces.NodePhaseTimedOut: return StatusFailed(&core.ExecutionError{ Kind: core.ExecutionError_USER,