Skip to content

Commit

Permalink
Revert "refactor(api): Remove concurrency from LegacyContextPlugin (#…
Browse files Browse the repository at this point in the history
…16098)"

This reverts commit 6451bdb.
  • Loading branch information
SyntaxColoring committed Sep 20, 2024
1 parent de0035b commit a129325
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 34 deletions.
98 changes: 71 additions & 27 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols."""
from __future__ import annotations

import asyncio
from asyncio import create_task, Task
from contextlib import ExitStack
from typing import Optional
from typing import List, Optional

from opentrons.legacy_commands.types import CommandMessage as LegacyCommand
from opentrons.legacy_broker import LegacyBroker
Expand All @@ -12,6 +12,7 @@
from opentrons.util.broker import ReadOnlyBroker

from .legacy_command_mapper import LegacyCommandMapper
from .thread_async_queue import ThreadAsyncQueue


class LegacyContextPlugin(AbstractPlugin):
Expand All @@ -20,36 +21,59 @@ class LegacyContextPlugin(AbstractPlugin):
In the legacy ProtocolContext, protocol execution is accomplished
by direct communication with the HardwareControlAPI, as opposed to an
intermediate layer like the ProtocolEngine. This plugin wraps up
and hides this behavior, so the ProtocolEngine can monitor
and hides this behavior, so the ProtocolEngine can monitor and control
the run of a legacy protocol without affecting the execution of
the protocol commands themselves.
This plugin allows a ProtocolEngine to subscribe to what is being done with the
legacy ProtocolContext, and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
This plugin allows a ProtocolEngine to:
1. Play/pause the protocol run using the HardwareControlAPI, as was done before
the ProtocolEngine existed.
2. Subscribe to what is being done with the legacy ProtocolContext,
and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
"""

def __init__(
self,
engine_loop: asyncio.AbstractEventLoop,
broker: LegacyBroker,
equipment_broker: ReadOnlyBroker[LoadInfo],
legacy_command_mapper: Optional[LegacyCommandMapper] = None,
) -> None:
"""Initialize the plugin with its dependencies."""
self._engine_loop = engine_loop

self._broker = broker
self._equipment_broker = equipment_broker
self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper()

# We use a non-blocking queue to communicate activity
# from the APIv2 protocol, which is running in its own thread,
# to the ProtocolEngine, which is running in the main thread's async event loop.
#
# The queue being non-blocking lets the protocol communicate its activity
# instantly *even if the event loop is currently occupied by something else.*
# Various things can accidentally occupy the event loop for too long.
# So if the protocol had to wait for the event loop to be free
# every time it reported some activity,
# it could visibly stall for a moment, making its motion jittery.
#
# TODO(mm, 2024-03-22): See if we can remove this non-blockingness now.
# It was one of several band-aids introduced in ~v5.0.0 to mitigate performance
# problems. v6.3.0 started running some Python protocols directly through
# Protocol Engine, without this plugin, and without any non-blocking queue.
# If performance is sufficient for those, that probably means the
# performance problems have been resolved in better ways elsewhere
# and we don't need this anymore.
self._actions_to_dispatch = ThreadAsyncQueue[List[pe_actions.Action]]()
self._action_dispatching_task: Optional[Task[None]] = None

self._subscription_exit_stack: Optional[ExitStack] = None

def setup(self) -> None:
"""Set up the plugin.
Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
* Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
* Kick off a background task to inform Protocol Engine of that activity.
"""
# Subscribe to activity on the APIv2 context,
# and arrange to unsubscribe when this plugin is torn down.
Expand All @@ -73,16 +97,24 @@ def setup(self) -> None:
# to clean up these subscriptions.
self._subscription_exit_stack = exit_stack.pop_all()

# todo(mm, 2024-08-21): This no longer needs to be async.
# Kick off a background task to report activity to the ProtocolEngine.
self._action_dispatching_task = create_task(self._dispatch_all_actions())

async def teardown(self) -> None:
"""Tear down the plugin, undoing the work done in `setup()`.
Called by Protocol Engine.
At this point, the APIv2 protocol script must have exited.
"""
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None
self._actions_to_dispatch.done_putting()
try:
if self._action_dispatching_task is not None:
await self._action_dispatching_task
self._action_dispatching_task = None
finally:
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
Expand All @@ -95,22 +127,34 @@ def _handle_legacy_command(self, command: LegacyCommand) -> None:
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_command(command=command)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()
self._actions_to_dispatch.put(pe_actions)

def _handle_equipment_loaded(self, load_info: LoadInfo) -> None:
"""Handle an equipment load reported by the legacy APIv2 protocol.
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_equipment_load(load_info=load_info)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()

async def _dispatch_action_list(self, actions: list[pe_actions.Action]) -> None:
for action in actions:
self.dispatch(action)
self._actions_to_dispatch.put(pe_actions)

async def _dispatch_all_actions(self) -> None:
"""Dispatch all actions to the `ProtocolEngine`.
Exits only when `self._actions_to_dispatch` is closed
(or an unexpected exception is raised).
"""
async for action_batch in self._actions_to_dispatch.get_async_until_closed():
# It's critical that we dispatch this batch of actions as one atomic
# sequence, without yielding to the event loop.
# Although this plugin only means to use the ProtocolEngine as a way of
# passively exposing the protocol's progress, the ProtocolEngine is still
# theoretically active, which means it's constantly watching in the
# background to execute any commands that it finds `queued`.
#
# For example, one of these action batches will often want to
# instantaneously create a running command by having a queue action
# immediately followed by a run action. We cannot let the
# ProtocolEngine's background task see the command in the `queued` state,
# or it will try to execute it, which the legacy protocol is already doing.
for action in action_batch:
self.dispatch(action)
5 changes: 1 addition & 4 deletions api/src/opentrons/protocol_runner/protocol_runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Protocol run control and management."""
import asyncio
from typing import List, NamedTuple, Optional, Union

from abc import ABC, abstractmethod
Expand Down Expand Up @@ -221,9 +220,7 @@ async def load(
equipment_broker = Broker[LoadInfo]()
self._protocol_engine.add_plugin(
LegacyContextPlugin(
engine_loop=asyncio.get_running_loop(),
broker=self._broker,
equipment_broker=equipment_broker,
broker=self._broker, equipment_broker=equipment_broker
)
)
self._hardware_api.should_taskify_movement_execution(taskify=True)
Expand Down
174 changes: 174 additions & 0 deletions api/src/opentrons/protocol_runner/thread_async_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""Safely pass values between threads and async tasks."""


from __future__ import annotations

from collections import deque
from threading import Condition
from typing import AsyncIterable, Deque, Generic, Iterable, TypeVar

from anyio.to_thread import run_sync


_T = TypeVar("_T")


class ThreadAsyncQueue(Generic[_T]):
"""A queue to safely pass values of type `_T` between threads and async tasks.
All methods are safe to call concurrently from any thread or task.
Compared to queue.Queue:
* This class lets you close the queue to signal that no more values will be added,
which makes common producer/consumer patterns easier.
(This is like Golang channels and AnyIO memory object streams.)
* This class has built-in support for async consumers.
Compared to asyncio.Queue and AnyIO memory object streams:
* You can use this class to communicate between async tasks and threads
without the threads having to wait for the event loop to be free
every time they access the queue.
"""

def __init__(self) -> None:
"""Initialize the queue."""
self._is_closed = False
self._deque: Deque[_T] = deque()
self._condition = Condition()

def put(self, value: _T) -> None:
"""Add a value to the back of the queue.
Returns immediately, without blocking. The queue can grow without bound.
Raises:
QueueClosed: If the queue is already closed.
"""
with self._condition:
if self._is_closed:
raise QueueClosed("Can't add more values when queue is already closed.")
else:
self._deque.append(value)
self._condition.notify()

def get(self) -> _T:
"""Remove and return the value at the front of the queue.
If the queue is empty, this blocks until a new value is available.
If you're calling from an async task, use one of the async methods instead
to avoid blocking the event loop.
Raises:
QueueClosed: If all values have been consumed
and the queue has been closed with `done_putting()`.
"""
with self._condition:
while True:
if len(self._deque) > 0:
return self._deque.popleft()
elif self._is_closed:
raise QueueClosed("Queue closed; no more items to get.")
else:
# We don't have anything to return.
# Wait for something to change, then check again.
self._condition.wait()

def get_until_closed(self) -> Iterable[_T]:
"""Remove and return values from the front of the queue until it's closed.
Example:
for value in queue.get_until_closed():
print(value)
"""
while True:
try:
yield self.get()
except QueueClosed:
break

async def get_async(self) -> _T:
"""Like `get()`, except yield to the event loop while waiting.
Warning:
A waiting `get_async()` won't be interrupted by an async cancellation.
The proper way to interrupt a waiting `get_async()`
is to close the queue, just like you have to do with `get()`.
"""
return await run_sync(
self.get,
# We keep `cancellable` False so we don't leak this helper thread.
# If we made it True, an async cancellation here would detach us
# from the helper thread and allow the thread to "run to completion"--
# but if no more values are ever enqueued, and the queue is never closed,
# completion would never happen and it would hang around forever.
cancellable=False,
)

async def get_async_until_closed(self) -> AsyncIterable[_T]:
"""Like `get_until_closed()`, except yield to the event loop while waiting.
Example:
async for value in queue.get_async_until_closed():
print(value)
Warning:
While the ``async for`` is waiting for a new value,
it won't be interrupted by an async cancellation.
The proper way to interrupt a waiting `get_async_until_closed()`
is to close the queue, just like you have to do with `get()`.
"""
while True:
try:
yield await self.get_async()
except QueueClosed:
break

def done_putting(self) -> None:
"""Close the queue, i.e. signal that no more values will be `put()`.
You normally *must* close the queue eventually
to inform consumers that they can stop waiting for new values.
Forgetting to do this can leave them waiting forever,
leaking tasks or threads or causing deadlocks.
Consider using a ``with`` block instead. See `__enter__()`.
Raises:
QueueClosed: If the queue is already closed.
"""
with self._condition:
if self._is_closed:
raise QueueClosed("Can't close when queue is already closed.")
else:
self._is_closed = True
self._condition.notify_all()

def __enter__(self) -> ThreadAsyncQueue[_T]:
"""Use the queue as a context manager, closing the queue upon exit.
Example:
This:
with queue:
do_stuff()
Is equivalent to:
try:
do_stuff()
finally:
queue.done_putting()
"""
return self

def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
"""See `__enter__()`."""
self.done_putting()


class QueueClosed(Exception):
"""See `ThreadAsyncQueue.done_putting()`."""

pass
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Tests for the PythonAndLegacyRunner's LegacyContextPlugin."""
import asyncio
import pytest
from anyio import to_thread
from decoy import Decoy, matchers
Expand Down Expand Up @@ -61,7 +60,7 @@ def mock_action_dispatcher(decoy: Decoy) -> pe_actions.ActionDispatcher:


@pytest.fixture
async def subject(
def subject(
mock_legacy_broker: LegacyBroker,
mock_equipment_broker: ReadOnlyBroker[LoadInfo],
mock_legacy_command_mapper: LegacyCommandMapper,
Expand All @@ -70,7 +69,6 @@ async def subject(
) -> LegacyContextPlugin:
"""Get a configured LegacyContextPlugin with its dependencies mocked out."""
plugin = LegacyContextPlugin(
engine_loop=asyncio.get_running_loop(),
broker=mock_legacy_broker,
equipment_broker=mock_equipment_broker,
legacy_command_mapper=mock_legacy_command_mapper,
Expand Down
Loading

0 comments on commit a129325

Please sign in to comment.