Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] fix ArrayNode state's TaskPhase reset #5451

Merged
merged 4 commits into from
Jul 19, 2024

Conversation

pvditt
Copy link
Contributor

@pvditt pvditt commented Jun 5, 2024

Tracking issue

Resolves: #5201

Why are the changes needed?

When a NodePhase change is detected, the current implementation resets the TaskPhaseVersion to 0 prior to emitting the event. This would create a duplicate event since the TaskPhase isn't also updated causing for the event to not get emitted/persisted to admin.

What changes were proposed in this pull request?

  • Bump the task phase version after emitting the event when a NodePhase change is detected in the ArrayNode handler.

We opt to do this instead of bumping the task phase as that can cause issues.
Example issue:

  • updating the task phase to a terminal phase and then emitting that event to admin would set that task execution to a terminal state
  • admin does not persist follow up event when a task execution is already in a terminal state
  • the next propeller loop that handles cleanup wouldn't be able to persist new task events such as aborting subnodes on faliure cleanup.

Also this seems to be consistent with letting the proceeding propeller loop handling the next state.

How was this patch tested?

Ran through different failing and succeeding scenarios and ensured that the subnodes phases transitioned to the correct terminal phase.

Setup process

Repro issues:

@task(
    retries=0,
)
def hello(name: str) -> None:
    if name == "b":
        time.sleep(10)
        raise Exception("This is a test exception")
    return


@workflow
def map_workflow():
    map_task(hello, min_success_ratio=0.5, concurrency=0)(name=["a", "b", "c"])

Run this multiple times:

@task(
    cache_version="1.0",
    cache=True,
)
def square(val: int) -> int:
    return val ** 2


@workflow
def map_square() -> list[int]:
    temp = list(range(1, 51))
    a = map_task(square, concurrency=30)(val=temp)
    return a

Screenshots

image

image

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Copy link

codecov bot commented Jun 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 60.92%. Comparing base (fc1c92c) to head (8a5362b).
Report is 132 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master    #5451   +/-   ##
=======================================
  Coverage   60.91%   60.92%           
=======================================
  Files         796      796           
  Lines       51689    51689           
=======================================
+ Hits        31488    31493    +5     
+ Misses      17294    17289    -5     
  Partials     2907     2907           
Flag Coverage Δ
unittests-datacatalog 69.31% <ø> (ø)
unittests-flyteadmin 58.73% <ø> (+0.04%) ⬆️
unittests-flytecopilot 17.79% <ø> (ø)
unittests-flytectl 67.41% <ø> (ø)
unittests-flyteidl 79.04% <ø> (ø)
unittests-flyteplugins 61.85% <ø> (ø)
unittests-flytepropeller 57.25% <100.00%> (ø)
unittests-flytestdlib 65.57% <ø> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@pvditt pvditt marked this pull request as ready for review June 6, 2024 02:29
@pvditt pvditt requested a review from hamersaw June 6, 2024 02:29
@pvditt pvditt marked this pull request as draft June 6, 2024 02:40
@pvditt pvditt closed this Jun 6, 2024
@pvditt pvditt changed the title [Bug] set updated array node phase for task exec event [Bug] fix ArrayNode state's TaskPhase reset Jun 6, 2024
@pvditt pvditt reopened this Jun 6, 2024
@pvditt pvditt marked this pull request as ready for review June 6, 2024 06:19
Comment on lines +588 to +592

// if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0
if currentArrayNodePhase != arrayNodeState.Phase {
arrayNodeState.TaskPhaseVersion = 0
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't setting this afterwards cause problems with admin requiring incremental values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scenario of interest would be if we would lose some eventing data if we have incrementPhase = False and currentArrayNodePhase != arrayNodeState.Phase + have a previous event emitted with the same TaskPhase and TaskPhaseVersion.

We set incrementTaskPhaseVersion = True if there's a subnode phase change. The arrayNodeState.Phase is updated in 3 different places: arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing, arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding, and arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting.

For arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing and arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding, there would have to be subnode phase change so we couldn't have a scenario where incrementPhase = False and currentArrayNodePhase != arrayNodeState.Phase.

For arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting, we shouldn't have a previous event emitted with the same TaskPhase and TaskPhaseVersion as we should only be in the v1alpha1.ArrayNodePhaseNone phase for the first pass through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The proceeding loops would have a new TaskPhase as well.

Copy link
Contributor

@hamersaw hamersaw Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scenario of interest would be if we would lose some eventing data if we have incrementPhase = False and currentArrayNodePhase != arrayNodeState.Phase + have a previous event emitted with the same TaskPhase and TaskPhaseVersion.
Do we have a repro for this? I'm having difficulty understanding how this is possible. If incrementPhase == False then no subnode phases have been updated so currentArrayNodePhase != arrayNodeState.Phase cannot be true since they are determined from subNode phases - unless this is a system failure in the handler logic?

Then taskPhase is deterministic on currentArrayNodePhase, so if currentArrayNodePhase != arrayNodeState.Phase then we can not have emitted an event with the same taskPhase and taskPhaseVersion. FlyteAdmin does not take taskPhaseVersion into accord for new taskPhase values.

I'm probably missing something here, a repro would help. Or is there an issue this should be linked to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hamersaw I noticed this when working on mapping over launch plans but don't think I actually ran into the bug. However this should repro an issue.

@task
def hello(num: int) -> int:
    if num > 9:
        time.sleep(10)
        raise Exception("This is a test exception")
    return num


@workflow
def map_workflow():
    map_task(hello, min_success_ratio=0.5)(num=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

A subnode task phase will be stuck in "Running" even though it has terminated/failed.

Emitting the event always fails due to the task phase not getting bumped but the task phase version getting reset.

Copy link
Contributor Author

@pvditt pvditt Jul 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, this doesn't occur when the staggered subtask succeeds. Looking back into this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figured it out:

this doesn't bubble up for:

@task
def hello(num: int) -> int:
    if num > 9:
        time.sleep(10)
    return num


@workflow
def map_workflow():
    map_task(hello, min_success_ratio=0.5)(num=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

for _, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
nodePhase := v1alpha1.NodePhase(nodePhaseUint64)
switch nodePhase {
case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseRecovered, v1alpha1.NodePhaseSkipped:
successCount++
case v1alpha1.NodePhaseFailing:
failingCount++
case v1alpha1.NodePhaseFailed, v1alpha1.NodePhaseTimedOut:
failedCount++
default:
runningCount++
}
}
doesn't check for NodePhaseSucceeding so it counts as a running task so we don't bump the arrayNode phase here:
if len(arrayNodeState.SubNodePhases.GetItems())-failedCount < minSuccesses {
// no chance to reach the minimum number of successes
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseFailing
} else if successCount >= minSuccesses && runningCount == 0 {
// wait until all tasks have completed before declaring success
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding
}

Since we don't do that we bump task phase version instead of resetting to 0 to the event lands in admin. Meanwhile for NodePhaseFailing, we don't increment running so we update arrayNodeState.Phase which then leads for the existing implementation to reset the task phase version.

@pvditt pvditt requested a review from hamersaw June 6, 2024 18:43
@hamersaw hamersaw merged commit fe879f2 into master Jul 19, 2024
50 checks passed
@hamersaw hamersaw deleted the set-updated-phase-array-node-eventing branch July 19, 2024 13:32
vlibov pushed a commit to vlibov/flyte that referenced this pull request Aug 16, 2024
* set updated array node phase for task exec event

Signed-off-by: Paul Dittamo <[email protected]>

* Revert "set updated array node phase for task exec event"

This reverts commit 9f4306a.

Signed-off-by: Paul Dittamo <[email protected]>

* reset taskPhaseVersion due to nodephase change after emitting event

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Vladyslav Libov <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] ArrayNode maptask subnode/task eventing
2 participants