Skip to content

Commit

Permalink
Revert "Detect subNode phase updates to reduce evaluation frequency o…
Browse files Browse the repository at this point in the history
…f ArrayNode (#4535)"

This reverts commit b50ba87.

Signed-off-by: Eduardo Apolinario <[email protected]>
  • Loading branch information
eapolinario committed Dec 13, 2023
1 parent b14f1ac commit c6c3051
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 16 deletions.
20 changes: 8 additions & 12 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState := nCtx.NodeStateReader().GetArrayNodeState()
currentArrayNodePhase := arrayNodeState.Phase

incrementTaskPhaseVersion := false
taskPhaseVersion := arrayNodeState.TaskPhaseVersion
eventRecorder := newArrayEventRecorder(nCtx.EventsRecorder())

switch currentArrayNodePhase {
Expand Down Expand Up @@ -246,7 +246,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
messageCollector := errorcollector.NewErrorMessageCollector()
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
nodePhase := v1alpha1.NodePhase(nodePhaseUint64)
taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i))

// do not process nodes in terminal state
if isTerminalNodePhase(nodePhase) {
Expand Down Expand Up @@ -284,11 +283,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
arrayNodeState.SubNodeRetryAttempts.SetItem(i, uint64(subNodeStatus.GetAttempts()))
arrayNodeState.SubNodeSystemFailures.SetItem(i, uint64(subNodeStatus.GetSystemFailures()))

// increment task phase version if subNode phase or task phase changed
if subNodeStatus.GetPhase() != nodePhase || subNodeStatus.GetTaskNodeStatus().GetPhase() != taskPhase {
incrementTaskPhaseVersion = true
}
}

// process phases of subNodes to determine overall `ArrayNode` phase
Expand Down Expand Up @@ -435,15 +429,17 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
taskPhase = idlcore.TaskExecution_FAILED
}

// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise
// increment it if we detect any changes in subNode state.
// need to increment taskPhaseVersion if arrayNodeState.Phase does not change, otherwise
// reset to 0. by incrementing this always we report an event and ensure processing
// every time the ArrayNode is evaluated. if this overhead becomes too large, we will need
// to revisit and only increment when any subNode state changes.
if currentArrayNodePhase != arrayNodeState.Phase {
arrayNodeState.TaskPhaseVersion = 0
} else if incrementTaskPhaseVersion {
arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1
} else {
arrayNodeState.TaskPhaseVersion = taskPhaseVersion + 1
}

if err := eventRecorder.finalize(ctx, nCtx, taskPhase, arrayNodeState.TaskPhaseVersion, a.eventConfig); err != nil {
if err := eventRecorder.finalize(ctx, nCtx, taskPhase, taskPhaseVersion, a.eventConfig); err != nil {
logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error())
return handler.UnknownTransition, err
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
return handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, "BindingResolutionFailure", err.Error(), nil), nil
}

if nodeInputs != nil && len(nodeInputs.Literals) > 0 {
if nodeInputs != nil {
inputsFile := v1alpha1.GetInputsFile(dataDir)
if err := c.store.WriteProtobuf(ctx, inputsFile, storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
Expand Down
3 changes: 0 additions & 3 deletions flytestdlib/storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type cachedRawStore struct {

// Head gets metadata about the reference. This should generally be a lightweight operation.
func (s *cachedRawStore) Head(ctx context.Context, reference DataReference) (Metadata, error) {
ctx, span := otelutils.NewSpan(ctx, otelutils.BlobstoreClientTracer, "flytestdlib.storage.cachedRawStore/Head")
defer span.End()

key := []byte(reference)
if oRaw, err := s.cache.Get(key); err == nil {
s.metrics.CacheHit.Inc()
Expand Down

0 comments on commit c6c3051

Please sign in to comment.