From 309d0c7e1943750e742edcd0f87a5b292d848340 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Fri, 21 Feb 2025 11:27:05 -0600 Subject: [PATCH] Add `PrefectFlowRunFuture` --- src/prefect/flow_runs.py | 242 --------------------------------------- src/prefect/futures.py | 179 +++++++++++++++++++++++++++-- tests/test_flow_runs.py | 126 +------------------- tests/test_futures.py | 164 +++++++++++++++++++++++++- 4 files changed, 336 insertions(+), 375 deletions(-) diff --git a/src/prefect/flow_runs.py b/src/prefect/flow_runs.py index c111b84d89f1..5fdff334a4f4 100644 --- a/src/prefect/flow_runs.py +++ b/src/prefect/flow_runs.py @@ -1,14 +1,8 @@ from __future__ import annotations -import asyncio -import atexit -import threading -import uuid from typing import ( TYPE_CHECKING, Any, - Callable, - Self, Type, TypeVar, overload, @@ -16,14 +10,10 @@ from uuid import UUID, uuid4 import anyio -from cachetools import TTLCache -from prefect._internal.concurrency.api import create_call, from_async, from_sync -from prefect._internal.concurrency.threads import get_global_loop from prefect.client.orchestration import PrefectClient, get_client from prefect.client.schemas import FlowRun from prefect.client.schemas.objects import ( - TERMINAL_STATES, StateType, ) from prefect.client.schemas.responses import SetStateStatus @@ -32,8 +22,6 @@ FlowRunContext, TaskRunContext, ) -from prefect.events.clients import get_events_subscriber -from prefect.events.filters import EventFilter, EventNameFilter from prefect.exceptions import ( Abort, FlowPauseTimeout, @@ -59,8 +47,6 @@ ) if TYPE_CHECKING: - import logging - from prefect.client.orchestration import PrefectClient @@ -479,231 +465,3 @@ def _observed_flow_pauses(context: FlowRunContext) -> int: else: context.observed_flow_pauses["counter"] += 1 return context.observed_flow_pauses["counter"] - - -class FlowRunWaiter: - """ - A service used for waiting for a flow run to finish. - - This service listens for flow run events and provides a way to wait for a specific - flow run to finish. This is useful for waiting for a flow run to finish before - continuing execution. - - The service is a singleton and must be started before use. The service will - automatically start when the first instance is created. A single websocket - connection is used to listen for flow run events. - - The service can be used to wait for a flow run to finish by calling - `FlowRunWaiter.wait_for_flow_run` with the flow run ID to wait for. The method - will return when the flow run has finished or the timeout has elapsed. - - The service will automatically stop when the Python process exits or when the - global loop thread is stopped. - - Example: - ```python - import asyncio - from uuid import uuid4 - - from prefect import flow - from prefect.flow_engine import run_flow_async - from prefect.flow_runs import FlowRunWaiter - - - @flow - async def test_flow(): - await asyncio.sleep(5) - print("Done!") - - - async def main(): - flow_run_id = uuid4() - asyncio.create_flow(run_flow_async(flow=test_flow, flow_run_id=flow_run_id)) - - await FlowRunWaiter.wait_for_flow_run(flow_run_id) - print("Flow run finished") - - - if __name__ == "__main__": - asyncio.run(main()) - ``` - """ - - _instance: Self | None = None - _instance_lock = threading.Lock() - - def __init__(self): - self.logger: "logging.Logger" = get_logger("FlowRunWaiter") - self._consumer_task: asyncio.Task[None] | None = None - self._observed_completed_flow_runs: TTLCache[uuid.UUID, bool] = TTLCache( - maxsize=10000, ttl=600 - ) - self._completion_events: dict[uuid.UUID, asyncio.Event] = {} - self._completion_callbacks: dict[uuid.UUID, Callable[[], None]] = {} - self._loop: asyncio.AbstractEventLoop | None = None - self._observed_completed_flow_runs_lock = threading.Lock() - self._completion_events_lock = threading.Lock() - self._started = False - - def start(self) -> None: - """ - Start the FlowRunWaiter service. - """ - if self._started: - return - self.logger.debug("Starting FlowRunWaiter") - loop_thread = get_global_loop() - - if not asyncio.get_running_loop() == loop_thread.loop: - raise RuntimeError("FlowRunWaiter must run on the global loop thread.") - - self._loop = loop_thread.loop - if TYPE_CHECKING: - assert self._loop is not None - - consumer_started = asyncio.Event() - self._consumer_task = self._loop.create_task( - self._consume_events(consumer_started) - ) - asyncio.run_coroutine_threadsafe(consumer_started.wait(), self._loop) - - loop_thread.add_shutdown_call(create_call(self.stop)) - atexit.register(self.stop) - self._started = True - - async def _consume_events(self, consumer_started: asyncio.Event): - async with get_events_subscriber( - filter=EventFilter( - event=EventNameFilter( - name=[ - f"prefect.flow-run.{state.name.title()}" - for state in TERMINAL_STATES - ], - ) - ) - ) as subscriber: - consumer_started.set() - async for event in subscriber: - try: - self.logger.debug( - f"Received event: {event.resource['prefect.resource.id']}" - ) - flow_run_id = uuid.UUID( - event.resource["prefect.resource.id"].replace( - "prefect.flow-run.", "" - ) - ) - - with self._observed_completed_flow_runs_lock: - # Cache the flow run ID for a short period of time to avoid - # unnecessary waits - self._observed_completed_flow_runs[flow_run_id] = True - with self._completion_events_lock: - # Set the event for the flow run ID if it is in the cache - # so the waiter can wake up the waiting coroutine - if flow_run_id in self._completion_events: - self._completion_events[flow_run_id].set() - if flow_run_id in self._completion_callbacks: - self._completion_callbacks[flow_run_id]() - except Exception as exc: - self.logger.error(f"Error processing event: {exc}") - - def stop(self) -> None: - """ - Stop the FlowRunWaiter service. - """ - self.logger.debug("Stopping FlowRunWaiter") - if self._consumer_task: - self._consumer_task.cancel() - self._consumer_task = None - self.__class__._instance = None - self._started = False - - @classmethod - async def wait_for_flow_run( - cls, flow_run_id: uuid.UUID, timeout: float | None = None - ) -> None: - """ - Wait for a flow run to finish. - - Note this relies on a websocket connection to receive events from the server - and will not work with an ephemeral server. - - Args: - flow_run_id: The ID of the flow run to wait for. - timeout: The maximum time to wait for the flow run to - finish. Defaults to None. - """ - instance = cls.instance() - with instance._observed_completed_flow_runs_lock: - if flow_run_id in instance._observed_completed_flow_runs: - return - - # Need to create event in loop thread to ensure it can be set - # from the loop thread - finished_event = await from_async.wait_for_call_in_loop_thread( - create_call(asyncio.Event) - ) - with instance._completion_events_lock: - # Cache the event for the flow run ID so the consumer can set it - # when the event is received - instance._completion_events[flow_run_id] = finished_event - - try: - # Now check one more time whether the flow run arrived before we start to - # wait on it, in case it came in while we were setting up the event above. - with instance._observed_completed_flow_runs_lock: - if flow_run_id in instance._observed_completed_flow_runs: - return - - with anyio.move_on_after(delay=timeout): - await from_async.wait_for_call_in_loop_thread( - create_call(finished_event.wait) - ) - finally: - with instance._completion_events_lock: - # Remove the event from the cache after it has been waited on - instance._completion_events.pop(flow_run_id, None) - - @classmethod - def add_done_callback( - cls, flow_run_id: uuid.UUID, callback: Callable[[], None] - ) -> None: - """ - Add a callback to be called when a flow run finishes. - - Args: - flow_run_id: The ID of the flow run to wait for. - callback: The callback to call when the flow run finishes. - """ - instance = cls.instance() - with instance._observed_completed_flow_runs_lock: - if flow_run_id in instance._observed_completed_flow_runs: - callback() - return - - with instance._completion_events_lock: - # Cache the event for the flow run ID so the consumer can set it - # when the event is received - instance._completion_callbacks[flow_run_id] = callback - - @classmethod - def instance(cls) -> Self: - """ - Get the singleton instance of FlowRunWaiter. - """ - with cls._instance_lock: - if cls._instance is None: - cls._instance = cls._new_instance() - return cls._instance - - @classmethod - def _new_instance(cls): - instance = cls() - - if threading.get_ident() == get_global_loop().thread.ident: - instance.start() - else: - from_sync.call_soon_in_loop_thread(create_call(instance.start)).result() - - return instance diff --git a/src/prefect/futures.py b/src/prefect/futures.py index ee31acc58503..88d01f8a4e3b 100644 --- a/src/prefect/futures.py +++ b/src/prefect/futures.py @@ -3,12 +3,14 @@ import concurrent.futures import threading import uuid +import warnings from collections.abc import Generator, Iterator from functools import partial from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, Union from typing_extensions import NamedTuple, Self, TypeVar +from prefect._waiters import FlowRunWaiter from prefect.client.orchestration import get_client from prefect.exceptions import ObjectNotFound from prefect.logging.loggers import get_logger, get_run_logger @@ -31,22 +33,39 @@ class PrefectFuture(abc.ABC, Generic[R]): """ Abstract base class for Prefect futures. A Prefect future is a handle to the - asynchronous execution of a task run. It provides methods to wait for the task - to complete and to retrieve the result of the task run. + asynchronous execution of a run. It provides methods to wait for the + to complete and to retrieve the result of the run. """ def __init__(self, task_run_id: uuid.UUID): + warnings.warn( + "The __init__ method of PrefectFuture is deprecated and will be removed in a future release. " + "If you are subclassing PrefectFuture, please implement the __init__ method in your subclass.", + DeprecationWarning, + ) self._task_run_id = task_run_id self._final_state: Optional[State[R]] = None @property def task_run_id(self) -> uuid.UUID: """The ID of the task run associated with this future""" + warnings.warn( + "The task_run_id property of PrefectFuture is deprecated and will be removed in a future release. " + "If you are subclassing PrefectFuture, please implement the task_run_id property in your subclass.", + DeprecationWarning, + ) + return self._task_run_id @property def state(self) -> State: """The current state of the task run associated with this future""" + warnings.warn( + "The state property of PrefectFuture is deprecated and will be removed in a future release. " + "If you are subclassing PrefectFuture, please implement the state property in your subclass.", + DeprecationWarning, + ) + if self._final_state: return self._final_state client = get_client(sync_client=True) @@ -103,7 +122,36 @@ def add_done_callback(self, fn: Callable[["PrefectFuture[R]"], None]) -> None: ... -class PrefectWrappedFuture(PrefectFuture[R], abc.ABC, Generic[R, F]): +class PrefectTaskRunFuture(PrefectFuture[R]): + """ + A Prefect future that represents the eventual execution of a task run. + """ + + def __init__(self, task_run_id: uuid.UUID): + self._task_run_id = task_run_id + self._final_state: Optional[State[R]] = None + + @property + def task_run_id(self) -> uuid.UUID: + """The ID of the task run associated with this future""" + return self._task_run_id + + @property + def state(self) -> State: + """The current state of the task run associated with this future""" + if self._final_state: + return self._final_state + client = get_client(sync_client=True) + try: + task_run = client.read_task_run(task_run_id=self.task_run_id) + except ObjectNotFound: + # We'll be optimistic and assume this task will eventually start + # TODO: Consider using task run events to wait for the task to start + return Pending() + return task_run.state or Pending() + + +class PrefectWrappedFuture(PrefectTaskRunFuture[R], abc.ABC, Generic[R, F]): """ A Prefect future that wraps another future object. @@ -190,7 +238,7 @@ def __del__(self) -> None: ) -class PrefectDistributedFuture(PrefectFuture[R]): +class PrefectDistributedFuture(PrefectTaskRunFuture[R]): """ Represents the result of a computation happening anywhere. @@ -289,6 +337,123 @@ def __hash__(self) -> int: return hash(self.task_run_id) +class PrefectFlowRunFuture(PrefectFuture[R]): + """ + A Prefect future that represents the eventual execution of a flow run. + """ + + def __init__(self, flow_run_id: uuid.UUID): + self._flow_run_id = flow_run_id + self._final_state: State[R] | None = None + + @property + def flow_run_id(self) -> uuid.UUID: + """The ID of the flow run associated with this future""" + return self._flow_run_id + + @property + def state(self) -> State: + """The current state of the flow run associated with this future""" + if self._final_state: + return self._final_state + client = get_client(sync_client=True) + state = Pending() + try: + flow_run = client.read_flow_run(flow_run_id=self.flow_run_id) + if flow_run.state: + state = flow_run.state + except ObjectNotFound: + # We'll be optimistic and assume this flow run will eventually start + pass + return state + + def wait(self, timeout: Optional[float] = None) -> None: + return run_coro_as_sync(self.wait_async(timeout=timeout)) + + async def wait_async(self, timeout: Optional[float] = None) -> None: + if self._final_state: + logger.debug( + "Final state already set for %s. Returning...", self.task_run_id + ) + return + + # Ask for the instance of FlowRunWaiter _now_ so that it's already running and + # can catch the completion event if it happens before we start listening for it. + FlowRunWaiter.instance() + + # Read task run to see if it is still running + async with get_client() as client: + flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id) + if flow_run.state is None: + raise RuntimeError( + f"Flow run {self.flow_run_id} has no state which means it hasn't started yet." + ) + if flow_run.state and flow_run.state.is_final(): + logger.debug( + "Flow run %s already finished. Returning...", + self.flow_run_id, + ) + self._final_state = flow_run.state + return + + # If still running, wait for a completed event from the server + logger.debug( + "Waiting for completed event for flow run %s...", + self.flow_run_id, + ) + await FlowRunWaiter.wait_for_flow_run(self._flow_run_id, timeout=timeout) + flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id) + if flow_run.state and flow_run.state.is_final(): + self._final_state = flow_run.state + return + + def result( + self, + timeout: Optional[float] = None, + raise_on_failure: bool = True, + ) -> R: + return run_coro_as_sync( + self.aresult(timeout=timeout, raise_on_failure=raise_on_failure) + ) + + async def aresult( + self, + timeout: Optional[float] = None, + raise_on_failure: bool = True, + ) -> R: + if not self._final_state: + await self.wait_async(timeout=timeout) + if not self._final_state: + raise TimeoutError( + f"Task run {self.task_run_id} did not complete within {timeout} seconds" + ) + + return await self._final_state.result( + raise_on_failure=raise_on_failure, fetch=True + ) + + def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None: + if self._final_state: + fn(self) + return + FlowRunWaiter.instance() + with get_client(sync_client=True) as client: + flow_run = client.read_flow_run(flow_run_id=self._flow_run_id) + if flow_run.state and flow_run.state.is_final(): + self._final_state = flow_run.state + fn(self) + return + FlowRunWaiter.add_done_callback(self._flow_run_id, partial(fn, self)) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, PrefectFlowRunFuture): + return False + return self.flow_run_id == other.flow_run_id + + def __hash__(self) -> int: + return hash(self.flow_run_id) + + class PrefectFutureList(list[PrefectFuture[R]], Iterator[PrefectFuture[R]]): """ A list of Prefect futures. @@ -454,7 +619,7 @@ def resolve_futures_to_states( futures: set[PrefectFuture[R]] = set() def _collect_futures( - futures: set[PrefectFuture[R]], expr: Any, context: Any + futures: set[PrefectFuture[R]], expr: Any | PrefectFuture[R], context: Any ) -> Union[PrefectFuture[R], Any]: # Expressions inside quotes should not be traversed if isinstance(context.get("annotation"), quote): @@ -469,7 +634,7 @@ def _collect_futures( expr, visit_fn=partial(_collect_futures, futures), return_data=False, - context={}, + context={"annotation": None}, ) # if no futures were found, return the original expression @@ -498,5 +663,5 @@ def replace_futures_with_states(expr: Any, context: Any) -> Any: expr, visit_fn=replace_futures_with_states, return_data=True, - context={}, + context={"annotation": None}, ) diff --git a/tests/test_flow_runs.py b/tests/test_flow_runs.py index 9392c52b30b6..042a6f4c6c1b 100644 --- a/tests/test_flow_runs.py +++ b/tests/test_flow_runs.py @@ -1,4 +1,3 @@ -import asyncio import time import pytest @@ -7,10 +6,8 @@ from prefect import flow from prefect.client.orchestration import PrefectClient from prefect.exceptions import FlowRunWaitTimeout -from prefect.flow_engine import run_flow_async -from prefect.flow_runs import FlowRunWaiter, wait_for_flow_run -from prefect.server.events.pipeline import EventsPipeline -from prefect.states import Completed, Pending +from prefect.flow_runs import wait_for_flow_run +from prefect.states import Completed async def test_create_then_wait_for_flow_run(prefect_client: PrefectClient): @@ -40,122 +37,3 @@ def foo(): with pytest.raises(FlowRunWaitTimeout): await wait_for_flow_run(flow_run.id, timeout=0) - - -class TestFlowRunWaiter: - @pytest.fixture(autouse=True) - def teardown(self): - yield - - FlowRunWaiter.instance().stop() - - def test_instance_returns_singleton(self): - assert FlowRunWaiter.instance() is FlowRunWaiter.instance() - - def test_instance_returns_instance_after_stop(self): - instance = FlowRunWaiter.instance() - instance.stop() - assert FlowRunWaiter.instance() is not instance - - @pytest.mark.timeout(20) - async def test_wait_for_flow_run( - self, prefect_client: PrefectClient, emitting_events_pipeline: EventsPipeline - ): - """This test will fail with a timeout error if waiting is not working correctly.""" - - @flow - async def test_flow(): - await asyncio.sleep(1) - - flow_run = await prefect_client.create_flow_run(test_flow, state=Pending()) - asyncio.create_task(run_flow_async(flow=test_flow, flow_run=flow_run)) - - await FlowRunWaiter.wait_for_flow_run(flow_run.id) - - await emitting_events_pipeline.process_events() - - flow_run = await prefect_client.read_flow_run(flow_run.id) - assert flow_run.state - assert flow_run.state.is_completed() - - async def test_wait_for_flow_run_with_timeout(self, prefect_client: PrefectClient): - @flow - async def test_flow(): - await asyncio.sleep(5) - - flow_run = await prefect_client.create_flow_run(test_flow, state=Pending()) - run = asyncio.create_task(run_flow_async(flow=test_flow, flow_run=flow_run)) - - await FlowRunWaiter.wait_for_flow_run(flow_run.id, timeout=1) - - # FlowRunWaiter stopped waiting before the task finished - assert not run.done() - await run - - @pytest.mark.timeout(20) - async def test_non_singleton_mode( - self, prefect_client: PrefectClient, emitting_events_pipeline: EventsPipeline - ): - waiter = FlowRunWaiter() - assert waiter is not FlowRunWaiter.instance() - - @flow - async def test_flow(): - await asyncio.sleep(1) - - flow_run = await prefect_client.create_flow_run(test_flow, state=Pending()) - asyncio.create_task(run_flow_async(flow=test_flow, flow_run=flow_run)) - - await waiter.wait_for_flow_run(flow_run.id) - - await emitting_events_pipeline.process_events() - - flow_run = await prefect_client.read_flow_run(flow_run.id) - assert flow_run.state - assert flow_run.state.is_completed() - - waiter.stop() - - @pytest.mark.timeout(20) - async def test_handles_concurrent_task_runs( - self, prefect_client: PrefectClient, emitting_events_pipeline: EventsPipeline - ): - @flow - async def fast_flow(): - await asyncio.sleep(1) - - @flow - async def slow_flow(): - await asyncio.sleep(5) - - flow_run_1 = await prefect_client.create_flow_run(fast_flow, state=Pending()) - flow_run_2 = await prefect_client.create_flow_run(slow_flow, state=Pending()) - - asyncio.create_task(run_flow_async(flow=fast_flow, flow_run=flow_run_1)) - asyncio.create_task(run_flow_async(flow=slow_flow, flow_run=flow_run_2)) - - await FlowRunWaiter.wait_for_flow_run(flow_run_1.id) - - await emitting_events_pipeline.process_events() - - flow_run_1 = await prefect_client.read_flow_run(flow_run_1.id) - flow_run_2 = await prefect_client.read_flow_run(flow_run_2.id) - - assert flow_run_1.state - assert flow_run_1.state.is_completed() - - assert flow_run_2.state - assert not flow_run_2.state.is_completed() - - await FlowRunWaiter.wait_for_flow_run(flow_run_2.id) - - await emitting_events_pipeline.process_events() - - flow_run_1 = await prefect_client.read_flow_run(flow_run_1.id) - flow_run_2 = await prefect_client.read_flow_run(flow_run_2.id) - - assert flow_run_1.state - assert flow_run_1.state.is_completed() - - assert flow_run_2.state - assert flow_run_2.state.is_completed() diff --git a/tests/test_futures.py b/tests/test_futures.py index a4a7cdca93a7..0067ce682400 100644 --- a/tests/test_futures.py +++ b/tests/test_futures.py @@ -7,11 +7,14 @@ import pytest -from prefect import task +from prefect import flow, task +from prefect.client.orchestration import PrefectClient from prefect.exceptions import MissingResult +from prefect.flow_engine import run_flow_async, run_flow_sync from prefect.futures import ( PrefectConcurrentFuture, PrefectDistributedFuture, + PrefectFlowRunFuture, PrefectFuture, PrefectFutureList, PrefectWrappedFuture, @@ -19,12 +22,13 @@ resolve_futures_to_states, wait, ) +from prefect.server.events.pipeline import EventsPipeline from prefect.states import Completed, Failed from prefect.task_engine import run_task_async, run_task_sync from prefect.task_runners import ThreadPoolTaskRunner -class MockFuture(PrefectWrappedFuture): +class MockFuture(PrefectWrappedFuture[Any, Future[Any]]): def __init__(self, data: Any = 42): super().__init__(uuid.uuid4(), Future()) self._final_state = Completed(data=data) @@ -412,6 +416,162 @@ def my_task(): future.result() +class TestPrefectFlowRunFuture: + async def test_wait_with_timeout(self, prefect_client: PrefectClient): + @flow + async def my_flow(): + return 42 + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + + asyncio.create_task( + run_flow_async( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + ) + + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + future.wait(timeout=0.25) + assert future.state.is_pending() + + async def test_wait_without_timeout( + self, events_pipeline: EventsPipeline, prefect_client: PrefectClient + ): + @flow + def my_flow(): + return 42 + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + + state = run_flow_sync( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + assert state.is_completed() + + await events_pipeline.process_events() + + future.wait() + assert future.state.is_completed() + + async def test_result_with_final_state( + self, events_pipeline: EventsPipeline, prefect_client: PrefectClient + ): + @flow(persist_result=True) + def my_flow(): + return 42 + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + + state = run_flow_sync( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + assert state.is_completed() + + await events_pipeline.process_events() + + assert await state.result() == 42 + + assert future.result() == 42 + + async def test_final_state_without_result( + self, events_pipeline: EventsPipeline, prefect_client: PrefectClient + ): + @flow(persist_result=False) + def my_flow(): + return 42 + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + + state = run_flow_sync( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + assert state.is_completed() + + await events_pipeline.process_events() + + with pytest.raises(MissingResult): + future.result() + + async def test_result_with_final_state_and_raise_on_failure( + self, events_pipeline: EventsPipeline, prefect_client: PrefectClient + ): + @flow(persist_result=True) + def my_flow(): + raise ValueError("oops") + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + + state = run_flow_sync( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + assert state.is_failed() + + await events_pipeline.process_events() + + with pytest.raises(ValueError, match="oops"): + future.result(raise_on_failure=True) + + async def test_final_state_missing_result( + self, events_pipeline: EventsPipeline, prefect_client: PrefectClient + ): + @flow(persist_result=False) + def my_flow(): + return 42 + + flow_run = await prefect_client.create_flow_run( + flow=my_flow, + parameters={}, + ) + future = PrefectFlowRunFuture(flow_run_id=flow_run.id) + + state = run_flow_sync( + flow=my_flow, + flow_run=flow_run, + parameters={}, + return_type="state", + ) + assert state.is_completed() + + await events_pipeline.process_events() + + with pytest.raises(MissingResult): + future.result() + + class TestPrefectFutureList: def test_wait(self): mock_futures = [MockFuture(data=i) for i in range(5)]