Skip to content

Commit

Permalink
fix up formatting, lint and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Laura-Danielle committed Nov 9, 2024
1 parent c01e3c1 commit 8011190
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 86 deletions.
31 changes: 22 additions & 9 deletions api/src/opentrons/protocol_api/core/engine/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from opentrons.protocol_engine.types import DeckPoint, MotorAxis

from opentrons.protocol_api.core.robot import AbstractRobot
from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes


_AXIS_TYPE_TO_MOTOR_AXIS = {
Expand Down Expand Up @@ -50,20 +49,33 @@ def _convert_to_engine_mount(self, axis_map: AxisMapType) -> Dict[MotorAxis, flo
return {_AXIS_TYPE_TO_MOTOR_AXIS[ax]: dist for ax, dist in axis_map.items()}

def _ul_per_mm_conversion(
self, pipette_settings: SupportedTipsDefinition, ul: float, action: PipetteActionTypes
self,
pipette_settings: SupportedTipsDefinition,
ul: float,
action: PipetteActionTypes,
) -> float:
if action == PipetteActionTypes.ASPIRATE_ACTION:
fallback = pipette_settings.aspirate.default[PIPETTING_FUNCTION_FALLBACK_VERSION]
sequence = pipette_settings.aspirate.default.get(PIPETTING_FUNCTION_LATEST_VERSION, fallback)
fallback = pipette_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = pipette_settings.aspirate.default.get(
PIPETTING_FUNCTION_LATEST_VERSION, fallback
)
elif action == PipetteActionTypes.BLOWOUT_ACTION:
# TODO in followup work we should support handling blow out actions for volume.
return 1.0
else:
fallback = pipette_settings.aspirate.default[PIPETTING_FUNCTION_FALLBACK_VERSION]
sequence = pipette_settings.dispense.default.get(PIPETTING_FUNCTION_LATEST_VERSION, fallback)
fallback = pipette_settings.aspirate.default[
PIPETTING_FUNCTION_FALLBACK_VERSION
]
sequence = pipette_settings.dispense.default.get(
PIPETTING_FUNCTION_LATEST_VERSION, fallback
)
return piecewise_volume_conversion(ul, sequence)

def get_pipette_type_from_engine(self, mount: Union[Mount, str]) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[pip_types.PipetteNameType]:
"""Get the pipette attached to the given mount."""
if isinstance(mount, Mount):
engine_mount = MountType[mount.name]
Expand All @@ -81,7 +93,7 @@ def get_plunger_position_from_name(
maybe_pipette_state = self._sync_hardware_api.get_attached_instrument(mount)
if not maybe_pipette_state:
return 0.0
return maybe_pipette_state["plunger_positions"][position_name.value]
return maybe_pipette_state["plunger_positions"][position_name.value] # type: ignore[no-any-return]

def get_plunger_position_from_volume(
self, mount: Mount, volume: float, action: PipetteActionTypes, robot_type: str
Expand All @@ -96,12 +108,13 @@ def get_plunger_position_from_volume(
plunger_bottom = maybe_pipette_state["plunger_positions"]["bottom"]

convert_volume = self._ul_per_mm_conversion(tip_settings, volume, action)

mm = volume / convert_volume
if robot_type == "OT-2 Standard":
position = plunger_bottom + mm
else:
position = plunger_bottom - mm
return round(position, 6)
return round(position, 6) # type: ignore[no-any-return]

def move_to(self, mount: Mount, destination: Point, speed: Optional[float]) -> None:
engine_mount = MountType[mount.name]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,6 @@ def load_robot(self) -> None: # type: ignore
"""Load an adapter using its identifying parameters"""
raise APIVersionError(api_element="Loading robot")

def load_robot(self) -> None: # type: ignore
"""Load an adapter using its identifying parameters"""
raise APIVersionError(api_element="Loading robot")

def move_labware(
self,
labware_core: LegacyLabwareCore,
Expand Down
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_api/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from abc import abstractmethod, ABC
from typing import Generic, Type, List, Optional, Union, Tuple, Dict, TYPE_CHECKING
from typing import Generic, List, Optional, Union, Tuple, Dict, TYPE_CHECKING

from opentrons_shared_data.deck.types import DeckDefinitionV5, SlotDefV3
from opentrons_shared_data.pipette.types import PipetteNameType
Expand Down
5 changes: 4 additions & 1 deletion api/src/opentrons/protocol_api/core/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
from typing import Optional, Union

from opentrons.types import AxisMapType, Mount, Point
from opentrons_shared_data.pipette.types import PipetteNameType
from opentrons.protocol_api._types import PlungerPositionTypes, PipetteActionTypes


class AbstractRobot(ABC):
@abstractmethod
def get_pipette_type_from_engine(self, mount: Union[Mount, str]) -> Optional[str]:
def get_pipette_type_from_engine(
self, mount: Union[Mount, str]
) -> Optional[PipetteNameType]:
...

@abstractmethod
Expand Down
9 changes: 8 additions & 1 deletion api/src/opentrons/protocol_api/robot_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ def plunger_coordinates_for_volume(
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)
if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)

Expand All @@ -224,7 +228,10 @@ def plunger_coordinates_for_named_position(
"""
pipette_name = self._core.get_pipette_type_from_engine(mount)

if not pipette_name:
raise ValueError(
f"Expected a pipette to be attached to provided mount {mount}"
)
mount = validation.ensure_mount_for_pipette(mount, pipette_name)
pipette_axis = AxisType.plunger_axis_for_mount(mount)
pipette_position = self._core.get_plunger_position_from_name(
Expand Down
50 changes: 0 additions & 50 deletions api/src/opentrons/protocol_engine/execution/gantry_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,56 +573,6 @@ def get_max_travel_z(self, pipette_id: str) -> float:
tip_length = tip.length if tip is not None else 0
return instrument_height - tip_length

def get_max_travel_z_from_mount(self, mount: MountType) -> float:
"""Get the maximum allowed z-height for mount."""
pipette = self._state_view.pipettes.get_by_mount(mount)
if self._state_view.config.robot_type == "OT-2 Standard":
instrument_height = (
self._state_view.pipettes.get_instrument_max_height_ot2(pipette.id)
if pipette
else VIRTUAL_MAX_OT3_HEIGHT
)
else:
instrument_height = VIRTUAL_MAX_OT3_HEIGHT
tip_length = (
self._state_view.tips.get_tip_length(pipette.id) if pipette else 0.0
)

return instrument_height - tip_length

async def move_axes(
self,
axis_map: Dict[MotorAxis, float],
critical_point: Optional[Dict[MotorAxis, float]] = None,
speed: Optional[float] = None,
) -> Dict[MotorAxis, float]:
"""Move the give axes map. No-op in virtual implementation."""
mount = self.pick_mount_from_axis_map(axis_map)
current_position = await self.get_position_from_mount(mount)
axis_map[MotorAxis.X] = axis_map.get(MotorAxis.X, 0.0) + current_position[0]
axis_map[MotorAxis.Y] = axis_map.get(MotorAxis.Y, 0.0) + current_position[1]
if mount == Mount.RIGHT:
axis_map[MotorAxis.RIGHT_Z] = (
axis_map.get(MotorAxis.RIGHT_Z, 0.0) + current_position[2]
)
elif mount == Mount.EXTENSION:
axis_map[MotorAxis.EXTENSION_Z] = (
axis_map.get(MotorAxis.EXTENSION_Z, 0.0) + current_position[2]
)
else:
axis_map[MotorAxis.LEFT_Z] = (
axis_map.get(MotorAxis.LEFT_Z, 0.0) + current_position[2]
)
critical_point = critical_point or {}
return {ax: pos - critical_point.get(ax, 0.0) for ax, pos in axis_map.items()}

async def move_mount_to(
self, mount: Mount, waypoints: List[Waypoint], speed: Optional[float]
) -> Point:
"""Move the hardware mount to a waypoint. No-op in virtual implementation."""
assert len(waypoints) > 0, "Must have at least one waypoint"
return waypoints[-1].position

def get_max_travel_z_from_mount(self, mount: MountType) -> float:
"""Get the maximum allowed z-height for mount."""
pipette = self._state_view.pipettes.get_by_mount(mount)
Expand Down
74 changes: 54 additions & 20 deletions api/tests/opentrons/protocol_api/test_robot_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from opentrons.protocol_api.core.common import ProtocolCore, RobotCore
from opentrons.protocol_api import RobotContext, ModuleContext
from opentrons.protocol_api.deck import Deck
from opentrons_shared_data.pipette.types import PipetteNameType

from opentrons.protocol_api._types import PipetteActionTypes, PlungerPositionTypes

Expand Down Expand Up @@ -60,7 +61,12 @@ def subject(
api_version: APIVersion,
) -> RobotContext:
"""Get a RobotContext test subject with its dependencies mocked out."""
decoy.when(mock_core.get_pipette_type_from_engine(Mount.LEFT)).then_return(None)
decoy.when(mock_core.get_pipette_type_from_engine(Mount.LEFT)).then_return(
PipetteNameType.P1000_SINGLE_FLEX
)
decoy.when(mock_core.get_pipette_type_from_engine(Mount.RIGHT)).then_return(
PipetteNameType.P1000_SINGLE_FLEX
)
return RobotContext(
core=mock_core, api_version=api_version, protocol_core=mock_protocol
)
Expand Down Expand Up @@ -179,36 +185,36 @@ def test_get_axes_coordinates_for(
res = subject.axis_coordinates_for(mount, location_to_move)
assert res == expected_axis_map


@pytest.mark.parametrize(
argnames=["mount", "volume", "action", "expected_axis_map"],
argvalues=[
(
"left",
100,
PipetteActionTypes.ASPIRATE_ACTION,
{AxisType.P_L: 100}
),
(Mount.RIGHT, 200, PipetteActionTypes.ASPIRATE_ACTION, {AxisType.P_R: 100}),
(
Mount.LEFT,
100,
PipetteActionTypes.DISPENSE_ACTION,
{AxisType.P_L: 100}
),
(Mount.LEFT, 100, PipetteActionTypes.DISPENSE_ACTION, {AxisType.P_L: 100}),
],
)
def test_plunger_coordinates_for_volume(subject: RobotContext, mount: Union[Mount, str], volume: float, action: PipetteActionTypes, expected_axis_map: AxisMapType) -> None:
def test_plunger_coordinates_for_volume(
decoy: Decoy,
subject: RobotContext,
mount: Mount,
volume: float,
action: PipetteActionTypes,
expected_axis_map: AxisMapType,
) -> None:
"""Test `RobotContext.plunger_coordinates_for_volume`."""
decoy.when(
subject._core.get_plunger_position_from_volume(
mount, volume, action, "OT-3 Standard"
)
).then_return(100)

result = subject.plunger_coordinates_for_volume(mount, volume, action)
assert result == expected_axis_map


@pytest.mark.parametrize(
argnames=["mount", "position_name", "expected_axis_map"],
argvalues=[
(
"left",
PlungerPositionTypes.PLUNGER_TOP,
{AxisType.P_L: 3},
),
(Mount.RIGHT, PlungerPositionTypes.PLUNGER_TOP, {AxisType.P_R: 3}),
(
Mount.RIGHT,
Expand All @@ -217,6 +223,34 @@ def test_plunger_coordinates_for_volume(subject: RobotContext, mount: Union[Moun
),
],
)
def test_plunger_coordinates_for_named_position(subject: RobotContext, mount: Union[Mount, str], position_name: PlungerPositionTypes, expected_axis_map: AxisMapType) -> None:
def test_plunger_coordinates_for_named_position(
decoy: Decoy,
subject: RobotContext,
mount: Mount,
position_name: PlungerPositionTypes,
expected_axis_map: AxisMapType,
) -> None:
"""Test `RobotContext.plunger_coordinates_for_named_position`."""
decoy.when(
subject._core.get_plunger_position_from_name(mount, position_name)
).then_return(3)
result = subject.plunger_coordinates_for_named_position(mount, position_name)
assert result == expected_axis_map


def test_plunger_methods_raise_without_pipette(
mock_core: RobotCore, mock_protocol: ProtocolCore, api_version: APIVersion
) -> None:
"""Test that `RobotContext` plunger functions raise without pipette attached."""
subject = RobotContext(
core=mock_core, api_version=api_version, protocol_core=mock_protocol
)
with pytest.raises(ValueError):
subject.plunger_coordinates_for_named_position(
Mount.LEFT, PlungerPositionTypes.PLUNGER_TOP
)

with pytest.raises(ValueError):
subject.plunger_coordinates_for_volume(
Mount.LEFT, 200, PipetteActionTypes.ASPIRATE_ACTION
)
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def pipette_dict(
"t200": {"minHeight": 0.5, "minVolume": 0},
"t1000": {"minHeight": 0.5, "minVolume": 0},
},
"plunger_positions": {"top": 100, "bottom": 20, "blow_out": 10, "drop_tip": 0},
}


Expand Down

0 comments on commit 8011190

Please sign in to comment.