From f99e8e9f96c3cfd2005e12833d997968c5b60a74 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Fri, 8 Dec 2023 17:37:56 -0600 Subject: [PATCH] Detect subNode phase updates to reduce evaluation frequency of ArrayNode (#4535) * detecting subNode or task phase updates to increment TaskPhaseVersion on ArrayNode state Signed-off-by: Daniel Rammer * not writing empty file on no inputs Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 20 +++++++++++-------- .../pkg/controller/nodes/executor.go | 2 +- flytestdlib/storage/cached_rawstore.go | 3 +++ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index dddcd0e7c5..00a9fc747e 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState() currentArrayNodePhase := arrayNodeState.Phase - taskPhaseVersion := arrayNodeState.TaskPhaseVersion + incrementTaskPhaseVersion := false eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder()) switch currentArrayNodePhase { @@ -246,6 +246,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu messageCollector := errorcollector.NewErrorMessageCollector() for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { nodePhase := v1alpha1.NodePhase(nodePhaseUint64) + taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) // do not process nodes in terminal state if isTerminalNodePhase(nodePhase) { @@ -283,6 +284,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts())) 
arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures())) + + // increment task phase version if subNode phase or task phase changed + if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase { + incrementTaskPhaseVersion = true + } } // process phases of subNodes to determine overall `ArrayNode` phase @@ -429,17 +435,15 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu taskPhase = idlcore.TaskExecution_FAILED } - // need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise - // reset to 0. by incrementing this always we report an event and ensure processing - // every time the ArrayNode is evaluated. if this overhead becomes too large, we will need - // to revisit and only increment when any subNode state changes. + // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise + // increment it if we detect any changes in subNode state. 
if currentArrayNodePhase != arrayNodeState.Phase { arrayNodeState.TaskPhaseVersion = 0 - } else { - arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1 + } else if incrementTaskPhaseVersion { + arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1 } - if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil { + if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil { logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return handler.UnknownTransition, err } diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 8e96ee9645..23062a8cb3 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil } - if nodeInputs != nil { + if nodeInputs != nil && len(nodeInputs.Literals) > 0 { inputsFile := v1alpha1.GetInputsFile(dataDir) if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil { c.metrics.InputsWriteFailure.Inc(ctx) diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index a37a4cdf6b..913a517a0f 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -35,6 +35,9 @@ type cachedRawStore struct { // Head gets metadata about the reference. This should generally be a lightweight operation. 
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head") + defer span.End() + key := []byte(reference) if oRaw, err := s.cache.Get(key); err == nil { s.metrics.CacheHit.Inc()