diff --git a/README.md b/README.md index 6558f20..63a6871 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ The following attributes are available for the `oak-d` camera component: | `height_px` | int | Optional | Height in pixels of the images output by this camera. Default: `720` | | `frame_rate` | int | Optional | The frame rate the camera will capture images at. Default: `30` | | `device_info` | string | Optional | Physical device identifier to connect to a specific OAK camera connected to your machine. If not specified, the module will pick the first device it detects. `device_info` can be a MXID, usb port path, or IP address. [See DepthAI documentation for more details](https://docs.luxonis.com/software/depthai/examples/device_information#Device%20information). | +| `manual_focus` | int | Optional | The manual focus value to apply to the color sensor. Sets the camera to fixed focus mode at the specified lens position. Must be between 0..255 inclusive. Default: auto focus | > [!NOTE] > Higher resolutions may cause out of memory errors. See Luxonis documentation [here](https://docs.luxonis.com/projects/api/en/latest/tutorials/ram_usage/.). diff --git a/src/components/helpers/shared.py b/src/components/helpers/shared.py index 04c05e3..285ac2b 100644 --- a/src/components/helpers/shared.py +++ b/src/components/helpers/shared.py @@ -1,4 +1,3 @@ -from typing import Dict, List, Literal, Optional, Tuple from numpy.typing import NDArray from depthai import CameraBoardSocket @@ -27,83 +26,6 @@ def get_socket_from_str(s: str) -> CameraBoardSocket: raise Exception(f"Camera socket '{s}' is not recognized or supported.") -class Sensor: - """ - Sensor config. Corresponds to a socket and what camera should be configured - off of the specified socket. - """ - - def get_unique_name(self) -> str: - if self.sensor_type == "color": - return f"{self.socket_str}_rgb" - else: - return f"{self.socket_str}_mono" - - def __init__( - self, - socket_str: Literal["cam_a", "cam_b", "cam_c"], - sensor_type: Literal["color", "depth"], - width: int, - height: int, - frame_rate: int, - color_order: Literal["rgb", "bgr"] = "rgb", - interleaved: bool = False, - ): - self.socket_str = socket_str - self.socket = get_socket_from_str(socket_str) - self.sensor_type = sensor_type - self.width = width - self.height = height - self.frame_rate = frame_rate - self.color_order = color_order - self.interleaved = interleaved - - -class Sensors: - """ - Sensors wraps a Sensor list and offers handy utility methods and fields. - """ - - _mapping: Dict[str, Sensor] - stereo_pair: Optional[Tuple[Sensor, Sensor]] - color_sensors: Optional[List[Sensor]] - primary_sensor: Sensor - - def __init__(self, sensors: List[Sensor]): - self._mapping = dict() - for sensor in sensors: - self._mapping[sensor.socket_str] = sensor - - self.color_sensors = self._find_color_sensors() - self.stereo_pair = self._find_stereo_pair() - self.primary_sensor = sensors[0] - - def get_cam_a(self) -> Sensor: - return self._mapping["cam_a"] - - def get_cam_b(self) -> Sensor: - return self._mapping["cam_b"] - - def get_cam_c(self) -> Sensor: - return self._mapping["cam_c"] - - def _find_color_sensors(self) -> List[Sensor]: - l = [] - for sensor in self._mapping.values(): - if sensor.sensor_type == "color": - l.append(sensor) - return l - - def _find_stereo_pair(self) -> Optional[Tuple[Sensor]]: - pair = [] - for sensor in self._mapping.values(): - if sensor.sensor_type == "depth": - pair.append(sensor) - if len(pair) == 0: - return None - return tuple(pair) - - class CapturedData: """ CapturedData is image data with the data as an np array, diff --git a/src/components/worker/worker.py b/src/components/worker/worker.py index 8c0c6f0..cf04afe 100644 --- a/src/components/worker/worker.py +++ b/src/components/worker/worker.py @@ -23,8 +23,8 @@ from numpy.typing import NDArray import numpy as np -from src.components.helpers.shared import CapturedData, Sensor -from src.config import OakConfig, YDNConfig +from src.components.helpers.shared import CapturedData +from src.config import OakConfig, YDNConfig, Sensor DIMENSIONS_TO_MONO_RES = { (1280, 800): dai.MonoCameraProperties.SensorResolution.THE_800_P, @@ -189,6 +189,8 @@ def configure_color() -> Optional[List[dai.node.ColorCamera]]: color_cam.setColorOrder(dai.ColorCameraProperties.ColorOrder.BGR) else: color_cam.setColorOrder(dai.ColorCameraProperties.ColorOrder.RGB) + if sensor.manual_focus is not None: + color_cam.initialControl.setManualFocus(sensor.manual_focus) # Linking color_cam.preview.link(xout_color.input) diff --git a/src/config.py b/src/config.py index 8ff2acc..4669485 100644 --- a/src/config.py +++ b/src/config.py @@ -1,11 +1,13 @@ import os -from typing import List, Literal, Mapping, Optional +from typing import Dict, List, Literal, Mapping, Optional, Tuple +from depthai import CameraBoardSocket from google.protobuf.struct_pb2 import Value +from numpy.typing import NDArray from viam.errors import ValidationError from viam.logging import getLogger -from src.components.helpers.shared import Sensor, Sensors +from src.components.helpers.shared import get_socket_from_str # Be sure to update README.md if default attributes are changed @@ -17,6 +19,85 @@ LOGGER = getLogger("viam-luxonis-configuration") +class Sensor: + """ + Sensor config. Corresponds to a socket and what camera should be configured + off of the specified socket. + """ + + def get_unique_name(self) -> str: + if self.sensor_type == "color": + return f"{self.socket_str}_rgb" + else: + return f"{self.socket_str}_mono" + + def __init__( + self, + socket_str: Literal["cam_a", "cam_b", "cam_c"], + sensor_type: Literal["color", "depth"], + width: int, + height: int, + frame_rate: int, + color_order: Literal["rgb", "bgr"] = "rgb", + interleaved: bool = False, + manual_focus: Optional[int] = None, + ): + self.socket_str = socket_str + self.socket = get_socket_from_str(socket_str) + self.sensor_type = sensor_type + self.width = width + self.height = height + self.frame_rate = frame_rate + self.color_order = color_order + self.interleaved = interleaved + self.manual_focus = manual_focus + + +class Sensors: + """ + Sensors wraps a Sensor list and offers handy utility methods and fields. + """ + + _mapping: Dict[str, Sensor] + stereo_pair: Optional[Tuple[Sensor, Sensor]] + color_sensors: Optional[List[Sensor]] + primary_sensor: Sensor + + def __init__(self, sensors: List[Sensor]): + self._mapping = dict() + for sensor in sensors: + self._mapping[sensor.socket_str] = sensor + + self.color_sensors = self._find_color_sensors() + self.stereo_pair = self._find_stereo_pair() + self.primary_sensor = sensors[0] + + def get_cam_a(self) -> Sensor: + return self._mapping["cam_a"] + + def get_cam_b(self) -> Sensor: + return self._mapping["cam_b"] + + def get_cam_c(self) -> Sensor: + return self._mapping["cam_c"] + + def _find_color_sensors(self) -> List[Sensor]: + l = [] + for sensor in self._mapping.values(): + if sensor.sensor_type == "color": + l.append(sensor) + return l + + def _find_stereo_pair(self) -> Optional[Tuple[Sensor]]: + pair = [] + for sensor in self._mapping.values(): + if sensor.sensor_type == "depth": + pair.append(sensor) + if len(pair) == 0: + return None + return tuple(pair) + + def handle_err(err_msg: str) -> None: """ handle_error is invoked when there is an error in validation. @@ -142,18 +223,29 @@ def initialize_config(self): height = int(self.attribute_map["height_px"].number_value) or DEFAULT_HEIGHT width = int(self.attribute_map["width_px"].number_value) or DEFAULT_WIDTH frame_rate = self.attribute_map["frame_rate"].number_value or DEFAULT_FRAME_RATE + manual_focus = int(self.attribute_map["manual_focus"].number_value) or None sensor_list = [] for sensor_str in sensors_str_list: if sensor_str == "depth": for cam_socket in ["cam_b", "cam_c"]: depth_sensor = Sensor( - cam_socket, "depth", width, height, frame_rate + socket_str=cam_socket, + sensor_type="depth", + width=width, + height=height, + frame_rate=frame_rate, ) sensor_list.append(depth_sensor) elif sensor_str == "color": color_sensor = Sensor( - "cam_a", "color", width, height, frame_rate, "rgb" + socket_str="cam_a", + sensor_type="color", + width=width, + height=height, + frame_rate=frame_rate, + color_order="rgb", + manual_focus=manual_focus, ) sensor_list.append(color_sensor) self.sensors = Sensors(sensor_list) @@ -168,6 +260,7 @@ def validate(cls, attribute_map: Mapping[str, Value]) -> List[str]: "sensors", "frame_rate", "device_info", + "manual_focus", ] # Check config keys are valid for attribute in attribute_map.keys(): @@ -228,6 +321,19 @@ def validate(cls, attribute_map: Mapping[str, Value]) -> List[str]: 'received only one dimension attribute. Please supply both "height_px" and "width_px", or neither.' ) + # Validate manual focus + validate_attr_type("manual_focus", "number_value", attribute_map) + manual_focus = attribute_map.get(key="manual_focus", default=None) + if manual_focus: + if "color" not in sensor_list: + handle_err('"manual_focus" can be set only for the color sensor') + + focus_value = manual_focus.number_value + if focus_value < 0 or focus_value > 255: + handle_err('"manual_focus" must be a value in range 0...255 inclusive') + if int(focus_value) != focus_value: + handle_err('"manual_focus" must be an integer') + return [] # no deps diff --git a/tests/test_oak_d.py b/tests/test_oak_d.py index d65a098..eb0a532 100644 --- a/tests/test_oak_d.py +++ b/tests/test_oak_d.py @@ -152,6 +152,30 @@ "attribute must be a string_value" ) +manual_focus_set_for_non_color = ( + make_component_config({ + "sensors": ["depth"], + "manual_focus": 255 + }, "viam:luxonis:oak-d"), + '"manual_focus" can be set only for the color sensor' +) + +manual_focus_out_of_range = ( + make_component_config({ + "sensors": ["color", "depth"], + "manual_focus": 256 + }, "viam:luxonis:oak-d"), + '"manual_focus" must be a value in range 0...255 inclusive' +) + +manual_focus_not_integer = ( + make_component_config({ + "sensors": ["color"], + "manual_focus": 1.5 + }, "viam:luxonis:oak-d"), + '"manual_focus" must be an integer' +) + configs_and_msgs = [ invalid_attribute_name, sensors_not_present, @@ -170,7 +194,11 @@ width_is_zero, width_is_negative, only_received_height, - only_received_width + only_received_width, + wrong_device_info_type, + manual_focus_set_for_non_color, + manual_focus_out_of_range, + manual_focus_not_integer ] full_correct_config = make_component_config({