Skip to content

Commit

Permalink
[flytepropeller] Better handling for task aborts
Browse files (browse the repository at this point in the history)
Signed-off-by: Po Cheung <[email protected]>
  • Loading branch information
pocheung1 committed Jul 17, 2024
1 parent bba8c11 commit e2bcf69
Show file tree
Hide file tree
Showing 19 changed files with 114 additions and 10 deletions.
4 changes: 4 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskI
return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, true)
}

// PhaseInfoAborted returns a PhaseInfo for PhaseAborted. The supplied code and
// reason are wrapped in a core.ExecutionError whose Kind is ExecutionError_USER.
// NOTE(review): the trailing false handed to phaseInfoFailed presumably means
// "no cleanup on failure" (the ...WithCleanup constructor passes true) — confirm
// against phaseInfoFailed's signature.
func PhaseInfoAborted(code, reason string, info *TaskInfo) PhaseInfo {
return phaseInfoFailed(PhaseAborted, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false)

Check warning on line 296 in flyteplugins/go/tasks/pluginmachinery/core/phase.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/core/phase.go#L295-L296

Added lines #L295 - L296 were not covered by tests
}

// Creates a new PhaseInfo with phase set to PhaseWaitingForCache
func PhaseInfoWaitingForCache(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseWaitingForCache, version, nil, info, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ func ToPluginPhase(s core.Phase) (Phase, error) {
return PhaseResourcesCreated, nil
case core.PhaseSuccess:
return PhaseSucceeded, nil
case core.PhaseAborted:
fallthrough

Check warning on line 161 in flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go#L160-L161

Added lines #L160 - L161 were not covered by tests
case core.PhasePermanentFailure:
fallthrough
case core.PhaseRetryableFailure:
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (p Plugin) Status(ctx context.Context, taskCtx webapi.StatusContext) (phase
}
return core.PhaseInfoSuccess(taskInfo), nil
case flyteIdl.TaskExecution_ABORTED:
return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job with aborted phase.\n"+resource.Message, taskInfo), nil
return core.PhaseInfoAborted(pluginErrors.TaskFailedWithError, "failed to run the job with aborted phase.\n"+resource.Message, taskInfo), nil

Check warning on line 283 in flyteplugins/go/tasks/plugins/webapi/agent/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/webapi/agent/plugin.go#L283

Added line #L283 was not covered by tests
case flyteIdl.TaskExecution_FAILED:
return core.PhaseInfoFailure(pluginErrors.TaskFailedWithError, "failed to run the job.\n"+resource.Message, taskInfo), nil
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/webapi/agent/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestPlugin(t *testing.T) {

phase, err := plugin.Status(context.Background(), taskContext)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, phase.Phase())
assert.Equal(t, pluginsCore.PhaseAborted, phase.Phase())
})

t.Run("test TaskExecution_FAILED Status", func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/cmd/kubectl-flyte/cmd/printers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func ColorizeNodePhase(p v1alpha1.NodePhase) string {
return color.HiRedString("%s", p.String())
case v1alpha1.NodePhaseFailed:
return color.HiRedString("%s", p.String())
case v1alpha1.NodePhaseAborted:
return color.HiRedString("%s", p.String())
}
return color.CyanString("%s", p.String())
}
Expand Down
6 changes: 6 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
NodePhaseTimedOut
NodePhaseDynamicRunning
NodePhaseRecovered
NodePhaseAborting
NodePhaseAborted
)

func (p NodePhase) String() string {
Expand Down Expand Up @@ -97,6 +99,10 @@ func (p NodePhase) String() string {
return "DynamicRunning"
case NodePhaseRecovered:
return "NodePhaseRecovered"
case NodePhaseAborting:
return "Aborting"

Check warning on line 103 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go#L102-L103

Added lines #L102 - L103 were not covered by tests
case NodePhaseAborted:
return "Aborted"
}

return "Unknown"
Expand Down
6 changes: 3 additions & 3 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (in *NodeStatus) GetMessage() string {
}

func IsPhaseTerminal(phase NodePhase) bool {
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered
return phase == NodePhaseSucceeded || phase == NodePhaseFailed || phase == NodePhaseSkipped || phase == NodePhaseTimedOut || phase == NodePhaseRecovered || phase == NodePhaseAborted
}

func (in *NodeStatus) GetOrCreateTaskStatus() MutableTaskNodeStatus {
Expand Down Expand Up @@ -765,7 +765,7 @@ func (in *NodeStatus) GetNodeExecutionStatus(ctx context.Context, id NodeID) Exe
}

func (in *NodeStatus) IsTerminated() bool {
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered
return in.GetPhase() == NodePhaseFailed || in.GetPhase() == NodePhaseSkipped || in.GetPhase() == NodePhaseSucceeded || in.GetPhase() == NodePhaseRecovered || in.GetPhase() == NodePhaseAborted

Check warning on line 768 in flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L768

Added line #L768 was not covered by tests
}

func (in *NodeStatus) GetDataDir() DataReference {
Expand Down Expand Up @@ -795,7 +795,7 @@ func (in *NodeStatus) Equals(other *NodeStatus) bool {
}

if in.Phase == other.Phase {
if in.Phase == NodePhaseSucceeded || in.Phase == NodePhaseFailed {
if in.Phase == NodePhaseSucceeded || in.Phase == NodePhaseFailed || in.Phase == NodePhaseAborted {
return true
}
}
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
switch nodePhase {
case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRecovered, v1alpha1.NodePhaseSkipped:
successCount++
case v1alpha1.NodePhaseFailing:
case v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseAborting:
failingCount++
case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseTimedOut:
case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseAborted, v1alpha1.NodePhaseTimedOut:

Check warning on line 380 in flytepropeller/pkg/controller/nodes/array/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/array/handler.go#L380

Added line #L380 was not covered by tests
failedCount++
default:
runningCount++
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/array/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ func inferParallelism(ctx context.Context, parallelism *uint32, parallelismBehav

func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool {
return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut ||
nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered
nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered || nodePhase == v1alpha1.NodePhaseAborted
}
4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx interfaces.N
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(downstreamStatus.Err, nil)), nil
}

if downstreamStatus.HasAborted() {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoAbortErr(downstreamStatus.Err, nil)), nil

Check warning on line 167 in flytepropeller/pkg/controller/nodes/branch/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/branch/handler.go#L167

Added line #L167 was not covered by tests
}

phase := handler.PhaseInfoRunning(nil)
return handler.DoTransition(handler.TransitionTypeEphemeral, phase), nil
}
Expand Down
45 changes: 44 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func canHandleNode(phase v1alpha1.NodePhase) bool {
phase == v1alpha1.NodePhaseQueued ||
phase == v1alpha1.NodePhaseRunning ||
phase == v1alpha1.NodePhaseFailing ||
phase == v1alpha1.NodePhaseAborting ||
phase == v1alpha1.NodePhaseTimingOut ||
phase == v1alpha1.NodePhaseRetryableFailure ||
phase == v1alpha1.NodePhaseSucceeding ||
Expand Down Expand Up @@ -247,6 +248,14 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
}

return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
} else if nodePhase == v1alpha1.NodePhaseAborted {
logger.Debugf(currentNodeCtx, "Node has aborted, traversing downstream.")
_, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode)
if err != nil {
return interfaces.NodeStatusUndefined, err

Check warning on line 255 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L251-L255

Added lines #L251 - L255 were not covered by tests
}

return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil

Check warning on line 258 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L258

Added line #L258 was not covered by tests
} else if nodePhase == v1alpha1.NodePhaseTimedOut {
logger.Debugf(currentNodeCtx, "Node has timed out, traversing downstream.")
_, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode)
Expand Down Expand Up @@ -312,6 +321,10 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
} else {
return state, nil
}
} else if state.HasAborted() {
logger.Debugf(ctx, "Some downstream node has aborted. Error: [%s]", state.Err)

Check warning on line 325 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L325

Added line #L325 was not covered by tests
// Ignore WorkflowOnFailurePolicy if a downstream node has aborted
return state, nil

Check warning on line 327 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L327

Added line #L327 was not covered by tests
} else if !state.IsComplete() {
// A Failed/Timedout node is implicitly considered "complete" this means none of the downstream nodes from
// that node will ever be allowed to run.
Expand Down Expand Up @@ -1190,7 +1203,7 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter
}
// When a node fails, we fail the workflow. Independent of number of nodes succeeding/failing, whenever a first node fails,
// the entire workflow is failed.
if np == v1alpha1.NodePhaseFailing {
if np == v1alpha1.NodePhaseFailing || np == v1alpha1.NodePhaseAborting {
if execErr.GetKind() == core.ExecutionError_SYSTEM {
nodeStatus.IncrementSystemFailures()
c.metrics.PermanentSystemErrorDuration.Observe(ctx, startTime, endTime)
Expand All @@ -1208,6 +1221,12 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx inter
finalStatus = interfaces.NodeStatusFailed(p.GetErr())
}

if np == v1alpha1.NodePhaseAborting && !h.FinalizeRequired() {
logger.Infof(ctx, "Finalize not required, moving node to Aborted")
np = v1alpha1.NodePhaseAborted
finalStatus = interfaces.NodeStatusAborted(p.GetErr())
}

if np == v1alpha1.NodePhaseTimingOut && !h.FinalizeRequired() {
logger.Infof(ctx, "Finalize not required, moving node to TimedOut")
np = v1alpha1.NodePhaseTimedOut
Expand Down Expand Up @@ -1348,6 +1367,25 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
}

if currentPhase == v1alpha1.NodePhaseAborting {
logger.Debugf(ctx, "node aborting")
if err := c.Abort(ctx, h, nCtx, "node aborting", false); err != nil {
return interfaces.NodeStatusUndefined, err
}
t := metav1.Now()

startedAt := nodeStatus.GetStartedAt()
if startedAt == nil {
startedAt = &t
}
nodeStatus.UpdatePhase(v1alpha1.NodePhaseAborted, t, nodeStatus.GetMessage(), c.enableCRDebugMetadata, nodeStatus.GetExecutionError())
c.metrics.FailureDuration.Observe(ctx, startedAt.Time, nodeStatus.GetStoppedAt().Time)
if nCtx.NodeExecutionMetadata().IsInterruptible() {

Check warning on line 1383 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L1383

Added line #L1383 was not covered by tests
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil
}

if currentPhase == v1alpha1.NodePhaseTimingOut {
logger.Debugf(ctx, "node timing out")
if err := c.Abort(ctx, h, nCtx, "node timed out", false); err != nil {
Expand Down Expand Up @@ -1394,6 +1432,11 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
}

if currentPhase == v1alpha1.NodePhaseAborted {
// This should never happen
return interfaces.NodeStatusAborted(nodeStatus.GetExecutionError()), nil
}

Check warning on line 1439 in flytepropeller/pkg/controller/nodes/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/executor.go#L1439

Added line #L1439 was not covered by tests
return c.handleQueuedOrRunningNode(ctx, nCtx, h)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ const (
EPhaseFailing
EPhaseDynamicRunning
EPhaseRecovered
EPhaseAborted
)

func (p EPhase) IsTerminal() bool {
if p == EPhaseFailed || p == EPhaseSuccess || p == EPhaseSkip || p == EPhaseTimedout || p == EPhaseRecovered {
if p == EPhaseFailed || p == EPhaseSuccess || p == EPhaseSkip || p == EPhaseTimedout || p == EPhaseRecovered || p == EPhaseAborted {
return true
}
return false
Expand Down Expand Up @@ -184,6 +185,10 @@ func PhaseInfoFailureErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInf
return phaseInfoFailed(EPhaseFailed, err, info)
}

// PhaseInfoAbortErr returns the PhaseInfo produced by phaseInfoFailed for
// EPhaseAborted, carrying the given execution error and execution info.
func PhaseInfoAbortErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInfo {
return phaseInfoFailed(EPhaseAborted, err, info)

Check warning on line 189 in flytepropeller/pkg/controller/nodes/handler/transition_info.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/handler/transition_info.go#L188-L189

Added lines #L188 - L189 were not covered by tests
}

// PhaseInfoFailingErr returns the PhaseInfo produced by phaseInfoFailed for
// EPhaseFailing, carrying the given execution error and execution info.
func PhaseInfoFailingErr(err *core.ExecutionError, info *ExecutionInfo) PhaseInfo {
return phaseInfoFailed(EPhaseFailing, err, info)
}
Expand Down
9 changes: 9 additions & 0 deletions flytepropeller/pkg/controller/nodes/interfaces/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
NodePhaseTimedOut
// Node recovered from a prior execution.
NodePhaseRecovered
NodePhaseAborted
)

func (p NodePhase) String() string {
Expand Down Expand Up @@ -115,6 +116,10 @@ func (n *NodeStatus) HasFailed() bool {
return n.NodePhase == NodePhaseFailed
}

// HasAborted reports whether this status's NodePhase is NodePhaseAborted.
func (n *NodeStatus) HasAborted() bool {
return n.NodePhase == NodePhaseAborted
}

// HasTimedOut reports whether this status's NodePhase is NodePhaseTimedOut.
func (n *NodeStatus) HasTimedOut() bool {
return n.NodePhase == NodePhaseTimedOut
}
Expand All @@ -135,3 +140,7 @@ var NodeStatusRecovered = NodeStatus{NodePhase: NodePhaseRecovered}
// NodeStatusFailed returns a NodeStatus whose NodePhase is NodePhaseFailed and
// whose Err is the given execution error.
func NodeStatusFailed(err *core.ExecutionError) NodeStatus {
return NodeStatus{NodePhase: NodePhaseFailed, Err: err}
}

// NodeStatusAborted returns a NodeStatus whose NodePhase is NodePhaseAborted and
// whose Err is the given execution error.
func NodeStatusAborted(err *core.ExecutionError) NodeStatus {
return NodeStatus{NodePhase: NodePhaseAborted, Err: err}
}
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No

if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped ||
upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseFailed ||
upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseAborted ||
upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseTimedOut {
skipped = true
} else if !(upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSucceeded ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (s *subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(startStatus.Err, nil)), nil
}

if startStatus.HasAborted() {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoAbortErr(startStatus.Err, nil)), nil

Check warning on line 61 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}

return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (n NextPhaseStatePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskEx
return pluginCore.DoTransition(pluginCore.PhaseInfoFailure("failed", "message", s.TaskInfo)), nil
case pluginCore.PhaseRetryableFailure:
return pluginCore.DoTransition(pluginCore.PhaseInfoRetryableFailure("failed", "message", s.TaskInfo)), nil
case pluginCore.PhaseAborted:
return pluginCore.DoTransition(pluginCore.PhaseInfoAborted("aborted", "message", s.TaskInfo)), nil
case pluginCore.PhaseNotReady:
return pluginCore.DoTransition(pluginCore.PhaseInfoNotReady(time.Now(), s.PhaseVersion, "not-ready")), nil
case pluginCore.PhaseInitializing:
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle
case pluginCore.PhasePermanentFailure:
logger.Debugf(ctx, "Transitioning to Failure")
return handler.DoTransition(p.ttype, handler.PhaseInfoFailureErr(p.pInfo.Err(), &p.execInfo)), nil
case pluginCore.PhaseAborted:
logger.Debugf(ctx, "Transitioning to Abort")
return handler.DoTransition(p.ttype, handler.PhaseInfoAbortErr(p.pInfo.Err(), &p.execInfo)), nil

Check warning on line 171 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L169-L171

Added lines #L169 - L171 were not covered by tests
case pluginCore.PhaseUndefined:
return handler.UnknownTransition, fmt.Errorf("error converting plugin phase, received [Undefined]")
}
Expand Down Expand Up @@ -504,6 +507,8 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(),
})
}
case pluginCore.PhaseAborted:

Check warning on line 510 in flytepropeller/pkg/controller/nodes/task/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/handler.go#L510

Added line #L510 was not covered by tests
fallthrough
case pluginCore.PhaseRetryableFailure:
fallthrough
case pluginCore.PhasePermanentFailure:
Expand Down
4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func ToNodeExecEventPhase(p handler.EPhase) core.NodeExecution_Phase {
return core.NodeExecution_SUCCEEDED
case handler.EPhaseFailed:
return core.NodeExecution_FAILED
case handler.EPhaseAborted:
return core.NodeExecution_ABORTED

Check warning on line 71 in flytepropeller/pkg/controller/nodes/transformers.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/transformers.go#L70-L71

Added lines #L70 - L71 were not covered by tests
case handler.EPhaseRecovered:
return core.NodeExecution_RECOVERED
case handler.EPhaseTimedout:
Expand Down Expand Up @@ -220,6 +222,8 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) {
return v1alpha1.NodePhaseSucceeding, nil
case handler.EPhaseFailed:
return v1alpha1.NodePhaseFailing, nil
case handler.EPhaseAborted:
return v1alpha1.NodePhaseAborting, nil
case handler.EPhaseTimedout:
return v1alpha1.NodePhaseTimingOut, nil
case handler.EPhaseRecovered:
Expand Down
13 changes: 13 additions & 0 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func StatusFailed(err *core.ExecutionError) Status {
return Status{TransitionToPhase: v1alpha1.WorkflowPhaseFailed, Err: err}
}

// StatusAborted returns a Status whose TransitionToPhase is
// v1alpha1.WorkflowPhaseAborted and whose Err is the given execution error.
func StatusAborted(err *core.ExecutionError) Status {
return Status{TransitionToPhase: v1alpha1.WorkflowPhaseAborted, Err: err}

Check warning on line 63 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L62-L63

Added lines #L62 - L63 were not covered by tests
}

type workflowExecutor struct {
enqueueWorkflow v1alpha1.EnqueueWorkflow
store *storage.DataStore
Expand Down Expand Up @@ -134,6 +138,9 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
if s.HasFailed() {
return StatusFailing(s.Err), nil
}
if s.HasAborted() {
return StatusAborted(s.Err), nil

Check warning on line 142 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L142

Added line #L142 was not covered by tests
}
return StatusRunning, nil
}

Expand Down Expand Up @@ -168,6 +175,10 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
logger.Infof(ctx, "Workflow has failed. Error [%s]", state.Err.String())
return StatusFailing(state.Err), nil
}
if state.HasAborted() {
logger.Infof(ctx, "Workflow has aborted. Error [%s]", state.Err.String())
return StatusAborted(state.Err), nil

Check warning on line 180 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L179-L180

Added lines #L179 - L180 were not covered by tests
}
if state.HasTimedOut() {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_USER,
Expand Down Expand Up @@ -200,6 +211,8 @@ func (c *workflowExecutor) handleFailureNode(ctx context.Context, w *v1alpha1.Fl
switch state.NodePhase {
case interfaces.NodePhaseFailed:
return StatusFailed(state.Err), nil
case interfaces.NodePhaseAborted:
return StatusAborted(state.Err), nil

Check warning on line 215 in flytepropeller/pkg/controller/workflow/executor.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/workflow/executor.go#L214-L215

Added lines #L214 - L215 were not covered by tests
case interfaces.NodePhaseTimedOut:
return StatusFailed(&core.ExecutionError{
Kind: core.ExecutionError_USER,
Expand Down

0 comments on commit e2bcf69

Please sign in to comment.