Deterministic error propagation for distributed (training) tasks #5598
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

| | master | #5598 | +/- |
| --- | --- | --- | --- |
| Coverage | 60.97% | 36.21% | -24.77% |
| Files | 794 | 1303 | +509 |
| Lines | 51488 | 109568 | +58080 |
| Hits | 31397 | 39683 | +8286 |
| Misses | 17199 | 65765 | +48566 |
| Partials | 2892 | 4120 | +1228 |

Flags with carried forward coverage won't be shown. View the full report in Codecov.
* As a Flyte user trying to understand why a distributed training task failed, I currently cannot rely on the error reported in the Flyte Console (UI) being the root cause error.
* Instead, I have to search the logs of each worker pod. For distributed training jobs with dozens or even hundreds of worker pods, this can be tedious.
* (Current remedies include combining all worker pods in Stackdriver logs using a wildcard in the pod name and then filtering by severity.)
* As a Flyte user marking specific errors that can occur in distributed training jobs as retriable (using a `FlyteRecoverableException`), I want Flyte to deterministically determine the root cause error so that the retry behaviour does not suffer from a race condition.
Love this one. It is subtle, but in a typical PyTorch job, the first error is an instance of `ChildError`, and the Flyte plugin for PyTorch handles that properly by checking whether the root cause is a recoverable exception. Later errors are often different, such as rendezvous errors, and Flyte treats them as non-recoverable. With the current behavior of later pod errors overwriting earlier ones, we end up treating these failures as non-recoverable.
* We could add a `MultipleErrorFiles` property to `PluginProperties` (see https://github.com/flyteorg/flyte/blob/4514860cf56ba62717f6c207f269410a8c1a5461/flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go#L34). The PyTorch plugin, for instance, would then pass `true` for `MultipleErrorFiles` [here](https://github.com/flyteorg/flyte/blob/4514860cf56ba62717f6c207f269410a8c1a5461/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go#L31), as sketched below.

Currently, [here](https://github.com/flyteorg/flyte/blob/4514860cf56ba62717f6c207f269410a8c1a5461/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L290) in the plugin manager, where we call `NewRemoteFileOutputReader`, we do have access to `e.plugin`, and thus to `PluginProperties`, and could make use of that information to instantiate another output reader.
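As a rough illustration of this option, here is a minimal, self-contained sketch; the `MultipleErrorFiles` field and the helper function are assumptions of the proposal, not existing API:

```go
package sketch

// PluginProperties mirrors a small subset of the k8s plugin machinery's
// PluginProperties; MultipleErrorFiles is the proposed new field.
type PluginProperties struct {
	// existing properties omitted

	// MultipleErrorFiles tells propeller that a task of this plugin may
	// upload one error file per worker pod instead of a single error.pb.
	MultipleErrorFiles bool
}

// A plugin such as the kubeflow PyTorch plugin would then opt in when
// returning its properties (hypothetical helper, for illustration only).
func pytorchPluginProperties() PluginProperties {
	return PluginProperties{MultipleErrorFiles: true}
}
```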
+1 to this. If we feel that `MultiErrorFileRemoteFileOutputReader` ends up being used more widely in plugins, we can "bump" it up in the configuration of plugins.
To transport this information from `pyflyte-execute` to flytepropeller, we propose to add an additional field `pod_name` (or `container_name`) to `message ContainerError` (a simplified view of the resulting field follows below).

Open question:
* Where does flytepropeller add this info for it to be displayed as part of the error in the UI?
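For illustration only, here is a simplified Go view of what propeller would see once such a field exists; this struct is a sketch rather than the generated flyteidl type, and the field name (`PodName` vs. `ContainerName`) is still an open choice:

```go
package sketch

// containerError is a simplified stand-in for flyteidl's ContainerError
// message as flytepropeller would see it after deserialization.
type containerError struct {
	Code    string
	Message string
	Kind    string // e.g. recoverable vs. non-recoverable

	// Proposed addition: the worker pod (or container) that produced this
	// error, so the UI can point users at the right logs.
	PodName string
}
```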
@pvditt, let's set some time to clarify how this is done currently.
@eapolinario @fg91 apologies for just now seeing this.
`DemystifyFailure` and `DemystifyPending` get the messages from the pods.
Flytekit related
So broadly speaking there's a map/reduce problem with error docs. With flytekit changes, we can take care of the map part pretty easily. On the reduce side...
FYI I made the PR a draft again and moved the RFC back to "new" in the Kanban. I don't have a full picture yet of how we communicate from propeller to admin/the UI in which of the workers the error occurred. Once I understand the details, I'll update the RFC and mark it as ready for review again.
Thank you for your review, @wild-endeavor! 🙇
Ultimately we want to show in the UI which worker pod failed so that one immediately knows which pod's logs to check. I was first thinking that propeller takes this information from the error file name. But now, thinking about it, maybe it's better if we properly persist the pod name in
In the RFC we discuss that in the future there might be other strategies to determine the root cause error besides the earliest timestamp. I could imagine that the pod entrypoint might have to do different things based on this strategy. For now,
I agree with the criticism that time is not a reliable indicator, but it is the strategy that torch distributed elastic launch uses within a local worker group (the processes running in a single pod) to determine which of the processes in the pod died first. In this sense, we are just extending this mechanism to multiple nodes.
I'm open to both; if you have a preference, I'll put that into the RFC.
I can't come up with a case where we would want to handle this in python/tasks. For torch distributed, communication between workers to aggregate errors would require them to rendezvous, but this is very likely no longer possible after the worker group has crashed.
For this RFC, I'm only thinking about K8s plugins to be honest. I would expect authors of non-k8s plugins to do this in
Yes, definitely, plugins don't read error files in
Currently, [here](https://github.com/flyteorg/flyte/blob/4514860cf56ba62717f6c207f269410a8c1a5461/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L290) in the plugin manager, upon completion of a node execution, a new [`RemoteFileOutputReader`](https://github.com/flyteorg/flyte/blob/d6da838627d57cd27d60beea004e974ce1fb3ca5/flyteplugins/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go#L14) is constructed which is responsible for reading the error file uploaded to blob storage. This `RemoteFileOutputReader` implements the [`OutputReader` interface](https://github.com/flyteorg/flyte/blob/1e54d21c4d4ee74245f799a57b4bb8a5534e8368/flyteplugins/go/tasks/pluginmachinery/io/iface.go#L32).

We propose to implement a new `MultiErrorFileRemoteFileOutputReader` which (for future flexibility) can be configured with the different strategies we define. Initially, the only available strategy will be `"earliest"`, which the RFC authors aim to use for the kubeflow pytorch plugin. This output reader will search for all error files in the `/errors` folder under the raw output prefix and aggregate the error as specified by the strategy (see the sketch below).
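As a rough, self-contained sketch of the `"earliest"` strategy (the `workerError` type and `errorLister` interface are stand-ins; the real reader would sit behind flyteplugins' `OutputReader` interface and reuse propeller's blob-store client):

```go
package sketch

import (
	"context"
	"fmt"
	"time"
)

// workerError is a stand-in for one deserialized error file uploaded by a
// single worker pod into the /errors folder under the raw output prefix.
type workerError struct {
	PodName   string
	Message   string
	Timestamp time.Time
}

// errorLister abstracts "list all error files under <raw output prefix>/errors/".
type errorLister interface {
	ListErrors(ctx context.Context, rawOutputPrefix string) ([]workerError, error)
}

// earliestError implements the "earliest" aggregation strategy: the error
// with the oldest timestamp is reported as the root cause.
func earliestError(ctx context.Context, lister errorLister, rawOutputPrefix string) (workerError, error) {
	errs, err := lister.ListErrors(ctx, rawOutputPrefix)
	if err != nil {
		return workerError{}, err
	}
	if len(errs) == 0 {
		return workerError{}, fmt.Errorf("no error files found under %s/errors", rawOutputPrefix)
	}
	root := errs[0]
	for _, e := range errs[1:] {
		if e.Timestamp.Before(root.Timestamp) {
			root = e
		}
	}
	return root, nil
}
```

Wrapping this logic in a dedicated reader keeps the rest of propeller interacting with the same `OutputReader` interface as today.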
Instead of a `MultiErrorFileRemoteFileOutputReader` class, why not have `RemoteFileOutputReader` take an `ErrorAggregationStrategy`? `Default` is the current behavior.
The `RemoteFileOutputReader`'s main goal is to read outputs. We can have multiple classes to extract the errors, which would be an internal detail to `RemoteFileOutputReader`.
If, in [the plugin manager](https://github.com/flyteorg/flyte/blob/4514860cf56ba62717f6c207f269410a8c1a5461/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L290), the respective plugin is found to configure an error aggregation strategy other than `Default`, we instantiate such a `MultiErrorFileRemoteFileOutputReader` (instead of the existing `RemoteFileOutputReader`) and configure it with the respective strategy.
How does the plugin manager get access to the plugin properties?
Here, we can call `e.plugin.GetProperties()`.
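Putting the pieces together, here is a minimal sketch of that branching; all names below are illustrative, and the real code would branch on the strategy returned by `e.plugin.GetProperties()` and construct flyteplugins' actual readers:

```go
package sketch

import "context"

// errorAggregationStrategy mirrors the strategy the RFC proposes to expose
// via PluginProperties (names are illustrative).
type errorAggregationStrategy int

const (
	strategyDefault  errorAggregationStrategy = iota // current single-error-file behavior
	strategyEarliest                                 // pick the error with the oldest timestamp
)

// outputReader stands in for flyteplugins' io.OutputReader interface.
type outputReader interface {
	ReadError(ctx context.Context) (string, error)
}

type singleErrorFileReader struct{ rawOutputPrefix string }

func (r singleErrorFileReader) ReadError(ctx context.Context) (string, error) {
	// Existing behavior: read a single error.pb under the output prefix.
	return "", nil
}

type multiErrorFileReader struct{ rawOutputPrefix string }

func (r multiErrorFileReader) ReadError(ctx context.Context) (string, error) {
	// Proposed behavior: list the /errors folder and aggregate per strategy.
	return "", nil
}

// newOutputReader sketches the branching in the plugin manager: the plugin's
// configured strategy decides which reader implementation is constructed.
func newOutputReader(strategy errorAggregationStrategy, rawOutputPrefix string) outputReader {
	if strategy != strategyDefault {
		return multiErrorFileReader{rawOutputPrefix: rawOutputPrefix}
	}
	return singleErrorFileReader{rawOutputPrefix: rawOutputPrefix}
}
```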
09/26/2024 Contributors' sync notes: Fabio provided a summary of the proposal; @pvditt to please review.
@wild-endeavor and @EngHabu had these questions (I moved these questions here into the RFC so as not to disperse the discussion over multiple PRs; hope that's ok for you):
@bgedik responded:
My replies:
@fg91 I thought about point 3 a bit more and I think this is a bit more than just having an extra HEAD call on … Say we looked at …

* For non-distributed cases: If …
* For distributed cases: If …

Overall, having a strategy seems cleaner to me. Maybe we should discuss a bit what the downside is with the strategy approach.
10/24/2024 Contributors' sync notes: pending sync with Haytham to keep refining the implementation approach.
Why are the changes needed?
See the introduction of the RFC document.