From 1ceee8b862a606602c2d1c21d082976cc9845b78 Mon Sep 17 00:00:00 2001
From: Iaroslav Ciupin
Date: Wed, 8 Nov 2023 21:36:54 +0200
Subject: [PATCH] handle task validation error wip

---
 flyteadmin/pkg/manager/impl/execution_manager.go | 14 ++++++++++----
 .../pkg/repositories/transformers/execution.go   | 11 +++++++++++
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go
index e31f501686e..fa991a41436 100644
--- a/flyteadmin/pkg/manager/impl/execution_manager.go
+++ b/flyteadmin/pkg/manager/impl/execution_manager.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/benbjohnson/clock"
@@ -862,10 +863,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
 	})
 
 	if err != nil {
-		m.systemMetrics.PropellerFailures.Inc()
-		logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
-			request, workflowExecutionID, executionInputs, err)
-		return nil, nil, err
+		if strings.Contains(err.Error(), "no node can run task") {
+			logger.Infof(ctx, "received task validation error, saving execution [%+v] as failed", workflowExecutionID)
+		} else {
+			m.systemMetrics.PropellerFailures.Inc()
+			logger.Infof(ctx, "Failed to execute workflow %+v with execution id %+v and inputs %+v with err %v",
+				request, workflowExecutionID, executionInputs, err)
+			return nil, nil, err
+		}
 	}
 	executionCreatedAt := time.Now()
 	acceptanceDelay := executionCreatedAt.Sub(requestedAt)
@@ -902,6 +907,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
 		SecurityContext: executionConfig.SecurityContext,
 		LaunchEntity: launchPlan.Id.ResourceType,
 		Namespace: namespace,
+		Error: err,
 	})
 	if err != nil {
 		logger.Infof(ctx, "Failed to create execution model in transformer for id: [%+v] with err: %v",
diff --git a/flyteadmin/pkg/repositories/transformers/execution.go b/flyteadmin/pkg/repositories/transformers/execution.go
index 90bae6598e6..97429330bd3 100644
--- a/flyteadmin/pkg/repositories/transformers/execution.go
+++ b/flyteadmin/pkg/repositories/transformers/execution.go
@@ -45,6 +45,7 @@ type CreateExecutionModelInput struct {
 	SecurityContext *core.SecurityContext
 	LaunchEntity core.ResourceType
 	Namespace string
+	Error error
 }
 
 type ExecutionTransformerOptions struct {
@@ -91,6 +92,16 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
 	if input.Phase == core.WorkflowExecution_RUNNING {
 		closure.StartedAt = createdAt
 	}
+	if input.Error != nil {
+		closure.Phase = core.WorkflowExecution_FAILED
+		closure.OutputResult = &admin.ExecutionClosure_Error{
+			Error: &core.ExecutionError{
+				Code: "TaskValidationFailed",
+				Message: input.Error.Error(),
+				Kind: core.ExecutionError_USER,
+			},
+		}
+	}
 
 	closureBytes, err := proto.Marshal(&closure)