Skip to content

Commit

Permalink
- add unit tests for existing commands
Browse files Browse the repository at this point in the history
verify the gcode responses and match exact pattern
  • Loading branch information
vegano1 committed Dec 20, 2024
1 parent 132ab0d commit 861d2d4
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 50 deletions.
2 changes: 1 addition & 1 deletion api/src/opentrons/drivers/command_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self, terminator: str = "\n") -> None:
self._elements: List[str] = []

def add_float(
self, prefix: str, value: float, precision: Optional[int]
self, prefix: str, value: float, precision: Optional[int] = None
) -> CommandBuilder:
"""
Add a float value.
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .abstract import StackerDriver
from .abstract import AbstractStackerDriver
from .driver import FlexStackerDriver
from .simulator import SimulatingDriver

__all__ = [
"StackerDriver",
"AbstractStackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
]
31 changes: 24 additions & 7 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from typing import Protocol

from .types import StackerAxis, PlatformStatus, Direction, MoveParams, StackerInfo
from .types import (
StackerAxis,
PlatformStatus,
Direction,
MoveParams,
StackerInfo,
LEDColor,
)


class StackerDriver(Protocol):
class AbstractStackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
Expand All @@ -26,12 +33,12 @@ async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...

async def set_serial_number(self, sn: str) -> None:
async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
...

async def stop_motor(self) -> None:
"""Stop motor movement."""
async def stop_motors(self) -> bool:
"""Stop all motor movement."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
Expand Down Expand Up @@ -61,12 +68,22 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move axis."""
...

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move until limit switch is triggered."""
...

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
...

async def set_led(
self, power: float, color: LEDColor | None = None, external: bool | None = None
) -> bool:
"""Set LED color of status bar."""
...
59 changes: 40 additions & 19 deletions api/src/opentrons/drivers/flex_stacker/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentrons.drivers.command_builder import CommandBuilder
from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection

from .abstract import StackerDriver
from .abstract import AbstractStackerDriver
from .types import (
GCODE,
StackerAxis,
Expand All @@ -28,14 +28,15 @@
GCODE_ROUNDING_PRECISION = 2


class FlexStackerDriver(StackerDriver):
class FlexStackerDriver(AbstractStackerDriver):
"""FLEX Stacker driver."""

@classmethod
def parse_device_info(cls, response: str) -> StackerInfo:
"""Parse stacker info."""
# TODO: Validate serial number format once established
_RE = re.compile(
f"^{GCODE.DEVICE_INFO} FW:(?P<fw>.+) HW:Opentrons-flex-stacker-(?P<hw>.+) SerialNo:(?P<sn>.+)"
f"^{GCODE.DEVICE_INFO} FW:(?P<fw>\\S+) HW:Opentrons-flex-stacker-(?P<hw>\\S+) SerialNo:(?P<sn>\\S+)$"
)
m = _RE.match(response)
if not m:
Expand All @@ -49,7 +50,7 @@ def parse_limit_switch_status(cls, response: str) -> LimitSwitchStatus:
"""Parse limit switch statuses."""
field_names = LimitSwitchStatus.get_fields()
pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names])
_RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}")
_RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}$")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for limit switch status: {response}")
Expand All @@ -60,7 +61,7 @@ def parse_platform_sensor_status(cls, response: str) -> PlatformStatus:
"""Parse platform statuses."""
field_names = PlatformStatus.get_fields()
pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names])
_RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}")
_RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}$")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for platform status: {response}")
Expand All @@ -69,7 +70,7 @@ def parse_platform_sensor_status(cls, response: str) -> PlatformStatus:
@classmethod
def parse_door_closed(cls, response: str) -> bool:
"""Parse door closed."""
_RE = re.compile(r"^M122 (\d)")
_RE = re.compile(r"^M122 D:(\d)$")
match = _RE.match(response)
if not match:
raise ValueError(f"Incorrect Response for door closed: {response}")
Expand Down Expand Up @@ -137,15 +138,22 @@ async def get_device_info(self) -> StackerInfo:
await self._connection.send_command(GCODE.GET_RESET_REASON.build_command())
return self.parse_device_info(response)

async def set_serial_number(self, sn: str) -> None:
async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
await self._connection.send_command(
# TODO: validate the serial number format
resp = await self._connection.send_command(
GCODE.SET_SERIAL_NUMBER.build_command().add_element(sn)
)
if not re.match(rf"^{GCODE.SET_SERIAL_NUMBER}$", resp):
raise ValueError(f"Incorrect Response for set serial number: {resp}")
return True

async def stop_motor(self) -> None:
"""Stop motor movement."""
await self._connection.send_command(GCODE.STOP_MOTOR.build_command())
async def stop_motors(self) -> bool:
"""Stop all motor movement."""
resp = await self._connection.send_command(GCODE.STOP_MOTORS.build_command())
if not re.match(rf"^{GCODE.STOP_MOTORS}$", resp):
raise ValueError(f"Incorrect Response for stop motors: {resp}")
return True

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.
Expand Down Expand Up @@ -189,49 +197,62 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move axis."""
command = self.append_move_params(
GCODE.MOVE_TO.build_command().add_float(
axis.name, distance, GCODE_ROUNDING_PRECISION
),
params,
)
await self._connection.send_command(command)
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO}$", resp):
raise ValueError(f"Incorrect Response for move to: {resp}")
return True

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move until limit switch is triggered."""
command = self.append_move_params(
GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value),
params,
)
await self._connection.send_command(command)
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO_SWITCH}$", resp):
raise ValueError(f"Incorrect Response for move to switch: {resp}")
return True

async def home_axis(self, axis: StackerAxis, direction: Direction) -> None:
async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
await self._connection.send_command(
resp = await self._connection.send_command(
GCODE.HOME_AXIS.build_command().add_int(axis.name, direction.value)
)
if not re.match(rf"^{GCODE.HOME_AXIS}$", resp):
raise ValueError(f"Incorrect Response for home axis: {resp}")
return True

async def set_led(
self, power: float, color: LEDColor | None = None, external: bool | None = None
) -> None:
) -> bool:
"""Set LED color.
:param power: Power of the LED (0-1.0), 0 is off, 1 is full power
:param color: Color of the LED
:param external: True if external LED, False if internal LED
"""
power = max(0, min(power, 1.0))
command = GCODE.SET_LED.build_command().add_float(
"P", power, GCODE_ROUNDING_PRECISION
)
if color is not None:
command.add_int("C", color.value)
if external is not None:
command.add_int("E", external)
await self._connection.send_command(command)
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.SET_LED}$", resp):
raise ValueError(f"Incorrect Response for set led: {resp}")
return True

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
Expand Down
29 changes: 16 additions & 13 deletions api/src/opentrons/drivers/flex_stacker/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from opentrons.util.async_helpers import ensure_yield

from .abstract import StackerDriver
from .abstract import AbstractStackerDriver
from .types import (
StackerAxis,
PlatformStatus,
Expand All @@ -14,7 +14,7 @@
)


class SimulatingDriver(StackerDriver):
class SimulatingDriver(AbstractStackerDriver):
"""FLEX Stacker driver simulator."""

def __init__(self, serial_number: Optional[str] = None) -> None:
Expand All @@ -23,14 +23,17 @@ def __init__(self, serial_number: Optional[str] = None) -> None:
self._platform_sensor_status = PlatformStatus(False, False)
self._door_closed = True

def set_limit_switch(self, status: LimitSwitchStatus) -> None:
def set_limit_switch(self, status: LimitSwitchStatus) -> bool:
self._limit_switch_status = status
return True

def set_platform_sensor(self, status: PlatformStatus) -> None:
def set_platform_sensor(self, status: PlatformStatus) -> bool:
self._platform_sensor_status = status
return True

def set_door_closed(self, door_closed: bool) -> None:
def set_door_closed(self, door_closed: bool) -> bool:
self._door_closed = door_closed
return True

@ensure_yield
async def connect(self) -> None:
Expand All @@ -53,14 +56,14 @@ async def get_device_info(self) -> StackerInfo:
return StackerInfo(fw="stacker-fw", hw=HardwareRevision.EVT, sn=self._sn)

@ensure_yield
async def set_serial_number(self, sn: str) -> None:
async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
pass
return True

@ensure_yield
async def stop_motor(self) -> None:
async def stop_motor(self) -> bool:
"""Stop motor movement."""
pass
return True

@ensure_yield
async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
Expand Down Expand Up @@ -94,13 +97,13 @@ async def get_hopper_door_closed(self) -> bool:
@ensure_yield
async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move axis."""
pass
return True

@ensure_yield
async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> None:
) -> bool:
"""Move until limit switch is triggered."""
pass
return True
2 changes: 1 addition & 1 deletion api/src/opentrons/drivers/flex_stacker/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class GCODE(str, Enum):
MOVE_TO = "G0"
MOVE_TO_SWITCH = "G5"
HOME_AXIS = "G28"
STOP_MOTOR = "M0"
STOP_MOTORS = "M0"
GET_RESET_REASON = "M114"
DEVICE_INFO = "M115"
GET_LIMIT_SWITCH = "M119"
Expand Down
Loading

0 comments on commit 861d2d4

Please sign in to comment.