[flaky] test_repeated_merge_spill raises concurrent.futures._base.CancelledError #681

@hendrikmakait

Description

This test raises a CancelledError because the spilling merge computation does not return within the 120-second timeout. This may indicate a deadlock. It looks like this behavior has already been discussed in #212.
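
For reference, here is a minimal sketch of how distributed.wait surfaces a cancelled future as exactly this error. This is my illustration, not part of the test suite, and it assumes a LocalCluster rather than the Coiled cluster used by the test:

import time
from concurrent.futures import CancelledError

from distributed import Client, LocalCluster, wait


def slow(x):
    # Stand-in for a computation that never completes within the timeout
    time.sleep(300)
    return x


if __name__ == "__main__":
    with LocalCluster(n_workers=1) as cluster, Client(cluster) as client:
        fut = client.submit(slow, 1)
        fut.cancel()  # anything that cancels the future while we wait on it
        try:
            wait(fut, timeout=10)
        except CancelledError as err:
            # wait() raises with the keys of the cancelled futures, matching
            # the traceback below: CancelledError(['finalize-...'])
            print("cancelled keys:", err)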

Traceback

__________________________ test_repeated_merge_spill ___________________________
[gw2] linux -- Python 3.9.15 /usr/share/miniconda3/envs/test/bin/python

upload_cluster_dump = <function upload_cluster_dump.<locals>._upload_cluster_dump at 0x7f32bc3945e0>
benchmark_all = <function benchmark_all.<locals>._benchmark_all at 0x7f32ae651f70>
cluster_kwargs = {'parquet_cluster': {'backend_options': {'multizone': True, 'send_prometheus_metrics': True, 'spot': True, 'spot_on_de...spot_on_demand_fallback': True}, 'n_workers': 20, 'package_sync': True, 'scheduler_vm_types': ['m6i.large'], ...}, ...}
dask_env_variables = {'DASK_COILED__TOKEN': '***'}
github_cluster_tags = {'GITHUB_JOB': 'tests', 'GITHUB_REF': 'refs/heads/main', 'GITHUB_RUN_ATTEMPT': '1', 'GITHUB_RUN_ID': '4039342066', ...}

    @pytest.mark.skipif(
        Version(distributed.__version__) < Version("2022.4.2"),
        reason="https://github.com/dask/distributed/issues/6110",
    )
    def test_repeated_merge_spill(
        upload_cluster_dump,
        benchmark_all,
        cluster_kwargs,
        dask_env_variables,
        github_cluster_tags,
    ):
        with Cluster(
            name=f"test_repeated_merge_spill-{uuid.uuid4().hex[:8]}",
            environ=dask_env_variables,
            tags=github_cluster_tags,
            **cluster_kwargs["test_repeated_merge_spill"],
        ) as cluster:
            with Client(cluster) as client:
                with upload_cluster_dump(client), benchmark_all(client):
                    ddf = dask.datasets.timeseries(
                        "2020",
                        "2025",
                        partition_freq="2w",
                    )
                    ddf2 = dask.datasets.timeseries(
                        "2020",
                        "2023",
                        partition_freq="2w",
                    )
    
                    for _ in range(10):
                        client.restart()
                        fs = client.compute((ddf.x + ddf.y).mean())
    
>                       wait(fs, timeout=2 * 60)

tests/stability/test_deadlock.py:45: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/client.py:4901: in wait
    result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:339: in sync
    return sync(
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:406: in sync
    raise exc.with_traceback(tb)
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/distributed/utils.py:379: in f
    result = yield future
/usr/share/miniconda3/envs/test/lib/python3.9/site-packages/tornado/gen.py:762: in run
    value = future.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fs = [<Future: cancelled, key: finalize-3b5ccaaaef66d9d6c0c27c11320a79b9>]
timeout = 120, return_when = 'ALL_COMPLETED'

    async def _wait(fs, timeout=None, return_when=ALL_COMPLETED):
        if timeout is not None and not isinstance(timeout, Number):
            raise TypeError(
                "timeout= keyword received a non-numeric value.\n"
                "Beware that wait expects a list of values\n"
                "  Bad:  wait(x, y, z)\n"
                "  Good: wait([x, y, z])"
            )
        fs = futures_of(fs)
        if return_when == ALL_COMPLETED:
            wait_for = distributed.utils.All
        elif return_when == FIRST_COMPLETED:
            wait_for = distributed.utils.Any
        else:
            raise NotImplementedError(
                "Only return_when='ALL_COMPLETED' and 'FIRST_COMPLETED' are supported"
            )
    
        future = wait_for({f._state.wait() for f in fs})
        if timeout is not None:
            future = asyncio.wait_for(future, timeout)
        await future
    
        done, not_done = (
            {fu for fu in fs if fu.status != "pending"},
            {fu for fu in fs if fu.status == "pending"},
        )
        cancelled = [f.key for f in done if f.status == "cancelled"]
        if cancelled:
>           raise CancelledError(cancelled)
E           concurrent.futures._base.CancelledError: ['finalize-3b5ccaaaef66d9d6c0c27c11320a79b9']

CI Run
https://github.com/coiled/coiled-runtime/actions/runs/4039342066/jobs/6943979286#step:5:395
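
When triaging these timeouts, one possible debugging aid (a sketch under assumptions; wait_or_dump and the dump path are illustrative names, not part of the test suite) is to capture a cluster dump before re-raising, analogous to what the upload_cluster_dump fixture provides:

import asyncio
from concurrent.futures import CancelledError

from distributed import wait


def wait_or_dump(client, futures, timeout=120, path="cluster-dump"):
    """Wait on futures; on timeout or cancellation, persist cluster state."""
    try:
        wait(futures, timeout=timeout)
    except (asyncio.TimeoutError, CancelledError):
        # Client.dump_cluster_state writes scheduler and worker state to disk
        # (distributed appends the format suffix, e.g. .msgpack.gz) so the
        # suspected deadlock can be analyzed post-mortem.
        client.dump_cluster_state(filename=path)
        raise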

    Labels

    affected: 0.2.1 (this issue affected v0.2.1 of coiled-runtime), flaky test (intermittent failures on CI)
