Skip to content

Commit

Permalink
Working error propagates to imdiate execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom-Newton committed Nov 15, 2023
1 parent ce097d7 commit ea7f248
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,13 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
if err != nil {
return interfaces.NodeStatusUndefined, err
}

return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
nodeError := *nodeStatus.GetExecutionError()
fmt.Println("before modifying", nodeError)
status := interfaces.NodeStatusFailed(&nodeError)
nodeStatus.ClearErrorMessage()
fmt.Println("modified error", nodeStatus.GetExecutionError())
fmt.Println("after modifying", nodeError)
return status, nil
} else if nodePhase == v1alpha1.NodePhaseTimedOut {
logger.Debugf(currentNodeCtx, "Node has timed out, traversing downstream.")
_, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode)
Expand Down Expand Up @@ -1327,14 +1332,12 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur
started = &t
}

nodeError := *nodeStatus.GetExecutionError()

nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
c.metrics.FailureDuration.Observe(ctx, started.Time, nodeStatus.GetStoppedAt().Time)
if nCtx.NodeExecutionMetadata().IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
}
return interfaces.NodeStatusFailed(&nodeError), nil
return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil
}

if currentPhase == v1alpha1.NodePhaseTimingOut {
Expand Down

0 comments on commit ea7f248

Please sign in to comment.