From 861d2d48b2a81777c30eb3638d90e71b9e6c9e19 Mon Sep 17 00:00:00 2001 From: vegano1 <brayan.almonte@gmail.com> Date: Fri, 20 Dec 2024 12:38:50 -0500 Subject: [PATCH] - add unit tests for existing commands verify the gcode responses and match exact pattern --- api/src/opentrons/drivers/command_builder.py | 2 +- .../drivers/flex_stacker/__init__.py | 4 +- .../drivers/flex_stacker/abstract.py | 31 ++- .../opentrons/drivers/flex_stacker/driver.py | 59 +++-- .../drivers/flex_stacker/simulator.py | 29 ++- .../opentrons/drivers/flex_stacker/types.py | 2 +- .../drivers/flex_stacker/test_driver.py | 230 +++++++++++++++++- 7 files changed, 307 insertions(+), 50 deletions(-) diff --git a/api/src/opentrons/drivers/command_builder.py b/api/src/opentrons/drivers/command_builder.py index 92eb545ee45..ea90a12b946 100644 --- a/api/src/opentrons/drivers/command_builder.py +++ b/api/src/opentrons/drivers/command_builder.py @@ -17,7 +17,7 @@ def __init__(self, terminator: str = "\n") -> None: self._elements: List[str] = [] def add_float( - self, prefix: str, value: float, precision: Optional[int] + self, prefix: str, value: float, precision: Optional[int] = None ) -> CommandBuilder: """ Add a float value. diff --git a/api/src/opentrons/drivers/flex_stacker/__init__.py b/api/src/opentrons/drivers/flex_stacker/__init__.py index 136574ecd6b..cd4866c179a 100644 --- a/api/src/opentrons/drivers/flex_stacker/__init__.py +++ b/api/src/opentrons/drivers/flex_stacker/__init__.py @@ -1,9 +1,9 @@ -from .abstract import StackerDriver +from .abstract import AbstractStackerDriver from .driver import FlexStackerDriver from .simulator import SimulatingDriver __all__ = [ - "StackerDriver", + "AbstractStackerDriver", "FlexStackerDriver", "SimulatingDriver", ] diff --git a/api/src/opentrons/drivers/flex_stacker/abstract.py b/api/src/opentrons/drivers/flex_stacker/abstract.py index ef14acc6347..5ba3cdcb026 100644 --- a/api/src/opentrons/drivers/flex_stacker/abstract.py +++ b/api/src/opentrons/drivers/flex_stacker/abstract.py @@ -1,9 +1,16 @@ from typing import Protocol -from .types import StackerAxis, PlatformStatus, Direction, MoveParams, StackerInfo +from .types import ( + StackerAxis, + PlatformStatus, + Direction, + MoveParams, + StackerInfo, + LEDColor, +) -class StackerDriver(Protocol): +class AbstractStackerDriver(Protocol): """Protocol for the Stacker driver.""" async def connect(self) -> None: @@ -26,12 +33,12 @@ async def get_device_info(self) -> StackerInfo: """Get Device Info.""" ... - async def set_serial_number(self, sn: str) -> None: + async def set_serial_number(self, sn: str) -> bool: """Set Serial Number.""" ... - async def stop_motor(self) -> None: - """Stop motor movement.""" + async def stop_motors(self) -> bool: + """Stop all motor movement.""" ... async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: @@ -61,12 +68,22 @@ async def get_hopper_door_closed(self) -> bool: async def move_in_mm( self, axis: StackerAxis, distance: float, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move axis.""" ... async def move_to_limit_switch( self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move until limit switch is triggered.""" ... + + async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool: + """Home axis.""" + ... + + async def set_led( + self, power: float, color: LEDColor | None = None, external: bool | None = None + ) -> bool: + """Set LED color of status bar.""" + ... diff --git a/api/src/opentrons/drivers/flex_stacker/driver.py b/api/src/opentrons/drivers/flex_stacker/driver.py index daa5dd7d58c..83671023772 100644 --- a/api/src/opentrons/drivers/flex_stacker/driver.py +++ b/api/src/opentrons/drivers/flex_stacker/driver.py @@ -5,7 +5,7 @@ from opentrons.drivers.command_builder import CommandBuilder from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection -from .abstract import StackerDriver +from .abstract import AbstractStackerDriver from .types import ( GCODE, StackerAxis, @@ -28,14 +28,15 @@ GCODE_ROUNDING_PRECISION = 2 -class FlexStackerDriver(StackerDriver): +class FlexStackerDriver(AbstractStackerDriver): """FLEX Stacker driver.""" @classmethod def parse_device_info(cls, response: str) -> StackerInfo: """Parse stacker info.""" + # TODO: Validate serial number format once established _RE = re.compile( - f"^{GCODE.DEVICE_INFO} FW:(?P<fw>.+) HW:Opentrons-flex-stacker-(?P<hw>.+) SerialNo:(?P<sn>.+)" + f"^{GCODE.DEVICE_INFO} FW:(?P<fw>\\S+) HW:Opentrons-flex-stacker-(?P<hw>\\S+) SerialNo:(?P<sn>\\S+)$" ) m = _RE.match(response) if not m: @@ -49,7 +50,7 @@ def parse_limit_switch_status(cls, response: str) -> LimitSwitchStatus: """Parse limit switch statuses.""" field_names = LimitSwitchStatus.get_fields() pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names]) - _RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}") + _RE = re.compile(f"^{GCODE.GET_LIMIT_SWITCH} {pattern}$") m = _RE.match(response) if not m: raise ValueError(f"Incorrect Response for limit switch status: {response}") @@ -60,7 +61,7 @@ def parse_platform_sensor_status(cls, response: str) -> PlatformStatus: """Parse platform statuses.""" field_names = PlatformStatus.get_fields() pattern = r"\s".join([rf"{name}:(?P<{name}>\d)" for name in field_names]) - _RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}") + _RE = re.compile(f"^{GCODE.GET_PLATFORM_SENSOR} {pattern}$") m = _RE.match(response) if not m: raise ValueError(f"Incorrect Response for platform status: {response}") @@ -69,7 +70,7 @@ def parse_platform_sensor_status(cls, response: str) -> PlatformStatus: @classmethod def parse_door_closed(cls, response: str) -> bool: """Parse door closed.""" - _RE = re.compile(r"^M122 (\d)") + _RE = re.compile(r"^M122 D:(\d)$") match = _RE.match(response) if not match: raise ValueError(f"Incorrect Response for door closed: {response}") @@ -137,15 +138,22 @@ async def get_device_info(self) -> StackerInfo: await self._connection.send_command(GCODE.GET_RESET_REASON.build_command()) return self.parse_device_info(response) - async def set_serial_number(self, sn: str) -> None: + async def set_serial_number(self, sn: str) -> bool: """Set Serial Number.""" - await self._connection.send_command( + # TODO: validate the serial number format + resp = await self._connection.send_command( GCODE.SET_SERIAL_NUMBER.build_command().add_element(sn) ) + if not re.match(rf"^{GCODE.SET_SERIAL_NUMBER}$", resp): + raise ValueError(f"Incorrect Response for set serial number: {resp}") + return True - async def stop_motor(self) -> None: - """Stop motor movement.""" - await self._connection.send_command(GCODE.STOP_MOTOR.build_command()) + async def stop_motors(self) -> bool: + """Stop all motor movement.""" + resp = await self._connection.send_command(GCODE.STOP_MOTORS.build_command()) + if not re.match(rf"^{GCODE.STOP_MOTORS}$", resp): + raise ValueError(f"Incorrect Response for stop motors: {resp}") + return True async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: """Get limit switch status. @@ -189,7 +197,7 @@ async def get_hopper_door_closed(self) -> bool: async def move_in_mm( self, axis: StackerAxis, distance: float, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move axis.""" command = self.append_move_params( GCODE.MOVE_TO.build_command().add_float( @@ -197,33 +205,43 @@ async def move_in_mm( ), params, ) - await self._connection.send_command(command) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.MOVE_TO}$", resp): + raise ValueError(f"Incorrect Response for move to: {resp}") + return True async def move_to_limit_switch( self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move until limit switch is triggered.""" command = self.append_move_params( GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value), params, ) - await self._connection.send_command(command) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.MOVE_TO_SWITCH}$", resp): + raise ValueError(f"Incorrect Response for move to switch: {resp}") + return True - async def home_axis(self, axis: StackerAxis, direction: Direction) -> None: + async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool: """Home axis.""" - await self._connection.send_command( + resp = await self._connection.send_command( GCODE.HOME_AXIS.build_command().add_int(axis.name, direction.value) ) + if not re.match(rf"^{GCODE.HOME_AXIS}$", resp): + raise ValueError(f"Incorrect Response for home axis: {resp}") + return True async def set_led( self, power: float, color: LEDColor | None = None, external: bool | None = None - ) -> None: + ) -> bool: """Set LED color. :param power: Power of the LED (0-1.0), 0 is off, 1 is full power :param color: Color of the LED :param external: True if external LED, False if internal LED """ + power = max(0, min(power, 1.0)) command = GCODE.SET_LED.build_command().add_float( "P", power, GCODE_ROUNDING_PRECISION ) @@ -231,7 +249,10 @@ async def set_led( command.add_int("C", color.value) if external is not None: command.add_int("E", external) - await self._connection.send_command(command) + resp = await self._connection.send_command(command) + if not re.match(rf"^{GCODE.SET_LED}$", resp): + raise ValueError(f"Incorrect Response for set led: {resp}") + return True async def update_firmware(self, firmware_file_path: str) -> None: """Updates the firmware on the device.""" diff --git a/api/src/opentrons/drivers/flex_stacker/simulator.py b/api/src/opentrons/drivers/flex_stacker/simulator.py index 79441fcfede..1e0b59b19de 100644 --- a/api/src/opentrons/drivers/flex_stacker/simulator.py +++ b/api/src/opentrons/drivers/flex_stacker/simulator.py @@ -2,7 +2,7 @@ from opentrons.util.async_helpers import ensure_yield -from .abstract import StackerDriver +from .abstract import AbstractStackerDriver from .types import ( StackerAxis, PlatformStatus, @@ -14,7 +14,7 @@ ) -class SimulatingDriver(StackerDriver): +class SimulatingDriver(AbstractStackerDriver): """FLEX Stacker driver simulator.""" def __init__(self, serial_number: Optional[str] = None) -> None: @@ -23,14 +23,17 @@ def __init__(self, serial_number: Optional[str] = None) -> None: self._platform_sensor_status = PlatformStatus(False, False) self._door_closed = True - def set_limit_switch(self, status: LimitSwitchStatus) -> None: + def set_limit_switch(self, status: LimitSwitchStatus) -> bool: self._limit_switch_status = status + return True - def set_platform_sensor(self, status: PlatformStatus) -> None: + def set_platform_sensor(self, status: PlatformStatus) -> bool: self._platform_sensor_status = status + return True - def set_door_closed(self, door_closed: bool) -> None: + def set_door_closed(self, door_closed: bool) -> bool: self._door_closed = door_closed + return True @ensure_yield async def connect(self) -> None: @@ -53,14 +56,14 @@ async def get_device_info(self) -> StackerInfo: return StackerInfo(fw="stacker-fw", hw=HardwareRevision.EVT, sn=self._sn) @ensure_yield - async def set_serial_number(self, sn: str) -> None: + async def set_serial_number(self, sn: str) -> bool: """Set Serial Number.""" - pass + return True @ensure_yield - async def stop_motor(self) -> None: + async def stop_motor(self) -> bool: """Stop motor movement.""" - pass + return True @ensure_yield async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool: @@ -94,13 +97,13 @@ async def get_hopper_door_closed(self) -> bool: @ensure_yield async def move_in_mm( self, axis: StackerAxis, distance: float, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move axis.""" - pass + return True @ensure_yield async def move_to_limit_switch( self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None - ) -> None: + ) -> bool: """Move until limit switch is triggered.""" - pass + return True diff --git a/api/src/opentrons/drivers/flex_stacker/types.py b/api/src/opentrons/drivers/flex_stacker/types.py index 34e8f16c706..4035aaaa755 100644 --- a/api/src/opentrons/drivers/flex_stacker/types.py +++ b/api/src/opentrons/drivers/flex_stacker/types.py @@ -10,7 +10,7 @@ class GCODE(str, Enum): MOVE_TO = "G0" MOVE_TO_SWITCH = "G5" HOME_AXIS = "G28" - STOP_MOTOR = "M0" + STOP_MOTORS = "M0" GET_RESET_REASON = "M114" DEVICE_INFO = "M115" GET_LIMIT_SWITCH = "M119" diff --git a/api/tests/opentrons/drivers/flex_stacker/test_driver.py b/api/tests/opentrons/drivers/flex_stacker/test_driver.py index 032dc0c170c..15c647bcfa3 100644 --- a/api/tests/opentrons/drivers/flex_stacker/test_driver.py +++ b/api/tests/opentrons/drivers/flex_stacker/test_driver.py @@ -5,7 +5,6 @@ ) from opentrons.drivers.flex_stacker.driver import FlexStackerDriver from opentrons.drivers.flex_stacker import types -from opentrons.drivers.command_builder import CommandBuilder @pytest.fixture @@ -24,18 +23,235 @@ async def test_get_device_info( ) -> None: """It should send a get device info command""" connection.send_command.return_value = ( - "M115 FW:dummy-fw-version HW:Opentrons-flex-stacker-a1 SerialNo:dummy-serial ok\n" + "M115 FW:0.0.1 HW:a1 SerialNo:STCA120230605001" ) response = await subject.get_device_info() assert response == types.StackerInfo( - fw="dummy-fw-version", + fw="0.0.1", hw=types.HardwareRevision.EVT, - sn="dummy-serial", + sn="STCA120230605001", ) - + device_info = types.GCODE.DEVICE_INFO.build_command() reset_reason = types.GCODE.GET_RESET_REASON.build_command() - connection.send_command.assert_any_call(command=device_info) - connection.send_command.assert_called_with(command=reset_reason) + connection.send_command.assert_any_call(device_info) + connection.send_command.assert_called_with(reset_reason) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M115 FW:0.0.1 SerialNo:STCA120230605001" + + # This should raise ValueError + with pytest.raises(ValueError): + response = await subject.get_device_info() + + device_info = types.GCODE.DEVICE_INFO.build_command() + reset_reason = types.GCODE.GET_RESET_REASON.build_command() + connection.send_command.assert_any_call(device_info) + connection.send_command.assert_called_with(reset_reason) + + +async def test_stop_motors(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a stop motors command""" + connection.send_command.return_value = "M0" + response = await subject.stop_motors() + assert response + + stop_motors = types.GCODE.STOP_MOTORS.build_command() + connection.send_command.assert_any_call(stop_motors) + connection.reset_mock() + + # This should raise ValueError + with pytest.raises(ValueError): + await subject.get_device_info() + + +async def test_set_serial_number( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a set serial number command""" + connection.send_command.return_value = "M996" + + serial_number = "Something" + response = await subject.set_serial_number(serial_number) + assert response + + set_serial_number = types.GCODE.SET_SERIAL_NUMBER.build_command().add_element( + serial_number + ) + connection.send_command.assert_any_call(set_serial_number) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M9nn" + with pytest.raises(ValueError): + response = await subject.set_serial_number(serial_number) + + set_serial_number = types.GCODE.SET_SERIAL_NUMBER.build_command().add_element( + serial_number + ) + connection.send_command.assert_any_call(set_serial_number) + connection.reset_mock() + + +async def test_get_limit_switch( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get limit switch command and return the boolean of one.""" + connection.send_command.return_value = "M119 XE:1 XR:0 ZE:0 ZR:1 LR:1" + response = await subject.get_limit_switch( + types.StackerAxis.X, types.Direction.EXTENT + ) + assert response + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + connection.reset_mock() + + +async def test_get_limit_switches_status( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get limit switch status and return LimitSwitchStatus.""" + connection.send_command.return_value = "M119 XE:1 XR:0 ZE:0 ZR:1 LR:1" + response = await subject.get_limit_switches_status() + assert response == types.LimitSwitchStatus( + XE=True, + XR=False, + ZE=False, + ZR=True, + LR=True, + ) + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M119 XE:b XR:0 ZE:a ZR:1 LR:n" + with pytest.raises(ValueError): + response = await subject.get_limit_switches_status() + + limit_switch_status = types.GCODE.GET_LIMIT_SWITCH.build_command() + connection.send_command.assert_any_call(limit_switch_status) + + +async def test_get_platform_sensor( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get platform sensor command return status of specified sensor.""" + connection.send_command.return_value = "M121 E:1 R:1" + response = await subject.get_platform_sensor(types.Direction.EXTENT) + assert response + + platform_sensor = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_sensor) + connection.reset_mock() + + +async def test_get_platform_status( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """it should send a get platform sensors status.""" + connection.send_command.return_value = "M121 E:0 R:1" + response = await subject.get_platform_status() + assert response == types.PlatformStatus( + E=False, + R=True, + ) + + platform_status = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_status) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M121 E:0 R:1 something" + with pytest.raises(ValueError): + response = await subject.get_platform_status() + + platform_status = types.GCODE.GET_PLATFORM_SENSOR.build_command() + connection.send_command.assert_any_call(platform_status) + + +async def test_get_hopper_door_closed( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a get door closed command.""" + connection.send_command.return_value = "M122 D:1" + response = await subject.get_hopper_door_closed() + assert response + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + # Test door open + connection.send_command.return_value = "M122 D:0" + response = await subject.get_hopper_door_closed() + assert not response + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + # Test invalid response + connection.send_command.return_value = "M122 78gybhjk" + + with pytest.raises(ValueError): + response = await subject.get_hopper_door_closed() + + door_closed = types.GCODE.GET_DOOR_SWITCH.build_command() + connection.send_command.assert_any_call(door_closed) + connection.reset_mock() + + +async def test_move_in_mm(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a move to command""" + connection.send_command.return_value = "G0" + response = await subject.move_in_mm(types.StackerAxis.X, 10) + assert response + + move_to = types.GCODE.MOVE_TO.build_command().add_float("X", 10) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + + +async def test_move_to_switch( + subject: FlexStackerDriver, connection: AsyncMock +) -> None: + """It should send a move to switch command""" + connection.send_command.return_value = "G5" + axis = types.StackerAxis.X + direction = types.Direction.EXTENT + response = await subject.move_to_limit_switch(axis, direction) + assert response + + move_to = types.GCODE.MOVE_TO_SWITCH.build_command().add_int( + axis.name, direction.value + ) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + + +async def test_home_axis(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a home axis command""" + connection.send_command.return_value = "G28" + axis = types.StackerAxis.X + direction = types.Direction.EXTENT + response = await subject.home_axis(axis, direction) + assert response + + move_to = types.GCODE.HOME_AXIS.build_command().add_int(axis.name, direction.value) + connection.send_command.assert_any_call(move_to) + connection.reset_mock() + +async def test_set_led(subject: FlexStackerDriver, connection: AsyncMock) -> None: + """It should send a set led command""" + connection.send_command.return_value = "M200" + response = await subject.set_led(1, types.LEDColor.RED) + assert response + set_led = types.GCODE.SET_LED.build_command().add_float("P", 1).add_int("C", 1) + connection.send_command.assert_any_call(set_led) + connection.reset_mock()