Skip to content

Commit

Permalink
Create a failure node in subworkflow
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Nov 29, 2023
1 parent 8e49b3f commit a805168
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 13 deletions.
7 changes: 6 additions & 1 deletion flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cluster_resources:

logger:
show-source: true
level: 6
level: 4

propeller:
create-flyteworkflow-crd: true
Expand Down Expand Up @@ -83,3 +83,8 @@ storage:
access_key_id: minio
secret_key: miniostorage
container: my-s3-bucket


flyte:
propeller:
disableWebhook: true
9 changes: 5 additions & 4 deletions flytepropeller/pkg/compiler/workflow_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile
}
}

failureNode := fg.Template.FailureNode
v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope())

// Add explicitly and implicitly declared edges
for nodeID, n := range wf.Nodes {
if nodeID == c.StartNodeID {
Expand All @@ -228,7 +225,11 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile
wf.AddEdges(n, c.EdgeDirectionBidirectional, errs.NewScope())
}

wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope())
if fg.Template.FailureNode != nil {
failureNode := fg.Template.FailureNode
v.ValidateNode(&wf, wf.GetOrCreateNodeBuilder(failureNode), false /* validateConditionTypes */, errs.NewScope())
wf.AddEdges(wf.GetOrCreateNodeBuilder(failureNode), c.EdgeDirectionUpstream, errs.NewScope())
}

Check warning on line 232 in flytepropeller/pkg/compiler/workflow_compiler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/workflow_compiler.go#L229-L232

Added lines #L229 - L232 were not covered by tests

// Add execution edges for orphan nodes that don't have any inward/outward edges.
for nodeID := range wf.Nodes {
Expand Down
10 changes: 5 additions & 5 deletions flytepropeller/pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx
errors.BadSpecificationError, errMsg, nil)), nil
}

updateNodeStateFn := func(transition handler.Transition, newPhase v1alpha1.WorkflowNodePhase, err error) (handler.Transition, error) {
updateNodeStateFn := func(transition handler.Transition, workflowNodeState handler.WorkflowNodeState, err error) (handler.Transition, error) {
if err != nil {
return transition, err
}

workflowNodeState := handler.WorkflowNodeState{Phase: newPhase}
err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState)
if err != nil {
logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error())
Expand All @@ -75,10 +74,10 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx

if wfNode.GetSubWorkflowRef() != nil {
trns, err := w.subWfHandler.StartSubWorkflow(ctx, nCtx)
return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err)

Check warning on line 77 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L77

Added line #L77 was not covered by tests
} else if wfNode.GetLaunchPlanRefID() != nil {
trns, err := w.lpHandler.StartLaunchPlan(ctx, nCtx)
return updateNodeStateFn(trns, v1alpha1.WorkflowNodePhaseExecuting, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: v1alpha1.WorkflowNodePhaseExecuting}, err)
}

return invalidWFNodeError()
Expand All @@ -95,8 +94,9 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeEx
}

if wfNode.GetSubWorkflowRef() != nil {
originalError := nCtx.NodeStateReader().GetWorkflowNodeState().Error

Check warning on line 97 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L97

Added line #L97 was not covered by tests
trns, err := w.subWfHandler.HandleFailingSubWorkflow(ctx, nCtx)
return updateNodeStateFn(trns, workflowPhase, err)
return updateNodeStateFn(trns, handler.WorkflowNodeState{Phase: workflowPhase, Error: originalError}, err)

Check warning on line 99 in flytepropeller/pkg/controller/nodes/subworkflow/handler.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/handler.go#L99

Added line #L99 was not covered by tests
} else if wfNode.GetLaunchPlanRefID() != nil {
// There is no failure node for launch plans, terminate immediately.
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailureErr(wfNodeState.Error, nil)), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context,
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err
}

if state.NodePhase == interfaces.NodePhaseRunning {
if state.NodePhase == interfaces.NodePhaseQueued || state.NodePhase == interfaces.NodePhaseRunning {

Check warning on line 163 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L163

Added line #L163 was not covered by tests
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil
}

Expand All @@ -175,7 +175,7 @@ func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context,
return handler.UnknownTransition, err
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailingErr(originalError, nil)), nil
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil

Check warning on line 178 in flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/subworkflow/subworkflow.go#L178

Added line #L178 was not covered by tests
}

// When handling the failure node succeeds, the final status will still be failure
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ func ToNodePhase(p handler.EPhase) (v1alpha1.NodePhase, error) {
return v1alpha1.NodePhaseSucceeding, nil
case handler.EPhaseFailed:
return v1alpha1.NodePhaseFailing, nil
case handler.EPhaseFailing:
return v1alpha1.NodePhaseFailing, nil

Check warning on line 220 in flytepropeller/pkg/controller/nodes/transformers.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/transformers.go#L219-L220

Added lines #L219 - L220 were not covered by tests
case handler.EPhaseTimedout:
return v1alpha1.NodePhaseTimingOut, nil
case handler.EPhaseRecovered:
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
}
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, execcontext, w, w, startNode)

if err != nil {
return StatusRunning, err
}
Expand Down Expand Up @@ -293,7 +294,6 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseFailing, "", wfEvent.GetError())
wfEvent.OccurredAt = utils.GetProtoTime(nil)
case v1alpha1.WorkflowPhaseHandlingFailureNode:
// TODO: Add core.WorkflowPhaseHandlingFailureNode to idl?
wfEvent.Phase = core.WorkflowExecution_FAILING
wfEvent.OutputResult = convertToExecutionError(toStatus.Err, previousError)
wStatus.UpdatePhase(v1alpha1.WorkflowPhaseHandlingFailureNode, "", wfEvent.GetError())
Expand Down

0 comments on commit a805168

Please sign in to comment.