Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(api): Remove concurrency from LegacyContextPlugin #16098

Merged
merged 3 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 27 additions & 71 deletions api/src/opentrons/protocol_runner/legacy_context_plugin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""Customize the ProtocolEngine to monitor and control legacy (APIv2) protocols."""
from __future__ import annotations

from asyncio import create_task, Task
import asyncio
from contextlib import ExitStack
from typing import List, Optional
from typing import Optional

from opentrons.legacy_commands.types import CommandMessage as LegacyCommand
from opentrons.legacy_broker import LegacyBroker
Expand All @@ -12,7 +12,6 @@
from opentrons.util.broker import ReadOnlyBroker

from .legacy_command_mapper import LegacyCommandMapper
from .thread_async_queue import ThreadAsyncQueue


class LegacyContextPlugin(AbstractPlugin):
Expand All @@ -21,59 +20,36 @@ class LegacyContextPlugin(AbstractPlugin):
In the legacy ProtocolContext, protocol execution is accomplished
by direct communication with the HardwareControlAPI, as opposed to an
intermediate layer like the ProtocolEngine. This plugin wraps up
and hides this behavior, so the ProtocolEngine can monitor and control
and hides this behavior, so the ProtocolEngine can monitor
the run of a legacy protocol without affecting the execution of
the protocol commands themselves.

This plugin allows a ProtocolEngine to:

1. Play/pause the protocol run using the HardwareControlAPI, as was done before
the ProtocolEngine existed.
2. Subscribe to what is being done with the legacy ProtocolContext,
and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
This plugin allows a ProtocolEngine to subscribe to what is being done with the
legacy ProtocolContext, and insert matching commands into ProtocolEngine state for
purely progress-tracking purposes.
"""

def __init__(
self,
engine_loop: asyncio.AbstractEventLoop,
broker: LegacyBroker,
equipment_broker: ReadOnlyBroker[LoadInfo],
legacy_command_mapper: Optional[LegacyCommandMapper] = None,
) -> None:
"""Initialize the plugin with its dependencies."""
self._engine_loop = engine_loop

self._broker = broker
self._equipment_broker = equipment_broker
self._legacy_command_mapper = legacy_command_mapper or LegacyCommandMapper()

# We use a non-blocking queue to communicate activity
# from the APIv2 protocol, which is running in its own thread,
# to the ProtocolEngine, which is running in the main thread's async event loop.
#
# The queue being non-blocking lets the protocol communicate its activity
# instantly *even if the event loop is currently occupied by something else.*
# Various things can accidentally occupy the event loop for too long.
# So if the protocol had to wait for the event loop to be free
# every time it reported some activity,
# it could visibly stall for a moment, making its motion jittery.
#
# TODO(mm, 2024-03-22): See if we can remove this non-blockingness now.
# It was one of several band-aids introduced in ~v5.0.0 to mitigate performance
# problems. v6.3.0 started running some Python protocols directly through
# Protocol Engine, without this plugin, and without any non-blocking queue.
# If performance is sufficient for those, that probably means the
# performance problems have been resolved in better ways elsewhere
# and we don't need this anymore.
self._actions_to_dispatch = ThreadAsyncQueue[List[pe_actions.Action]]()
self._action_dispatching_task: Optional[Task[None]] = None

self._subscription_exit_stack: Optional[ExitStack] = None

def setup(self) -> None:
"""Set up the plugin.

* Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
* Kick off a background task to inform Protocol Engine of that activity.
Subscribe to the APIv2 context's message brokers to be informed
of the APIv2 protocol's activity.
"""
# Subscribe to activity on the APIv2 context,
# and arrange to unsubscribe when this plugin is torn down.
Expand All @@ -97,24 +73,16 @@ def setup(self) -> None:
# to clean up these subscriptions.
self._subscription_exit_stack = exit_stack.pop_all()

# Kick off a background task to report activity to the ProtocolEngine.
self._action_dispatching_task = create_task(self._dispatch_all_actions())

# todo(mm, 2024-08-21): This no longer needs to be async.
async def teardown(self) -> None:
"""Tear down the plugin, undoing the work done in `setup()`.

Called by Protocol Engine.
At this point, the APIv2 protocol script must have exited.
"""
self._actions_to_dispatch.done_putting()
try:
if self._action_dispatching_task is not None:
await self._action_dispatching_task
self._action_dispatching_task = None
finally:
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None
if self._subscription_exit_stack is not None:
self._subscription_exit_stack.close()
self._subscription_exit_stack = None

def handle_action(self, action: pe_actions.Action) -> None:
"""React to a ProtocolEngine action."""
Expand All @@ -127,34 +95,22 @@ def _handle_legacy_command(self, command: LegacyCommand) -> None:
Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_command(command=command)
self._actions_to_dispatch.put(pe_actions)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()

def _handle_equipment_loaded(self, load_info: LoadInfo) -> None:
"""Handle an equipment load reported by the legacy APIv2 protocol.

Used as a broker callback, so this will run in the APIv2 protocol's thread.
"""
pe_actions = self._legacy_command_mapper.map_equipment_load(load_info=load_info)
self._actions_to_dispatch.put(pe_actions)

async def _dispatch_all_actions(self) -> None:
"""Dispatch all actions to the `ProtocolEngine`.

Exits only when `self._actions_to_dispatch` is closed
(or an unexpected exception is raised).
"""
async for action_batch in self._actions_to_dispatch.get_async_until_closed():
# It's critical that we dispatch this batch of actions as one atomic
# sequence, without yielding to the event loop.
# Although this plugin only means to use the ProtocolEngine as a way of
# passively exposing the protocol's progress, the ProtocolEngine is still
# theoretically active, which means it's constantly watching in the
# background to execute any commands that it finds `queued`.
#
# For example, one of these action batches will often want to
# instantaneously create a running command by having a queue action
# immediately followed by a run action. We cannot let the
# ProtocolEngine's background task see the command in the `queued` state,
# or it will try to execute it, which the legacy protocol is already doing.
for action in action_batch:
self.dispatch(action)
future = asyncio.run_coroutine_threadsafe(
self._dispatch_action_list(pe_actions), self._engine_loop
)
future.result()

async def _dispatch_action_list(self, actions: list[pe_actions.Action]) -> None:
for action in actions:
self.dispatch(action)
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_runner/protocol_runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Protocol run control and management."""
import asyncio
from typing import List, NamedTuple, Optional, Union

from abc import ABC, abstractmethod
Expand Down Expand Up @@ -220,7 +221,9 @@ async def load(
equipment_broker = Broker[LoadInfo]()
self._protocol_engine.add_plugin(
LegacyContextPlugin(
broker=self._broker, equipment_broker=equipment_broker
engine_loop=asyncio.get_running_loop(),
broker=self._broker,
equipment_broker=equipment_broker,
)
)
self._hardware_api.should_taskify_movement_execution(taskify=True)
Expand Down
174 changes: 0 additions & 174 deletions api/src/opentrons/protocol_runner/thread_async_queue.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tests for the PythonAndLegacyRunner's LegacyContextPlugin."""
import asyncio
import pytest
from anyio import to_thread
from decoy import Decoy, matchers
Expand Down Expand Up @@ -60,7 +61,7 @@ def mock_action_dispatcher(decoy: Decoy) -> pe_actions.ActionDispatcher:


@pytest.fixture
def subject(
async def subject(
mock_legacy_broker: LegacyBroker,
mock_equipment_broker: ReadOnlyBroker[LoadInfo],
mock_legacy_command_mapper: LegacyCommandMapper,
Expand All @@ -69,6 +70,7 @@ def subject(
) -> LegacyContextPlugin:
"""Get a configured LegacyContextPlugin with its dependencies mocked out."""
plugin = LegacyContextPlugin(
engine_loop=asyncio.get_running_loop(),
broker=mock_legacy_broker,
equipment_broker=mock_equipment_broker,
legacy_command_mapper=mock_legacy_command_mapper,
Expand Down
Loading
Loading