Skip to content

Commit

Permalink
[RSDK-9360] Can't swap device_info between OAK cameras (#59)
Browse files Browse the repository at this point in the history
* Fix oak-ffc-3p device info

* Make device_info type hint Optional; Add class-scoped reconfig manager; Add ydn lock (should've added this before); Add orchestrated reconfiguration logic; Make device info passed only when specified; Log device info found

* Make lint

* Fix comment and a little bit of code golf

* Add timeout const

* Add guard for if worker is None
  • Loading branch information
hexbabe authored Dec 18, 2024
1 parent 61e3a47 commit d451086
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 39 deletions.
200 changes: 171 additions & 29 deletions src/components/oak.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Standard library
import asyncio
from logging import Logger
from threading import Lock, Thread
from typing import (
Any,
ClassVar,
Expand All @@ -13,7 +15,7 @@
)
from typing_extensions import Self

# Viam module
# Viam SDK
from viam.errors import NotSupportedError, ViamError
from viam.logging import getLogger
from viam.module.types import Reconfigurable
Expand All @@ -22,16 +24,14 @@
from viam.resource.base import ResourceBase
from viam.resource.types import Model, ModelFamily
from viam.utils import ValueTypes

# Viam camera
from viam.components.camera import (
Camera,
DistortionParameters,
IntrinsicParameters,
)
from viam.media.video import CameraMimeType, NamedImage, ViamImage

# src
# Local
from src.components.worker.worker import Worker
from src.components.helpers.shared import CapturedData
from src.components.helpers.encoders import (
Expand All @@ -54,6 +54,7 @@
)

DEFAULT_IMAGE_MIMETYPE = CameraMimeType.JPEG
RECONFIGURE_TIMEOUT_SECONDS = 0.1


class Oak(Camera, Reconfigurable):
Expand Down Expand Up @@ -95,6 +96,14 @@ class Properties(NamedTuple):
ALL_MODELS: ClassVar[Tuple[Model]] = DEPRECATED_MODELS + SUPPORTED_MODELS
logger: ClassVar[Logger]
"""Class scoped logger"""
init_reconfig_manager_lock: ClassVar[Lock] = Lock()
"""Lock for thread-safe access to initializing reconfig_manager"""
# 'OakInstanceManager' is written in quotes because it is a forward reference:
# the class is defined later in this file, so the name does not exist yet at
# this point. Quoting the name lets Python treat it as a type hint that is
# resolved later instead of evaluating it here.
reconfig_manager: ClassVar[Optional["OakInstanceManager"]] = None # type: ignore
"""Handler for reconfiguring the camera"""

logger: Logger
"""Instance scoped logger"""
Expand All @@ -104,23 +113,41 @@ class Properties(NamedTuple):
"""Native config"""
ydn_configs: Mapping[str, YDNConfig]
"""Configs populated by ydn service"""
ydn_configs_lock: Lock
"""Lock for thread-safe access to ydn_configs"""
worker: Optional[Worker] = None
"""`Worker` handles camera logic in a separate thread"""
worker_manager: Optional[WorkerManager] = None
"""`WorkerManager` managing the lifecycle of `worker`"""
get_point_cloud_was_invoked: bool = False
"""Flag to indicate if get_point_cloud was invoked"""
camera_properties: Camera.Properties
"""Camera properties as per Viam SDK"""
is_closed: bool = False
"""Flag to indicate if the camera has been closed by Close()"""

@classmethod
def new(
    cls, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
) -> Self:
    """
    SDK calls this method to create a new camera instance.

    The instance is not configured inline. Its initial configuration is handed
    to the class-scoped `OakInstanceManager`, which applies it from its own
    thread together with any other pending (re)configure requests.

    Args:
        config (ComponentConfig): component config of the new camera
        dependencies (Mapping[ResourceName, ResourceBase]): resolved dependencies

    Returns:
        Self: the new instance, with its first configuration queued.
    """
    self = cls(config.name)
    cls.validate(config)
    self.ydn_configs = dict()  # YDN configs have to persist across reconfigures
    self.ydn_configs_lock = Lock()
    cls.logger = getLogger("viam-oak")
    self.logger = getLogger(config.name)

    # Do not call self.reconfigure() here: it dereferences cls.reconfig_manager,
    # which is still None for the first instance, and for later instances it
    # would enqueue the same request a second time.
    with cls.init_reconfig_manager_lock:
        manager = cls.reconfig_manager
        if manager is None or manager.stop_event.is_set():
            # No manager yet, or the previous one has stopped its loop:
            # start a fresh one seeded with this instance's request.
            cls.reconfig_manager = OakInstanceManager(
                [(self, config, dependencies)]
            )
            cls.reconfig_manager.start()
        else:
            manager.add_reconfigure_request(self, config, dependencies)
    return self

@classmethod
Expand Down Expand Up @@ -163,13 +190,22 @@ def reconfigure(
self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
) -> None:
"""
A procedure both the RDK and module invokes to (re)configure and (re)boot the module—
serving as an initializer and restart method.
SDK calls this method to reconfigure the camera without losing connection to the driver.
This method is used to add the request to the OakInstanceManager's queue.
Args:
config (ComponentConfig)
dependencies (Mapping[ResourceName, ResourceBase])
"""
cls = type(self)
cls.reconfig_manager.add_reconfigure_request(self, config, dependencies)

def do_reconfigure(
self, config: ComponentConfig, dependencies: Mapping[ResourceName, ResourceBase]
) -> None:
"""
This method is called by the OakInstanceManager to actually reconfigure the camera.
"""
self._close()

if self.model == self._oak_d_model:
Expand All @@ -189,25 +225,27 @@ def reconfigure(
intrinsic_parameters=None,
)

self.worker = Worker(
oak_config=self.oak_cfg,
ydn_configs=self.ydn_configs,
user_wants_pc=self.get_point_cloud_was_invoked,
)
with self.ydn_configs_lock:
self.worker = Worker(
oak_config=self.oak_cfg,
ydn_configs=self.ydn_configs,
user_wants_pc=self.get_point_cloud_was_invoked,
)

self.worker_manager = WorkerManager(self.worker)
self.worker_manager.start()

async def close(self) -> None:
    """
    Implements `close` camera method to free resources on shutdown.

    Releases the worker resources through `_close()` and then marks this
    instance as closed.
    """
    self.logger.info("Closing OAK component.")
    self._close()
    # OakInstanceManager filters its active list on this flag.
    self.is_closed = True
    self.logger.info("Closed OAK component.")

def _close(self) -> None:
if self.worker_manager:
if self.worker_manager and not self.worker_manager.stop_event.is_set():
self.worker_manager.stop()
self.worker_manager.join()
if self.worker:
Expand Down Expand Up @@ -413,7 +451,8 @@ async def do_command(
self.logger.debug(f"Received YDN_CONFIGURE with mapping: {command}")
await self._wait_for_worker()
ydn_config = decode_ydn_configure_command(command)
self.ydn_configs[ydn_config.service_id] = ydn_config
with self.ydn_configs_lock:
self.ydn_configs[ydn_config.service_id] = ydn_config
self.logger.info(
"Closing camera to reconfigure pipeline with yolo detection network."
)
Expand All @@ -422,21 +461,23 @@ async def do_command(
elif cmd == YDN_DECONFIGURE:
self.logger.debug(f"Received YDN_DECONFIGURE with mapping: {command}")
id = command["sender_id"]
if id in self.ydn_configs:
del self.ydn_configs[id]
with self.ydn_configs_lock:
if id in self.ydn_configs:
del self.ydn_configs[id]
self.worker.reset()
return {}
elif cmd == YDN_CAPTURE_ALL:
await self._wait_for_worker()
resp = dict()
service_id = command["sender_id"]
service_name = command["sender_name"]
try:
ydn_config = self.ydn_configs[service_id]
except KeyError:
raise ViamError(
f'Could not find matching YDN config for YDN service id: "{service_id}" and name: "{service_name}"'
)
with self.ydn_configs_lock:
try:
ydn_config = self.ydn_configs[service_id]
except KeyError:
raise ViamError(
f'Could not find matching YDN config for YDN service id: "{service_id}" and name: "{service_name}"'
)

# Find respective Sensor to the YDN config
sensor = None
Expand Down Expand Up @@ -501,10 +542,11 @@ async def _wait_for_worker(

attempts = 0
while attempts < max_attempts:
if self.worker.running and desired_status == "running":
return
if self.worker.configured and desired_status == "configured":
return
if self.worker:
if self.worker.running and desired_status == "running":
return
if self.worker.configured and desired_status == "configured":
return
attempts += 1
await asyncio.sleep(timeout_seconds)
raise ViamError(
Expand Down Expand Up @@ -562,3 +604,103 @@ def __init__(self, method_name: str, details: str) -> None:
f'Cannot invoke method "{method_name}" with current config. {details}'
)
super().__init__(self.message)


class OakInstanceManager(Thread):
    """
    OakInstanceManager is a thread that manages every Oak instance.

    It owns a private asyncio event loop that runs `handle_reconfigures`.
    Requests added through `add_reconfigure_request` are collected for one
    extra polling interval and then applied as a batch, so that several
    cameras reconfigured at the same time are orchestrated together.
    """

    def __init__(
        self,
        reqs: List[Tuple[Oak, ComponentConfig, Mapping[ResourceName, ResourceBase]]],
    ) -> None:
        """
        Args:
            reqs: initial (oak, config, dependencies) requests to process.
        """
        super().__init__()
        # Copy so that the pending queue is never aliased with the caller's list.
        self.reconfig_reqs: List[
            Tuple[Oak, ComponentConfig, Mapping[ResourceName, ResourceBase]]
        ] = list(reqs)
        self.lock: Lock = Lock()
        self.requests_waited: bool = False
        self.oak_instances: List[Oak] = []
        self.stop_event: asyncio.Event = asyncio.Event()
        self.logger: Logger = getLogger("oak_manager")

        self.logger.info("OakInstanceManager initialized with requests: %s", reqs)

        self.loop = asyncio.new_event_loop()
        # Keep a strong reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be garbage-collected mid-execution.
        self._handler_task = self.loop.create_task(self.handle_reconfigures())

    async def handle_reconfigures(self) -> None:
        """
        Poll for reconfigure requests and apply them in batches until
        `stop_event` is set.

        On exit, for any reason, `stop_event` is set and the event loop is
        stopped so that `run()` returns and the thread terminates.
        """
        self.logger.info("Starting reconfiguration handler loop.")
        try:
            while not self.stop_event.is_set():
                await asyncio.sleep(RECONFIGURE_TIMEOUT_SECONDS)

                reqs_to_process = self._take_pending_requests()
                if not reqs_to_process:
                    continue

                self._process_requests(reqs_to_process)

                # NOTE(review): closed instances are only pruned right after a
                # batch was processed, so closing every camera does not by
                # itself stop the manager. Confirm that this is intended.
                self.oak_instances = [
                    oak for oak in self.oak_instances if not oak.is_closed
                ]
                if not self.oak_instances:
                    self.logger.info(
                        "No active Oak instances. Stopping reconfiguration loop."
                    )
                    self.stop_event.set()
        finally:
            # Mark the manager as stopped even on an unexpected error, so that
            # Oak.new() replaces it, then let run_forever() return.
            self.stop_event.set()
            self.loop.stop()

    def _take_pending_requests(
        self,
    ) -> List[Tuple[Oak, ComponentConfig, Mapping[ResourceName, ResourceBase]]]:
        """Return the batch to process now, or an empty list if it is not time yet."""
        with self.lock:
            if not self.reconfig_reqs:
                return []

            if not self.requests_waited:
                # Give concurrent reconfigure calls one more interval to arrive
                # so they end up in the same batch.
                self.requests_waited = True
                self.logger.debug(
                    "Waiting one time for more reconfiguration requests."
                )
                return []

            # Process requests after waiting one iteration
            batch = self.reconfig_reqs
            self.reconfig_reqs = []
            self.requests_waited = False
            self.logger.debug("Processing reconfiguration requests: %s", batch)
            return batch

    @staticmethod
    def _has_device_info(config: ComponentConfig) -> bool:
        """Tell whether `config` carries a non-empty `device_info` attribute."""
        fields = config.attributes.fields
        # Test membership first: indexing a missing key would insert a default
        # entry, and an unset string_value is "" rather than None.
        return "device_info" in fields and bool(fields["device_info"].string_value)

    def _process_requests(
        self,
        reqs: List[Tuple[Oak, ComponentConfig, Mapping[ResourceName, ResourceBase]]],
    ) -> None:
        """Apply one batch of reconfigure requests and track the instances."""
        if any(self._has_device_info(config) for _, config, _ in reqs):
            # If device_info is specified, close every Oak instance in the batch
            # before reconfiguring any of them. When swapping device_infos the
            # old device must be closed before the new one is opened, otherwise
            # the new device may fail to open.
            for oak, _, _ in reqs:
                try:
                    oak._close()
                except Exception:
                    self.logger.exception(
                        "Failed to close Oak instance before reconfigure: %s", oak
                    )

        for oak, config, dependencies in reqs:
            self.logger.debug(
                "Reconfiguring Oak instance: %s with config: %s", oak, config
            )
            try:
                oak.do_reconfigure(config, dependencies)
            except Exception:
                # One failing camera must not kill the handler task, otherwise
                # every later request would be silently ignored.
                self.logger.exception("Failed to reconfigure Oak instance: %s", oak)
            if oak not in self.oak_instances:
                self.oak_instances.append(oak)
                self.logger.debug("Added Oak instance to active list: %s", oak)

    def add_reconfigure_request(
        self,
        oak: Oak,
        config: ComponentConfig,
        dependencies: Mapping[ResourceName, ResourceBase],
    ) -> None:
        """
        Queue a reconfigure request; it is applied later by `handle_reconfigures`.

        Args:
            oak (Oak): instance to reconfigure
            config (ComponentConfig): config to apply
            dependencies (Mapping[ResourceName, ResourceBase]): its dependencies
        """
        with self.lock:
            self.reconfig_reqs.append((oak, config, dependencies))
            self.logger.debug("Added reconfigure request for Oak instance: %s", oak)
            self.logger.debug("New reconfigure requests list: %s", self.reconfig_reqs)

    def run(self):
        """Thread entry point: run the private event loop until it is stopped."""
        self.logger.debug("Starting OakInstanceManager event loop.")
        try:
            self.loop.run_forever()
        finally:
            # Release the loop's resources once the handler has stopped it.
            self.loop.close()
23 changes: 18 additions & 5 deletions src/components/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,24 @@ async def start(self):
self.device = None
while not self.device and self.should_exec:
try:
self.device = dai.Device(
pipeline=self.pipeline,
devInfo=dai.DeviceInfo(mxidOrName=self.cfg.device_info),
)
device_info_str = self.cfg.device_info
if device_info_str is None:
self.device = dai.Device(pipeline=self.pipeline)
else:
self.device = dai.Device(
pipeline=self.pipeline,
devInfo=dai.DeviceInfo(mxidOrName=device_info_str),
)
self.device.startPipeline()
self.logger.debug("Successfully initialized device.")
self.logger.info("Successfully initialized device.")
try:
device_info = self.device.getDeviceInfo()
self.logger.info(
f"Connected device info - Name: {device_info.name}, MxId: {device_info.getMxId()}"
)
except Exception as e:
self.logger.error(f"Error getting device info: {e}")

except Exception as e:
self.logger.error(f"Error initializing device: {e}")
await asyncio.sleep(1)
Expand Down Expand Up @@ -540,6 +552,7 @@ def stop(self) -> None:
self.running = False
if self.device:
self.device.close()
self.device = None
if self.pipeline:
self.pipeline = None

Expand Down
6 changes: 3 additions & 3 deletions src/components/worker/worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, worker: Worker) -> None:
self.worker = worker
self.loop = None
self.restart_atomic_bool = AtomicBoolean()
self._stop_event = asyncio.Event()
self.stop_event = asyncio.Event()

def run(self):
"""
Expand All @@ -65,7 +65,7 @@ async def check_health(self) -> None:
self.worker.configure()
await self.worker.start()

while not self._stop_event.is_set():
while not self.stop_event.is_set():
self.logger.debug("Checking if worker must be restarted.")
if self.worker.device and self.worker.device.isClosed():
with self.restart_atomic_bool.lock:
Expand All @@ -87,7 +87,7 @@ def stop(self):
"""
Thread-safe stop method to set the event to stop the singular health checking task.
"""
self.loop.call_soon_threadsafe(self._stop_event.set)
self.loop.call_soon_threadsafe(self.stop_event.set)
self.loop.call_soon_threadsafe(self.loop.stop)

async def shutdown(self):
Expand Down
Loading

0 comments on commit d451086

Please sign in to comment.