From 80111906019645a82f2dad5af1f201f32a0890b4 Mon Sep 17 00:00:00 2001 From: Laura Cox Date: Sun, 3 Nov 2024 21:11:36 +0200 Subject: [PATCH] fix up formatting, lint and tests --- .../protocol_api/core/engine/robot.py | 31 +++++--- .../core/legacy/legacy_protocol_core.py | 4 - .../opentrons/protocol_api/core/protocol.py | 2 +- api/src/opentrons/protocol_api/core/robot.py | 5 +- .../opentrons/protocol_api/robot_context.py | 9 ++- .../protocol_engine/execution/gantry_mover.py | 50 ------------- .../protocol_api/test_robot_context.py | 74 ++++++++++++++----- .../resources/test_pipette_data_provider.py | 1 + 8 files changed, 90 insertions(+), 86 deletions(-) diff --git a/api/src/opentrons/protocol_api/core/engine/robot.py b/api/src/opentrons/protocol_api/core/engine/robot.py index 0eae5a8a0e35..42f8dd2151a3 100644 --- a/api/src/opentrons/protocol_api/core/engine/robot.py +++ b/api/src/opentrons/protocol_api/core/engine/robot.py @@ -15,7 +15,6 @@ from opentrons.protocol_engine.types import DeckPoint, MotorAxis from opentrons.protocol_api.core.robot import AbstractRobot -from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes _AXIS_TYPE_TO_MOTOR_AXIS = { @@ -50,20 +49,33 @@ def _convert_to_engine_mount(self, axis_map: AxisMapType) -> Dict[MotorAxis, flo return {_AXIS_TYPE_TO_MOTOR_AXIS[ax]: dist for ax, dist in axis_map.items()} def _ul_per_mm_conversion( - self, pipette_settings: SupportedTipsDefinition, ul: float, action: PipetteActionTypes + self, + pipette_settings: SupportedTipsDefinition, + ul: float, + action: PipetteActionTypes, ) -> float: if action == PipetteActionTypes.ASPIRATE_ACTION: - fallback = pipette_settings.aspirate.default[PIPETTING_FUNCTION_FALLBACK_VERSION] - sequence = pipette_settings.aspirate.default.get(PIPETTING_FUNCTION_LATEST_VERSION, fallback) + fallback = pipette_settings.aspirate.default[ + PIPETTING_FUNCTION_FALLBACK_VERSION + ] + sequence = pipette_settings.aspirate.default.get( + PIPETTING_FUNCTION_LATEST_VERSION, fallback + ) elif action == PipetteActionTypes.BLOWOUT_ACTION: # TODO in followup work we should support handling blow out actions for volume. return 1.0 else: - fallback = pipette_settings.aspirate.default[PIPETTING_FUNCTION_FALLBACK_VERSION] - sequence = pipette_settings.dispense.default.get(PIPETTING_FUNCTION_LATEST_VERSION, fallback) + fallback = pipette_settings.aspirate.default[ + PIPETTING_FUNCTION_FALLBACK_VERSION + ] + sequence = pipette_settings.dispense.default.get( + PIPETTING_FUNCTION_LATEST_VERSION, fallback + ) return piecewise_volume_conversion(ul, sequence) - def get_pipette_type_from_engine(self, mount: Union[Mount, str]) -> Optional[str]: + def get_pipette_type_from_engine( + self, mount: Union[Mount, str] + ) -> Optional[pip_types.PipetteNameType]: """Get the pipette attached to the given mount.""" if isinstance(mount, Mount): engine_mount = MountType[mount.name] @@ -81,7 +93,7 @@ def get_plunger_position_from_name( maybe_pipette_state = self._sync_hardware_api.get_attached_instrument(mount) if not maybe_pipette_state: return 0.0 - return maybe_pipette_state["plunger_positions"][position_name.value] + return maybe_pipette_state["plunger_positions"][position_name.value] # type: ignore[no-any-return] def get_plunger_position_from_volume( self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str @@ -96,12 +108,13 @@ def get_plunger_position_from_volume( plunger_bottom = maybe_pipette_state["plunger_positions"]["bottom"] convert_volume = self._ul_per_mm_conversion(tip_settings, volume, action) + mm = volume / convert_volume if robot_type == "OT-2 Standard": position = plunger_bottom + mm else: position = plunger_bottom - mm - return round(position, 6) + return round(position, 6) # type: ignore[no-any-return] def move_to(self, mount: Mount, destination: Point, speed: Optional[float]) -> None: engine_mount = MountType[mount.name] diff --git a/api/src/opentrons/protocol_api/core/legacy/legacy_protocol_core.py b/api/src/opentrons/protocol_api/core/legacy/legacy_protocol_core.py index d87c4021cbb2..06e8165214d1 100644 --- a/api/src/opentrons/protocol_api/core/legacy/legacy_protocol_core.py +++ b/api/src/opentrons/protocol_api/core/legacy/legacy_protocol_core.py @@ -275,10 +275,6 @@ def load_robot(self) -> None: # type: ignore """Load an adapter using its identifying parameters""" raise APIVersionError(api_element="Loading robot") - def load_robot(self) -> None: # type: ignore - """Load an adapter using its identifying parameters""" - raise APIVersionError(api_element="Loading robot") - def move_labware( self, labware_core: LegacyLabwareCore, diff --git a/api/src/opentrons/protocol_api/core/protocol.py b/api/src/opentrons/protocol_api/core/protocol.py index c367c2b4bbab..62e2d7cd1d71 100644 --- a/api/src/opentrons/protocol_api/core/protocol.py +++ b/api/src/opentrons/protocol_api/core/protocol.py @@ -3,7 +3,7 @@ from __future__ import annotations from abc import abstractmethod, ABC -from typing import Generic, Type, List, Optional, Union, Tuple, Dict, TYPE_CHECKING +from typing import Generic, List, Optional, Union, Tuple, Dict, TYPE_CHECKING from opentrons_shared_data.deck.types import DeckDefinitionV5, SlotDefV3 from opentrons_shared_data.pipette.types import PipetteNameType diff --git a/api/src/opentrons/protocol_api/core/robot.py b/api/src/opentrons/protocol_api/core/robot.py index f9c30116693f..95def3e17f35 100644 --- a/api/src/opentrons/protocol_api/core/robot.py +++ b/api/src/opentrons/protocol_api/core/robot.py @@ -2,12 +2,15 @@ from typing import Optional, Union from opentrons.types import AxisMapType, Mount, Point +from opentrons_shared_data.pipette.types import PipetteNameType from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes class AbstractRobot(ABC): @abstractmethod - def get_pipette_type_from_engine(self, mount: Union[Mount, str]) -> Optional[str]: + def get_pipette_type_from_engine( + self, mount: Union[Mount, str] + ) -> Optional[PipetteNameType]: ... @abstractmethod diff --git a/api/src/opentrons/protocol_api/robot_context.py b/api/src/opentrons/protocol_api/robot_context.py index 9e649af3c39b..5b0e578f9bbf 100644 --- a/api/src/opentrons/protocol_api/robot_context.py +++ b/api/src/opentrons/protocol_api/robot_context.py @@ -208,6 +208,10 @@ def plunger_coordinates_for_volume( """ pipette_name = self._core.get_pipette_type_from_engine(mount) + if not pipette_name: + raise ValueError( + f"Expected a pipette to be attached to provided mount {mount}" + ) mount = validation.ensure_mount_for_pipette(mount, pipette_name) pipette_axis = AxisType.plunger_axis_for_mount(mount) @@ -224,7 +228,10 @@ def plunger_coordinates_for_named_position( """ pipette_name = self._core.get_pipette_type_from_engine(mount) - + if not pipette_name: + raise ValueError( + f"Expected a pipette to be attached to provided mount {mount}" + ) mount = validation.ensure_mount_for_pipette(mount, pipette_name) pipette_axis = AxisType.plunger_axis_for_mount(mount) pipette_position = self._core.get_plunger_position_from_name( diff --git a/api/src/opentrons/protocol_engine/execution/gantry_mover.py b/api/src/opentrons/protocol_engine/execution/gantry_mover.py index 03f175247b73..b1df690a2d83 100644 --- a/api/src/opentrons/protocol_engine/execution/gantry_mover.py +++ b/api/src/opentrons/protocol_engine/execution/gantry_mover.py @@ -573,56 +573,6 @@ def get_max_travel_z(self, pipette_id: str) -> float: tip_length = tip.length if tip is not None else 0 return instrument_height - tip_length - def get_max_travel_z_from_mount(self, mount: MountType) -> float: - """Get the maximum allowed z-height for mount.""" - pipette = self._state_view.pipettes.get_by_mount(mount) - if self._state_view.config.robot_type == "OT-2 Standard": - instrument_height = ( - self._state_view.pipettes.get_instrument_max_height_ot2(pipette.id) - if pipette - else VIRTUAL_MAX_OT3_HEIGHT - ) - else: - instrument_height = VIRTUAL_MAX_OT3_HEIGHT - tip_length = ( - self._state_view.tips.get_tip_length(pipette.id) if pipette else 0.0 - ) - - return instrument_height - tip_length - - async def move_axes( - self, - axis_map: Dict[MotorAxis, float], - critical_point: Optional[Dict[MotorAxis, float]] = None, - speed: Optional[float] = None, - ) -> Dict[MotorAxis, float]: - """Move the give axes map. No-op in virtual implementation.""" - mount = self.pick_mount_from_axis_map(axis_map) - current_position = await self.get_position_from_mount(mount) - axis_map[MotorAxis.X] = axis_map.get(MotorAxis.X, 0.0) + current_position[0] - axis_map[MotorAxis.Y] = axis_map.get(MotorAxis.Y, 0.0) + current_position[1] - if mount == Mount.RIGHT: - axis_map[MotorAxis.RIGHT_Z] = ( - axis_map.get(MotorAxis.RIGHT_Z, 0.0) + current_position[2] - ) - elif mount == Mount.EXTENSION: - axis_map[MotorAxis.EXTENSION_Z] = ( - axis_map.get(MotorAxis.EXTENSION_Z, 0.0) + current_position[2] - ) - else: - axis_map[MotorAxis.LEFT_Z] = ( - axis_map.get(MotorAxis.LEFT_Z, 0.0) + current_position[2] - ) - critical_point = critical_point or {} - return {ax: pos - critical_point.get(ax, 0.0) for ax, pos in axis_map.items()} - - async def move_mount_to( - self, mount: Mount, waypoints: List[Waypoint], speed: Optional[float] - ) -> Point: - """Move the hardware mount to a waypoint. No-op in virtual implementation.""" - assert len(waypoints) > 0, "Must have at least one waypoint" - return waypoints[-1].position - def get_max_travel_z_from_mount(self, mount: MountType) -> float: """Get the maximum allowed z-height for mount.""" pipette = self._state_view.pipettes.get_by_mount(mount) diff --git a/api/tests/opentrons/protocol_api/test_robot_context.py b/api/tests/opentrons/protocol_api/test_robot_context.py index d23762f0cea2..36b94c52b15e 100644 --- a/api/tests/opentrons/protocol_api/test_robot_context.py +++ b/api/tests/opentrons/protocol_api/test_robot_context.py @@ -17,6 +17,7 @@ from opentrons.protocol_api.core.common import ProtocolCore, RobotCore from opentrons.protocol_api import RobotContext, ModuleContext from opentrons.protocol_api.deck import Deck +from opentrons_shared_data.pipette.types import PipetteNameType from opentrons.protocol_api._types import PipetteActionTypes, PlungerPositionTypes @@ -60,7 +61,12 @@ def subject( api_version: APIVersion, ) -> RobotContext: """Get a RobotContext test subject with its dependencies mocked out.""" - decoy.when(mock_core.get_pipette_type_from_engine(Mount.LEFT)).then_return(None) + decoy.when(mock_core.get_pipette_type_from_engine(Mount.LEFT)).then_return( + PipetteNameType.P1000_SINGLE_FLEX + ) + decoy.when(mock_core.get_pipette_type_from_engine(Mount.RIGHT)).then_return( + PipetteNameType.P1000_SINGLE_FLEX + ) return RobotContext( core=mock_core, api_version=api_version, protocol_core=mock_protocol ) @@ -179,36 +185,36 @@ def test_get_axes_coordinates_for( res = subject.axis_coordinates_for(mount, location_to_move) assert res == expected_axis_map + @pytest.mark.parametrize( argnames=["mount", "volume", "action", "expected_axis_map"], argvalues=[ - ( - "left", - 100, - PipetteActionTypes.ASPIRATE_ACTION, - {AxisType.P_L: 100} - ), (Mount.RIGHT, 200, PipetteActionTypes.ASPIRATE_ACTION, {AxisType.P_R: 100}), - ( - Mount.LEFT, - 100, - PipetteActionTypes.DISPENSE_ACTION, - {AxisType.P_L: 100} - ), + (Mount.LEFT, 100, PipetteActionTypes.DISPENSE_ACTION, {AxisType.P_L: 100}), ], ) -def test_plunger_coordinates_for_volume(subject: RobotContext, mount: Union[Mount, str], volume: float, action: PipetteActionTypes, expected_axis_map: AxisMapType) -> None: +def test_plunger_coordinates_for_volume( + decoy: Decoy, + subject: RobotContext, + mount: Mount, + volume: float, + action: PipetteActionTypes, + expected_axis_map: AxisMapType, +) -> None: + """Test `RobotContext.plunger_coordinates_for_volume`.""" + decoy.when( + subject._core.get_plunger_position_from_volume( + mount, volume, action, "OT-3 Standard" + ) + ).then_return(100) + result = subject.plunger_coordinates_for_volume(mount, volume, action) assert result == expected_axis_map + @pytest.mark.parametrize( argnames=["mount", "position_name", "expected_axis_map"], argvalues=[ - ( - "left", - PlungerPositionTypes.PLUNGER_TOP, - {AxisType.P_L: 3}, - ), (Mount.RIGHT, PlungerPositionTypes.PLUNGER_TOP, {AxisType.P_R: 3}), ( Mount.RIGHT, @@ -217,6 +223,34 @@ def test_plunger_coordinates_for_volume(subject: RobotContext, mount: Union[Moun ), ], ) -def test_plunger_coordinates_for_named_position(subject: RobotContext, mount: Union[Mount, str], position_name: PlungerPositionTypes, expected_axis_map: AxisMapType) -> None: +def test_plunger_coordinates_for_named_position( + decoy: Decoy, + subject: RobotContext, + mount: Mount, + position_name: PlungerPositionTypes, + expected_axis_map: AxisMapType, +) -> None: + """Test `RobotContext.plunger_coordinates_for_named_position`.""" + decoy.when( + subject._core.get_plunger_position_from_name(mount, position_name) + ).then_return(3) result = subject.plunger_coordinates_for_named_position(mount, position_name) assert result == expected_axis_map + + +def test_plunger_methods_raise_without_pipette( + mock_core: RobotCore, mock_protocol: ProtocolCore, api_version: APIVersion +) -> None: + """Test that `RobotContext` plunger functions raise without pipette attached.""" + subject = RobotContext( + core=mock_core, api_version=api_version, protocol_core=mock_protocol + ) + with pytest.raises(ValueError): + subject.plunger_coordinates_for_named_position( + Mount.LEFT, PlungerPositionTypes.PLUNGER_TOP + ) + + with pytest.raises(ValueError): + subject.plunger_coordinates_for_volume( + Mount.LEFT, 200, PipetteActionTypes.ASPIRATE_ACTION + ) diff --git a/api/tests/opentrons/protocol_engine/resources/test_pipette_data_provider.py b/api/tests/opentrons/protocol_engine/resources/test_pipette_data_provider.py index 086b3ec297b5..144266e67fe4 100644 --- a/api/tests/opentrons/protocol_engine/resources/test_pipette_data_provider.py +++ b/api/tests/opentrons/protocol_engine/resources/test_pipette_data_provider.py @@ -246,6 +246,7 @@ def pipette_dict( "t200": {"minHeight": 0.5, "minVolume": 0}, "t1000": {"minHeight": 0.5, "minVolume": 0}, }, + "plunger_positions": {"top": 100, "bottom": 20, "blow_out": 10, "drop_tip": 0}, }