diff --git a/src/liftoff_quad.rs b/src/liftoff_quad.rs index e4994b26..39447c17 100644 --- a/src/liftoff_quad.rs +++ b/src/liftoff_quad.rs @@ -4,14 +4,9 @@ use nalgebra::{Matrix4, Quaternion, UnitQuaternion, Vector3}; use peng_quad::config::LiftoffQuadrotorConfig; use peng_quad::quadrotor::{QuadrotorInterface, QuadrotorState}; use peng_quad::SimulationError; -use prost::Message; -use rand; -use serialport::{available_ports, SerialPort, SerialPortBuilder, SerialPortType}; -use std::io::{Cursor, Write}; use std::net::UdpSocket; -use std::sync::Arc; -use std::sync::Mutex; -use tempfile::NamedTempFile; +use tokio::sync::mpsc; +use tokio::sync::Mutex; use tokio::time::{sleep, Duration}; /// Represents a quadrotor in the game Liftoff @@ -37,23 +32,20 @@ pub struct LiftoffQuad { /// Previous Torque pub previous_torque: Vector3, /// Quadrotor sample mutex - pub shared_data: Arc>>, + pub consumer: mpsc::Receiver, } impl LiftoffQuad { pub fn new(time_step: f32, config: LiftoffQuadrotorConfig) -> Result { - // let shared_data: Arc>> = - // Arc::new(Mutex::new(Some(LiftoffPacket::default()))); - // let shared_data_clone = Arc::clone(&shared_data); - // let config_clone = config.clone(); - // tokio::spawn(async move { - // let _ = feedback_loop(config_clone, shared_data_clone).await; - // }); + let (producer, consumer) = tokio::sync::mpsc::channel(100); + let config_clone = config.clone(); + tokio::spawn(async move { + let _ = feedback_loop(config_clone, producer).await; + }); // Open a serial port to communicate with the quadrotor if one is specified // If not, open a writer to a temp file let writer: Option = match config.clone().serial_port { Some(port) => { - println!("Port: {:?}", port); let mut writer = Writer::new(port.to_string(), config.baud_rate).map_err(|e| { SimulationError::OtherError(format!( "Failed to open SerialPort {:?}", @@ -89,7 +81,7 @@ impl LiftoffQuad { time_step, previous_thrust: 0.0, previous_torque: Vector3::zeros(), - shared_data: Arc::new(Mutex::new(None)), + consumer, }) } @@ -165,34 +157,11 @@ impl QuadrotorInterface for LiftoffQuad { /// Observe the current state of the quadrotor /// Returns a tuple containing the position, velocity, orientation, and angular velocity of the quadrotor. fn observe(&mut self) -> Result { - let mut buf = [0; 128]; - // let mut data_lock = self.shared_data.lock().unwrap(); - let sample = match UdpSocket::bind(self.config.ip_address.to_string()) { - Ok(socket) => { - socket - .set_read_timeout(Some(Duration::from_millis(1))) - .map_err(|e| SimulationError::OtherError(e.to_string()))?; - match socket.recv_from(&mut buf) { - Ok((len, _)) => { - let mut cursor = std::io::Cursor::new(&buf[..len]); - // TODO: more robust handling of packet parsing errors during resets - let sample = if let Ok(sample) = LiftoffPacket::read(&mut cursor) { - sample - } else { - LiftoffPacket::default() - }; - Some(sample) - } - Err(_) => None, - } - } - Err(e) => { - return Err(SimulationError::OtherError(format!( - "Bind loop exceeded max wait time {}", - e.to_string() - ))); - } - }; + // let sample = try_take_with_timeout(&self.shared_data, Duration::from_millis(25)); + let mut sample = None; + while let Ok(value) = self.consumer.try_recv() { + sample = Some(value); + } let state = match sample { Some(sample) => { // update the last state value @@ -218,8 +187,9 @@ impl QuadrotorInterface for LiftoffQuad { angular_velocity: omega_body, } } - None => return Ok(self.state.clone()), + None => self.state.clone(), }; + self.state = state.clone(); Ok(state) } @@ -245,10 +215,11 @@ fn try_take_with_timeout(mutex: &Mutex>, timeout: Duration) -> Opti while start.elapsed() < timeout { if let Ok(mut guard) = mutex.try_lock() { if let Some(value) = guard.take() { + println!("Getting Value"); return Some(value); // Successfully took the value } } - std::thread::sleep(Duration::from_millis(1)); // Small sleep to avoid busy waiting + // std::thread::sleep(Duration::from_millis(1)); // Small sleep to avoid busy waiting } None // Timeout occurred } @@ -310,45 +281,28 @@ impl LiftoffPacket { async fn feedback_loop( liftoff_config: LiftoffQuadrotorConfig, - data_lock: Arc>>, + tx: mpsc::Sender, ) -> Result<(), SimulationError> { - let mut current_wait = Duration::from_secs(0); - let mut delay = Duration::from_secs(2); - let max_wait = liftoff_config.connection_timeout; - let max_delay = liftoff_config.max_retry_delay; - loop { let mut buf = [0; 128]; - match UdpSocket::bind(liftoff_config.ip_address.to_string()) { + // Possible bug: rebinding is necesssary to get new data from Liftoff. + let sample = match UdpSocket::bind(liftoff_config.ip_address.to_string()) { Ok(socket) => { socket - .set_read_timeout(Some(Duration::from_secs(60))) + .set_read_timeout(Some(Duration::from_millis(50))) .map_err(|e| SimulationError::OtherError(e.to_string()))?; match socket.recv_from(&mut buf) { Ok((len, _)) => { let mut cursor = std::io::Cursor::new(&buf[..len]); // TODO: more robust handling of packet parsing errors during resets - if let Ok(sample) = LiftoffPacket::read(&mut cursor) { - let mut data_lock = data_lock.lock().unwrap(); - *data_lock = Some(sample); - } - current_wait = Duration::from_secs(0); - delay = Duration::from_secs(2); - } - Err(e) => { - if current_wait >= max_wait { - return Err(SimulationError::OtherError(format!( - "Bind loop exceeded max wait time {}", - e.to_string(), - ))); - } - current_wait += delay; - sleep( - delay + Duration::from_millis((rand::random::() * 1000.0) as u64), - ) - .await; - delay = (delay * 2).min(max_delay); + let sample = if let Ok(sample) = LiftoffPacket::read(&mut cursor) { + sample + } else { + LiftoffPacket::default() + }; + Some(sample) } + Err(_) => None, } } Err(e) => { @@ -357,7 +311,11 @@ async fn feedback_loop( e.to_string() ))); } + }; + if let Some(sample) = sample { + let _ = tx.send(sample).await.is_ok(); } + // sleep(Duration::from_millis(10)).await; } }