From e3ee38090cffdd0049ccf6088b8ae6d274488dc1 Mon Sep 17 00:00:00 2001 From: EdoZua95LT Date: Wed, 18 Sep 2024 17:41:20 +0200 Subject: [PATCH] WIP: Remove cross-locking on mutex --- Cargo.toml | 1 + src/bin/sailtrack-kalman.rs | 70 ++++++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5ff8cb..0e3cacd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,4 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" rand = "0.8.5" kalmanfilt = "0.2.4" +tokio = "1.40.0" diff --git a/src/bin/sailtrack-kalman.rs b/src/bin/sailtrack-kalman.rs index 66df1d6..e7b1376 100644 --- a/src/bin/sailtrack-kalman.rs +++ b/src/bin/sailtrack-kalman.rs @@ -3,6 +3,7 @@ use rand::Rng; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; +use std::sync::mpsc::{Receiver, Sender}; use nalgebra::{OMatrix, OVector, U3, U6}; use rumqttc::{Client, Event, Incoming, MqttOptions, QoS}; @@ -17,6 +18,12 @@ const EARTH_CIRCUMFERENCE_METERS: f32 = 40075.0 * 1000.0; const KALMAN_SAMPLE_TIME_MS: u64 = 200; const LAT_FACTOR: f32 = 1.0; +#[derive(Serialize, Deserialize, Clone, Copy, Debug)] +enum SyncEvent { + GPS_Received, + IMU_Received, + GPS_Fix_Received, +} #[derive(Serialize, Deserialize, Clone, Copy, Debug)] #[serde(rename_all = "camelCase")] @@ -85,15 +92,12 @@ struct MeasureCollection { struct Measure { meas: OVector, meas_variance: OMatrix, - past_measures: MeasureCollection>, - new_measure: bool, } #[derive(Debug, Clone, Copy)] struct Input { acceleration: OVector, orientation: OVector, - new_input: bool, } impl MeasureCollection> { @@ -181,25 +185,6 @@ fn get_measure_forom_gps(gps_data: &Gps, reference: &Gps, measure_struct: &mut M measure_struct.new_measure = true; } -// Function that keeps on controll if the GPS fix is obtained -fn wait_for_fix_tipe(gps_ref_arc: &Arc>) -> bool { - let gps_ref_lock = acquire_lock(gps_ref_arc, line!()); - if gps_ref_lock.fix_type == 3 { - drop(gps_ref_lock); - return true; - } - drop(gps_ref_lock); - loop { - let gps_ref_lock = acquire_lock(gps_ref_arc, line!()); - if gps_ref_lock.fix_type == 3 { - drop(gps_ref_lock); - return true; - } - drop(gps_ref_lock); - thread::sleep(Duration::from_millis(1000)); - } -} - fn on_message_imu(message: Imu, input: &Arc>) { let accel_vec = vec![ message.linear_accel.x, @@ -270,7 +255,6 @@ fn main() { let input = Input { acceleration: OVector::::zeros(), orientation: OVector::::zeros(), - new_input: false, }; let gps_ref = Gps { @@ -293,8 +277,6 @@ fn main() { let measure = Measure { meas: OVector::::zeros(), meas_variance: OMatrix::::identity(), - past_measures: MeasureCollection::new(), - new_measure: false, }; // Creating ESKF object @@ -372,6 +354,8 @@ fn main() { R: noise_meas_cov, ..Default::default() }; + // Defining Event Channels + let (tx, rx) = mpsc::channel(); // Defining Mutex for thread share let gps_ref_mutex = Arc::new(Mutex::new(gps_ref)); @@ -402,6 +386,10 @@ fn main() { serde_json::from_slice(payload.as_ref()).unwrap(), &input_clone, ); + match tx.try_send(SyncEvent::IMU_Received){ + Ok(_) => (), + Err(_) => continue + } } "sensor/gps0" => { let payload = packet.payload.clone(); // Clone the payload for later use @@ -410,6 +398,10 @@ fn main() { &gps_ref_clone, &measure_clone, ); + match tx.try_send(SyncEvent::IMU_Received){ + Ok(_) => (), + Err(_) => continue + } } _ => (), } @@ -417,6 +409,26 @@ fn main() { } }); + // GPS fix thread + let gps_ref_clone = Arc::clone(&gps_ref_mutex); + thread::spawn(move || { + let gps_ref_lock = acquire_lock(gps_ref_clone, line!()); + if gps_ref_lock.fix_type == 3 { + drop(gps_ref_lock); + return true; + } + drop(gps_ref_lock); + loop { + let gps_ref_lock = acquire_lock(gps_ref_clone, line!()); + if gps_ref_lock.fix_type == 3 { + drop(gps_ref_lock); + return true; + } + drop(gps_ref_lock); + thread::sleep(Duration::from_millis(1000)); + } + }); + // Kalman filter thread let gps_ref_clone = Arc::clone(&gps_ref_mutex); let measure_clone = Arc::clone(&measure_mutex); @@ -424,7 +436,9 @@ fn main() { let filter_clone = Arc::clone(&filter_mutex); thread::spawn(move || loop { // Check if the GPS fix has been obtained - wait_for_fix_tipe(&gps_ref_clone); + loop { + + } let thread_start = Instant::now(); let mut measure_lock = acquire_lock(&measure_clone, line!()); let measure = &mut *measure_lock; @@ -532,7 +546,7 @@ fn main() { serde_json::to_vec(&message).unwrap(), ) .unwrap(); - thread::sleep(Duration::from_millis(1000 / MQTT_PUBLISH_FREQ_HZ)); - } + +} }