Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ipyparallel using Python 3.8 raises concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: f:finished> #422

Closed
basnijholt opened this issue Aug 18, 2020 · 2 comments · Fixed by #469

Comments

@basnijholt
Copy link
Contributor

basnijholt commented Aug 18, 2020

In Adaptive simulations (related issue python-adaptive/adaptive#289), I (and many colleagues) started to see this error when we switched to Python 3.8:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/site-packages/ipyparallel/client/asyncresult.py", line 233, in _resolve_result
    self.set_exception(e)
  File "/gscratch/home/a-banijh/miniconda3/envs/py38/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: f:finished>

After many hours of trying to reproduce this, I've been able to minimize it to:

#!/usr/bin/env python3

# ipython profile create test
# ipcluster start --n=10 --profile=test --cluster-id=''
# python fail.py

import asyncio
import time
from random import random

from ipyparallel import Client
from ipyparallel.error import NoEnginesRegistered


def f(x):
    return x


def connect_to_ipyparallel(profile, n):
    client = Client(profile=profile)
    while True:
        try:
            dview = client[:]
            if len(dview) == n:
                return client
        except NoEnginesRegistered:
            time.sleep(0.1)


async def _run(loop, executor, ncores):
    pending = set()
    done = set()

    for _ in range(10):  # do some loops that submit futures
        while len(pending) + len(done) <= ncores:
            fut = loop.run_in_executor(executor, f, random())
            pending.add(fut)
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)

        for fut in done:
            fut.result()  # process results that are done

    for fut in pending:  # cancel the results that are pending
        fut.cancel()

    return done


def do(loop, executor, ncores):
    coro = _run(loop, executor, ncores)
    task = loop.create_task(coro)
    loop.run_until_complete(task)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    ncores = 10
    executor = connect_to_ipyparallel(profile="test", n=ncores).executor()

    for i in range(10):
        do(loop, executor, ncores)

I am using Python 3.8.5 and ipyparallel 6.3.0. The issue doesn't occur with Python 3.7.

Tagging @tlaeven @kvanhoogdalem, who encountered this issue.

@basnijholt basnijholt changed the title ipyparallel raises concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: f:finished> ipyparallel using Python 3.8 raises concurrent.futures._base.InvalidStateError: CANCELLED: <AsyncResult: f:finished> Aug 18, 2020
@minrk
Copy link
Member

minrk commented Jun 4, 2021

Thanks for the test case @basnijholt! I was able to use that to add and verify a fix in #469

@basnijholt
Copy link
Contributor Author

Thanks a lot! 😄

I guess this will close automatically when #469 is merged.

@minrk minrk closed this as completed in #469 Jun 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants