-
Notifications
You must be signed in to change notification settings - Fork 308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: Always propagate pytorch task worker process exception timestamp to task exception #3057
Conversation
… to task exception Signed-off-by: Fabio Grätz <[email protected]>
Code Review Agent Run #3d1353Actionable Suggestions - 3
Review Details
|
Changelist by BitoThis pull request implements the following key changes.
|
def test_exception_timestamp() -> None: | ||
"""Test that the timestamp of the worker process exception is propagated to the task exception.""" | ||
@task( | ||
task_config=Elastic( | ||
nnodes=1, | ||
nproc_per_node=2, | ||
) | ||
) | ||
def test_task(): | ||
raise Exception("Test exception") | ||
|
||
with pytest.raises(Exception) as e: | ||
test_task() | ||
|
||
assert e.value.timestamp is not None | ||
|
||
|
||
def test_recoverable_exception_timestamp() -> None: | ||
"""Test that the timestamp of the worker process exception is propagated to the task exception.""" | ||
@task( | ||
task_config=Elastic( | ||
nnodes=1, | ||
nproc_per_node=2, | ||
) | ||
) | ||
def test_task(): | ||
raise FlyteRecoverableException("Recoverable test exception") | ||
|
||
with pytest.raises(Exception) as e: | ||
test_task() | ||
|
||
assert e.value.timestamp is not None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider consolidating the two test functions test_exception_timestamp()
and test_recoverable_exception_timestamp()
into a single parameterized test since they follow the same pattern and only differ in the exception type.
Code suggestion
Check the AI-generated fix before applying
def test_exception_timestamp() -> None: | |
"""Test that the timestamp of the worker process exception is propagated to the task exception.""" | |
@task( | |
task_config=Elastic( | |
nnodes=1, | |
nproc_per_node=2, | |
) | |
) | |
def test_task(): | |
raise Exception("Test exception") | |
with pytest.raises(Exception) as e: | |
test_task() | |
assert e.value.timestamp is not None | |
def test_recoverable_exception_timestamp() -> None: | |
"""Test that the timestamp of the worker process exception is propagated to the task exception.""" | |
@task( | |
task_config=Elastic( | |
nnodes=1, | |
nproc_per_node=2, | |
) | |
) | |
def test_task(): | |
raise FlyteRecoverableException("Recoverable test exception") | |
with pytest.raises(Exception) as e: | |
test_task() | |
assert e.value.timestamp is not None | |
def test_recoverable_exception_timestamp() -> None: | |
@pytest.mark.parametrize("exception_cls,message", [ | |
(Exception, "Test exception"), | |
(FlyteRecoverableException, "Recoverable test exception") | |
]) | |
def test_exception_timestamp(exception_cls, message) -> None: | |
"""Test that the timestamp of the worker process exception is propagated to the task exception.""" | |
task_config=Elastic( | |
nnodes=1, | |
nproc_per_node=2, | |
) | |
) | |
def test_task(): | |
raise exception_cls(message) | |
with pytest.raises(Exception) as e: | |
test_task() | |
assert e.value.timestamp is not None |
Code Review Run #3d1353
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -475,7 +475,7 @@ def fn_partial(): | |||
# the automatically assigned timestamp based on exception creation time | |||
raise FlyteRecoverableException(e.format_msg(), timestamp=first_failure.timestamp) | |||
else: | |||
raise RuntimeError(e.format_msg()) | |||
raise FlyteUserRuntimeException(e, timestamp=first_failure.timestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider preserving the original error message from e.format_msg()
when raising FlyteUserRuntimeException
. The error message could provide valuable debugging context that is currently being lost.
Code suggestion
Check the AI-generated fix before applying
raise FlyteUserRuntimeException(e, timestamp=first_failure.timestamp) | |
raise FlyteUserRuntimeException(e.format_msg(), timestamp=first_failure.timestamp) |
Code Review Run #3d1353
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
@@ -15,14 +15,15 @@ class FlyteUserException(_FlyteException): | |||
class FlyteUserRuntimeException(_FlyteException): | |||
_ERROR_CODE = "USER:RuntimeError" | |||
|
|||
def __init__(self, exc_value: Exception): | |||
def __init__(self, exc_value: Exception, timestamp: typing.Optional[float] = None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding validation for the timestamp
parameter to ensure it's a valid timestamp value when provided.
Code suggestion
Check the AI-generated fix before applying
@@ -18,2 +18,5 @@
def __init__(self, exc_value: Exception, timestamp: typing.Optional[float] = None):
+ if timestamp is not None and (not isinstance(timestamp, (int, float)) or timestamp < 0):
+ raise ValueError("timestamp must be a non-negative number representing seconds since epoch")
super().__init__(str(exc_value), timestamp=timestamp)
Code Review Run #3d1353
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #3057 +/- ##
===========================================
- Coverage 82.79% 51.19% -31.61%
===========================================
Files 3 202 +199
Lines 186 21297 +21111
Branches 0 2745 +2745
===========================================
+ Hits 154 10902 +10748
- Misses 32 9798 +9766
- Partials 0 597 +597 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Fabio Grätz <[email protected]>
Code Review Agent Run #3ffb7dActionable Suggestions - 0Review Details
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
… to task exception (flyteorg#3057) * Fix: Always propagate pytorch task worker process exception timestamp to task exception Signed-off-by: Fabio Grätz <[email protected]> * Fix exist recoverable error test Signed-off-by: Fabio Grätz <[email protected]> --------- Signed-off-by: Fabio Grätz <[email protected]> Co-authored-by: Fabio Grätz <[email protected]>
… to task exception (#3057) * Fix: Always propagate pytorch task worker process exception timestamp to task exception Signed-off-by: Fabio Grätz <[email protected]> * Fix exist recoverable error test Signed-off-by: Fabio Grätz <[email protected]> --------- Signed-off-by: Fabio Grätz <[email protected]> Co-authored-by: Fabio Grätz <[email protected]>
* Make FlyteUserRuntimeException to return error_code in Container Error (#3059) * Make FlyteUserRuntimeException to return error_code in the ContainerError Signed-off-by: Rafael Ribeiro Raposo <[email protected]> * [Flytekit] Separate remote signal functions (#2933) * feat: separate remote signal functions Signed-off-by: mao3267 <[email protected]> * refactor: make lint Signed-off-by: mao3267 <[email protected]> * test: add integration test for separated signal functions Signed-off-by: mao3267 <[email protected]> * fix: register workflow to admin Signed-off-by: mao3267 <[email protected]> * fix: integration test and approve function Signed-off-by: mao3267 <[email protected]> * fix: remove approve node output Signed-off-by: mao3267 <[email protected]> * fix: replace single sleep command to retry statement Signed-off-by: mao3267 <[email protected]> * fix: update comments Signed-off-by: mao3267 <[email protected]> * fix: simplify duplicate retry operations Signed-off-by: mao3267 <[email protected]> --------- Signed-off-by: mao3267 <[email protected]> * Only copy over cat-certificates.crt if it does not exist in base image (#3067) * Do not copy over ca-certifcates.crt if the base image has one Signed-off-by: Thomas J. Fan <[email protected]> * Only copy over cat-certificates.crt if it does not exist in base image Signed-off-by: Thomas J. Fan <[email protected]> --------- Signed-off-by: Thomas J. Fan <[email protected]> * Support with_overrides setting metadata for map_task subnode instead of parent node (#2982) * test Signed-off-by: Paul Dittamo <[email protected]> * add support for with_overrides for map tasks Signed-off-by: Paul Dittamo <[email protected]> * expand unit test Signed-off-by: Paul Dittamo <[email protected]> * cleanup Signed-off-by: Paul Dittamo <[email protected]> --------- Signed-off-by: Paul Dittamo <[email protected]> * fix: remove duplication log when execute (#3052) Signed-off-by: Vincent <[email protected]> * Fix: Always propagate pytorch task worker process exception timestamp to task exception (#3057) * Fix: Always propagate pytorch task worker process exception timestamp to task exception Signed-off-by: Fabio Grätz <[email protected]> * Fix exist recoverable error test Signed-off-by: Fabio Grätz <[email protected]> --------- Signed-off-by: Fabio Grätz <[email protected]> Co-authored-by: Fabio Grätz <[email protected]> * Allow user-defined dataclass type transformer (again) (#3075) * Allow for user-defined dataclass type tranformers Signed-off-by: Eduardo Apolinario <[email protected]> * Finish comment and remote user-defined dataclass transformer from registry Signed-off-by: Eduardo Apolinario <[email protected]> --------- Signed-off-by: Eduardo Apolinario <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]> --------- Signed-off-by: Rafael Ribeiro Raposo <[email protected]> Signed-off-by: mao3267 <[email protected]> Signed-off-by: Thomas J. Fan <[email protected]> Signed-off-by: Paul Dittamo <[email protected]> Signed-off-by: Vincent <[email protected]> Signed-off-by: Fabio Grätz <[email protected]> Signed-off-by: Eduardo Apolinario <[email protected]> Co-authored-by: Rafael Raposo <[email protected]> Co-authored-by: Vincent Chen <[email protected]> Co-authored-by: Thomas J. Fan <[email protected]> Co-authored-by: Paul Dittamo <[email protected]> Co-authored-by: V <[email protected]> Co-authored-by: Fabio M. Graetz, Ph.D. <[email protected]> Co-authored-by: Fabio Grätz <[email protected]> Co-authored-by: Eduardo Apolinario <[email protected]>
… to task exception (flyteorg#3057) * Fix: Always propagate pytorch task worker process exception timestamp to task exception Signed-off-by: Fabio Grätz <[email protected]> * Fix exist recoverable error test Signed-off-by: Fabio Grätz <[email protected]> --------- Signed-off-by: Fabio Grätz <[email protected]> Co-authored-by: Fabio Grätz <[email protected]> Signed-off-by: lu00122 <[email protected]>
Why are the changes needed?
Since RFC flyteorg/flyte#5598, flytepropeller can collect error files from multiple worker pods of a distributed task in order to determine the root cause error by identifying the exception with the earliest timestamp. Before, only a single error file was used even if a task launched multiple pods that each could produce an error file.
Generally, the timestamp of the container error is measured here in the pod entrypoint.
In the case of the pytorch elastic plugin where each worker pod can launch multiple worker processes, torch elastic launch determines the worker process exception with the earliest timestamp. The earliest worker process timestamp determined by torch elastic launch is considered the timestamp with which the entire pytorch elastic task pod failed here.
However, we only propagate the timestamp of the worker process exception to the task exception in the case of recoverable errors. In the case of non-recoverable errors we don't propagate the timestamp and instead rely on a new timestamp being measured in the pod entrypoint (see above).
I observed that sometimes the timestamps measured in the pod entrypoint are roughly the same even though the timestamps in the worker processes in the different pods definitely differed. This suggests that in these cases torch distributed still manages to synchronize after one worker experienced an exception so that the pod entrypoints execute
get_container_error_timestamp(...)
roughly at the same time. The fix is obvious: propagate the worker process exception timestamp to the task exception also for non-recoverable errors.Added unit tests.
Related PRs
#2797
Summary by Bito
Updates exception handling in PyTorch elastic task tests to use FlyteUserRuntimeException instead of RuntimeError. Improves worker process exception timestamp propagation for both recoverable and non-recoverable scenarios. Enhances test expectations and imports required exception classes.Unit tests added: False
Estimated effort to review (1-5, lower is better): 1