From ea7f248dc40dcaef2e2e9f80e7804f72006a84dd Mon Sep 17 00:00:00 2001 From: Thomas Newton Date: Wed, 15 Nov 2023 18:56:57 +0000 Subject: [PATCH] Working error propagates to imdiate execution --- flytepropeller/pkg/controller/nodes/executor.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index da7bc6bb20..087bf0b741 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -235,8 +235,13 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo if err != nil { return interfaces.NodeStatusUndefined, err } - - return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil + nodeError := *nodeStatus.GetExecutionError() + fmt.Println("before modifying", nodeError) + status := interfaces.NodeStatusFailed(&nodeError) + nodeStatus.ClearErrorMessage() + fmt.Println("modified error", nodeStatus.GetExecutionError()) + fmt.Println("after modifying", nodeError) + return status, nil } else if nodePhase == v1alpha1.NodePhaseTimedOut { logger.Debugf(currentNodeCtx, "Node has timed out, traversing downstream.") _, err := c.handleDownstream(ctx, execContext, dag, nl, currentNode) @@ -1327,14 +1332,12 @@ func (c *nodeExecutor) HandleNode(ctx context.Context, dag executors.DAGStructur started = &t } - nodeError := *nodeStatus.GetExecutionError() - nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, t, nodeStatus.GetMessage(), nodeStatus.GetExecutionError()) c.metrics.FailureDuration.Observe(ctx, started.Time, nodeStatus.GetStoppedAt().Time) if nCtx.NodeExecutionMetadata().IsInterruptible() { c.metrics.InterruptibleNodesTerminated.Inc(ctx) } - return interfaces.NodeStatusFailed(&nodeError), nil + return interfaces.NodeStatusFailed(nodeStatus.GetExecutionError()), nil } if currentPhase == v1alpha1.NodePhaseTimingOut {