Note: as mentioned in #8790, I think this issue and that one are very similar; the main difference I can see is that #8790 is easier to reproduce and simpler to reason through, since it only involves a local Client. I'm fairly certain that the discussion in #8790 will subsume this one, but not 100% certain, so I decided to create that additional issue rather than edit this one. If I should have just pared this issue down to the simpler example, please share that feedback and I'll change my approach moving forward.
Hello Dask Maintainers,
Thank you in advance for taking the time to read this bug report!
At my company, we are heavy users of Dask's ability to handle dynamic task submission from workers (i.e. the pattern discussed here). Now, some of the tasks we dynamically submit will transiently fail (for a concrete example, a model training sub-task that was spawned dynamically might fail because it can't connect to an MLFlow server to store training metrics). In a very minimal sense, you could think of it like this:
import random

from dask.distributed import Client, worker_client


def sometimes_fails():
    random_num = random.random()
    if random_num < 0.5:
        raise ValueError("Transiently failed!")


def submits_sometimes_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(sometimes_fails, key=key_for_subtask)
        return fut.result()


client = Client()

# Try this line a few times until you see a failure
outer_future = client.submit(submits_sometimes_failing_subtask, "some_fixed_key")
If the sometimes_fails Future submitted inside submits_sometimes_failing_subtask fails, then you'll end up with a task with key "some_fixed_key" "stuck" on the cluster, where "stuck" means that you won't have a direct Future handle to it and it won't trivially fall out of scope.
Reproducing the bug
For a deterministic MCVE (since the above example uses random.random), we need a slight tweak/extension of the above code:
from dask.distributed import Client, worker_client


def always_fails():
    raise ValueError("Always fails")


def submits_always_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(always_fails, key=key_for_subtask)
        return fut.result()


client = Client()
outer_future = client.submit(submits_always_failing_subtask, "some_fixed_key")

# We still have the outer_future object in memory, so its key is on the cluster, along with "some_fixed_key"
assert set(client.cluster.scheduler.tasks) == {"some_fixed_key", outer_future.key}

outer_future.cancel()

# After we cancel it, outer_future's key naturally goes away, but the key for the
# dynamically submitted task is "stuck", so it doesn't
assert set(client.cluster.scheduler.tasks) == {"some_fixed_key"}
If you dig in, you'll see that the "some_fixed_key" task is "stuck" because it's wanted by the worker Client associated with the worker that submitted it.
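For example, one way to dig in (poking at the in-process scheduler's internals here, which I realize are implementation details rather than public API):

# Inspect the scheduler's TaskState for the stuck key
ts = client.cluster.scheduler.tasks["some_fixed_key"]
print(ts.state)      # the task is left in its terminal state, e.g. "erred"
print(ts.who_wants)  # contains the ClientState of the worker's Client, not our Client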
Impact

At bare minimum, IIUC, this is a slight memory leak on both the scheduler and the worker that submitted the task, since both of them have to keep tracking the metadata for the poisoned task. Relatedly, I haven't done a deep dive to see if any additional state created during the execution of the sub-task sticks around, but if so, that'd be additional memory leakage.
Beyond that, for our usage of Dask, we often leverage fixed keys for certain tasks to save expensive compute (e.g. for model inference tasks with large models, we use fixed keys to avoid recomputation in case multiple queries arrive at similar times with the same inputs). The issue then is that if one of these fixed-key tasks is submitted dynamically from another task and fails, then the erred-state Future stuck on the cluster with that key blocks additional attempts to rerun the task.

Put in the context of the MCVE above, assume we use submits_sometimes_failing_subtask (the variant that doesn't always fail): the moment the internal sub-task sometimes_fails fails, any additional submissions of submits_sometimes_failing_subtask beyond that point will always fail (naturally, since there's an erred Future with key "some_fixed_key" already on the cluster).
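To make that concrete in terms of the MCVE, a rough sketch of what happens from our point of view (pure=False is only there to force a fresh outer task on each retry; everything else matches the code above):

# Suppose an earlier run of submits_sometimes_failing_subtask hit the 50% failure
# and left an erred "some_fixed_key" task on the cluster. Every retry after that
# point reuses the erred Future inside worker_client() and fails immediately,
# even when sometimes_fails would have succeeded this time around.
retry_future = client.submit(
    submits_sometimes_failing_subtask, "some_fixed_key", pure=False
)
retry_future.result()  # now always raises ValueError("Transiently failed!")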
Resolving the issue
I'm not sure if the bug I'm describing is expected behavior. Assuming it's not, I've been looking into fixes for it. One somewhat straightforward option I have is to tweak the logic of the task that dynamically submits the sub-task by wrapping the wait on the sub-task in a try/except.
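Concretely, a rough sketch of what I mean is below; the cleanup step in the except block is my current choice, and I'm not sure whether Future.cancel() or Future.release() is the more appropriate way to drop the worker Client's reference to the failed sub-task:

from dask.distributed import worker_client


def submits_sometimes_failing_subtask(key_for_subtask: str):
    with worker_client() as client:
        fut = client.submit(sometimes_fails, key=key_for_subtask)
        try:
            return fut.result()
        except Exception:
            # Explicitly drop this worker Client's handle to the failed sub-task,
            # so the fixed key doesn't stay "wanted" on the scheduler after the error
            fut.cancel()
            raise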
This seems to work (applying it to my MCVE above, I don't see "some_fixed_key" stuck on the cluster), but I was wondering if there was some better way to do this.
Environment
Python Version: 3.10
OS: Mac/Linux
Dask Version: 2024.1.1
Install method: conda, but tried pip as well
I mentioned this above in the issue description, but while expanding some tests on our side, I think I found a simpler variation with what might be the same root cause. I opened a new issue focused on the simpler variation in #8790.