diff --git a/src/mavlink/mavlink_camera.rs b/src/mavlink/mavlink_camera.rs index a56893bb..91cae8fb 100644 --- a/src/mavlink/mavlink_camera.rs +++ b/src/mavlink/mavlink_camera.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::{ cli, mavlink::mavlink_camera_component::MavlinkCameraComponent, @@ -19,6 +19,7 @@ use super::utils::*; pub struct MavlinkCameraHandle { inner: Arc, _runtime: tokio::runtime::Runtime, + shutdown: Arc>, heartbeat_handle: tokio::task::JoinHandle<()>, messages_handle: tokio::task::JoinHandle<()>, } @@ -53,14 +54,22 @@ impl MavlinkCameraHandle { .build() .expect("Failed building a new tokio runtime"); - let heartbeat_handle = - runtime.spawn(MavlinkCamera::heartbeat_loop(inner.clone(), sender.clone())); - let messages_handle = - runtime.spawn(MavlinkCamera::messages_loop(inner.clone(), sender.clone())); + let shutdown = Arc::new(Mutex::new(false)); + let heartbeat_handle = runtime.spawn(MavlinkCamera::heartbeat_loop( + inner.clone(), + sender.clone(), + shutdown.clone(), + )); + let messages_handle = runtime.spawn(MavlinkCamera::messages_loop( + inner.clone(), + sender.clone(), + shutdown.clone(), + )); Ok(Self { inner, _runtime: runtime, + shutdown, heartbeat_handle, messages_handle, }) @@ -129,11 +138,17 @@ impl MavlinkCamera { #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))] - pub async fn heartbeat_loop(camera: Arc, sender: broadcast::Sender) { + pub async fn heartbeat_loop( + camera: Arc, + sender: broadcast::Sender, + shutdown: Arc>, + ) { let component_id = camera.component.component_id; let system_id = camera.component.system_id; - loop { + if *shutdown.lock().unwrap() { + break; + } tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; let header = mavlink::MavHeader { @@ -158,22 +173,32 @@ impl MavlinkCamera { debug!("Heartbeat sent"); } + debug!("Heartbeat loop done"); } #[instrument(level = "trace", skip(sender))] #[instrument(level = "debug", skip_all, fields(component_id = camera.component.component_id))] - pub async fn messages_loop(camera: Arc, sender: broadcast::Sender) { + pub async fn messages_loop( + camera: Arc, + sender: broadcast::Sender, + shutdown: Arc>, + ) { let mut receiver = sender.subscribe(); loop { - let (header, message) = match receiver.recv().await { + if *shutdown.lock().unwrap() { + break; + } + + std::thread::sleep(std::time::Duration::from_millis(10)); + let (header, message) = match receiver.try_recv() { Ok(Message::Received(message)) => message, - Err(broadcast::error::RecvError::Closed) => { + Err(broadcast::error::TryRecvError::Closed) => { unreachable!( "Closed channel: This should never happen, this channel is static!" ); } - Ok(Message::ToBeSent(_)) | Err(broadcast::error::RecvError::Lagged(_)) => continue, + Ok(Message::ToBeSent(_)) | Err(_) => continue, }; trace!("Message received: {header:?}, {message:?}"); @@ -185,6 +210,7 @@ impl MavlinkCamera { message, )); } + debug!("Messages loop done"); } #[instrument(level = "trace", skip(sender))] @@ -614,6 +640,12 @@ impl MavlinkCamera { impl Drop for MavlinkCameraHandle { fn drop(&mut self) { + *self.shutdown.lock().unwrap() = true; + while !self.heartbeat_handle.is_finished() || !self.messages_handle.is_finished() { + std::thread::sleep(std::time::Duration::from_millis(10)); + info!("Waiting for handles to finish"); + } + self.heartbeat_handle.abort(); self.messages_handle.abort(); super::manager::Manager::drop_id(self.inner.component.component_id)