Simple P2P Merge Fails if Scheduler Is Configured #8919

Open
andoni-garcia-fgp opened this issue Oct 31, 2024 · 0 comments

Describe the issue:

A simple DataFrame merge fails with shuffle_method='p2p' if I first call dask.config.set(scheduler=<scheduler_type>), even when I set it to a distributed scheduler. Switching to any other shuffle_method succeeds.

Minimal Complete Verifiable Example:

if __name__ == '__main__':
    import dask
    import dask.dataframe as dd
    import pandas as pd
    from dask.distributed import Client

    client = Client(processes=True, n_workers=2, threads_per_worker=2)

    base_ddf = dd.from_pandas(
        pd.DataFrame({'A': [1, 2, 3], 'B': ['key1', 'key2', 'key3'], 'C': ['Foo', 'Bar', 'Bax']}), npartitions=2
    )
    other_ddf = dd.from_pandas(
        pd.DataFrame({'A': [1, 2, 4], 'B': ['key1', 'key2', 'key4'], 'D': ['Blue', 'Red', 'Green']}), npartitions=2
    )
    merged_ddf = base_ddf.merge(other_ddf, on=['A', 'B'], shuffle_method='p2p')

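    # Succeeds while no scheduler is set in dask.config.
    print(merged_ddf.compute(client=client))

    # After configuring a scheduler, the same computation raises the error below.
    dask.config.set(scheduler='processes')
    print(merged_ddf.compute(client=client))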

Yields:

ValueError: No worker found

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
File /mnt/home/andoni.garcia/code/research/.venv/lib/python3.11/site-packages/distributed/shuffle/_core.py:517, in handle_transfer_errors(id)
    516 try:
--> 517     yield
    518 except ShuffleClosedError:

File /mnt/home/andoni.garcia/code/research/.venv/lib/python3.11/site-packages/distributed/shuffle/_shuffle.py:83, in shuffle_transfer(input, id, input_partition, npartitions, column, meta, parts_out, disk, drop_column)
     82 with handle_transfer_errors(id):
---> 83     return get_worker_plugin().add_partition(
     84         input,
...
    523     raise
    524 except Exception as e:
--> 525     raise RuntimeError(f"P2P shuffling {id} failed during transfer phase") from e

RuntimeError: P2P shuffling 89e4b2019b4ea76e5a3ba83c87881f3f failed during transfer phase

Anything else we need to know?:

This is reproducible in both Python scripts and Jupyter environments, and across several tested Dask versions.

Environment:

  • Dask version: 2024.10.0
  • Python version: Python 3.11.9
  • Operating System: Ubuntu 22.04.5 LTS
  • Install method (conda, pip, source): poetry