From c68c46e8b0a7845fb147f5d381a17d44844751d4 Mon Sep 17 00:00:00 2001 From: Bastian Schmidt Date: Mon, 24 Jun 2024 10:55:28 +0200 Subject: [PATCH] Send metric to matching address Use [rntimatcher.gen] thread to send [model]-computed metrics to the same address as the traffic pattern is sent to. * Add Matching parameter: matching_log_traffic * Add MetricTypes::A and MetricA struct * Add metric header definition * Pass reader for metric updates to [rntimatcher.gen] --- src/logic/mod.rs | 14 +++- src/logic/model_handler.rs | 18 +++- src/logic/rnti_matcher.rs | 163 +++++++++++++++++++++++++++++++------ src/main.rs | 6 +- src/parse.rs | 9 +- 5 files changed, 179 insertions(+), 31 deletions(-) diff --git a/src/logic/mod.rs b/src/logic/mod.rs index 9ce9319..e36b253 100644 --- a/src/logic/mod.rs +++ b/src/logic/mod.rs @@ -256,9 +256,21 @@ pub struct MessageRnti { cell_rnti: HashMap, } +/* Wrapping messages */ #[allow(dead_code)] -#[derive(Clone, Debug, PartialEq, Default)] +#[derive(Clone, Debug, PartialEq)] pub struct MessageMetric { + metric: MetricTypes, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum MetricTypes { + A(MetricA), +} + +#[repr(C)] +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct MetricA { /// Timestamp when the metric was calculated (not sent!) timestamp_us: u64, /// Fair share send rate [bits/subframe] = [bits/ms] diff --git a/src/logic/model_handler.rs b/src/logic/model_handler.rs index 7c8a717..4d8681a 100644 --- a/src/logic/model_handler.rs +++ b/src/logic/model_handler.rs @@ -12,6 +12,8 @@ use crate::logic::{ }; use crate::util::determine_process_id; +use super::{MetricA, MetricTypes}; + pub struct ModelHandlerArgs { pub rx_app_state: BusReader, pub tx_model_state: SyncSender, @@ -58,7 +60,7 @@ fn run( rx_cell_info: &mut BusReader, rx_dci: &mut BusReader, rx_rnti: &mut BusReader, - _tx_metric: &mut Bus, + tx_metric: &mut Bus, ) -> Result<()> { tx_model_state.send(ModelState::Running)?; wait_for_running(&mut rx_app_state, &tx_model_state)?; @@ -99,8 +101,18 @@ fn run( } } - // TODO: -> Send combined message to some remote - // tx_metrc.send(metrics) + /* Test sending data */ + tx_metric.broadcast(MessageMetric { + metric: MetricTypes::A(MetricA { + timestamp_us: 121, + fair_share_send_rate: 121231, + }) + }); + // TODO: Implement sending metric + // * Send combined message to remote properly + // * Add input parameter to choose metric type and sending interval (or more like a "how + // many subframes shall I use) + } send_final_state(&tx_model_state)?; diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index c3a665a..df5cccf 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use std::collections::{HashMap, HashSet}; +use std::mem; use std::net::UdpSocket; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; use std::thread::{self, JoinHandle}; @@ -29,6 +30,8 @@ use crate::math_util::{ calculate_weighted_euclidean_distance_matrix, standardize_feature_vec, }; +use super::{MessageMetric, MetricTypes}; + pub const MATCHING_INTERVAL_MS: u64 = 1000; pub const MATCHING_TRAFFIC_PATTERN_TIME_OVERLAP_FACTOR: f64 = 1.1; @@ -45,6 +48,19 @@ pub const BASIC_FILTER_MIN_OCCURENCES_FACTOR: f64 = 0.05; pub const RNTI_RING_BUFFER_SIZE: usize = 5; +pub const METRIC_HEADER_LENGTH: usize = 5; +pub const METRIC_INITIAL_INDEX_START: usize = 0; +pub const METRIC_INITIAL_INDEX_END: usize = 4; +pub const METRIC_INITIAL: [u8; 4] = [ + 0x11, + 0x21, + 0x12, + 0x22, +]; +pub const METRIC_VERSION_INDEX: usize = 4; +pub const METRIC_VERSION: u8 = 1; +pub const METRIC_PAYLOAD_INDEX: usize = 5; + /* * Feature vector, order matters: @@ -95,7 +111,7 @@ pub const MATCHING_WEIGHTINGS: [f64; 8] = [ #[derive(Clone, Debug, PartialEq)] enum LocalGeneratorState { Stop, - SendPattern(String, Box), + SendPattern(Box), PatternSent, Idle, } @@ -106,6 +122,7 @@ pub struct RntiMatcherArgs { pub app_args: Arguments, pub rx_dci: BusReader, pub tx_rnti: Bus, + pub rx_metric: BusReader, } struct RunArgs { @@ -118,6 +135,10 @@ struct RunArgs { gen_thread_handle: Option>, } +struct RunArgsMovables { + rx_metric: BusReader, +} + #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] pub struct TrafficCollection { /* cell_id -> { traffic } */ @@ -161,20 +182,24 @@ pub fn deploy_rnti_matcher(args: RntiMatcherArgs) -> Result> { tx_gen_thread_handle: None, gen_thread_handle: None, }; + let run_args_mov: RunArgsMovables = RunArgsMovables { + rx_metric: args.rx_metric, + }; let thread = thread::spawn(move || { - let _ = run(&mut run_args); + let _ = run(&mut run_args, run_args_mov); finish(run_args); }); Ok(thread) } -fn run(run_args: &mut RunArgs) -> Result<()> { +fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { let rx_app_state = &mut run_args.rx_app_state; let tx_rntimatcher_state = &mut run_args.tx_rntimatcher_state; let app_args = &run_args.app_args; let rx_dci = &mut run_args.rx_dci; let tx_rnti = &mut run_args.tx_rnti; + let rx_metric = run_args_mov.rx_metric; tx_rntimatcher_state.send(RntiMatcherState::Running)?; wait_for_running(rx_app_state, tx_rntimatcher_state)?; @@ -185,23 +210,27 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let matching_args = FlattenedRntiMatchingArgs::from_unflattened(app_args.clone().rntimatching.unwrap())?; - - let (tx_gen_thread, rx_gen_thread) = sync_channel::(CHANNEL_SYNC_SIZE); - run_args.gen_thread_handle = Some(deploy_traffic_generator_thread( - rx_gen_thread, - matching_args.matching_local_addr, - )?); - run_args.tx_gen_thread_handle = Some(tx_gen_thread.clone()); - let mut cell_rnti_ring_buffer: CellRntiRingBuffer = CellRntiRingBuffer::new(RNTI_RING_BUFFER_SIZE); let traffic_destination = matching_args.matching_traffic_destination; let traffic_pattern_list: Vec = matching_args.matching_traffic_pattern .iter() .map(|pattern_type| pattern_type.generate_pattern()) .collect(); + let log_matching: bool = matching_args.matching_log_traffic; let mut traffic_pattern_index = 0; let mut matcher_state: RntiMatcherState = RntiMatcherState::Idle; + + let (tx_gen_thread, rx_gen_thread) = sync_channel::(CHANNEL_SYNC_SIZE); + run_args.gen_thread_handle = Some(deploy_traffic_generator_thread( + rx_gen_thread, + matching_args.matching_local_addr, + traffic_destination.clone(), + rx_metric, + )?); + run_args.tx_gen_thread_handle = Some(tx_gen_thread.clone()); + + loop { /* */ thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); @@ -223,7 +252,6 @@ fn run(run_args: &mut RunArgs) -> Result<()> { } RntiMatcherState::StartMatching => handle_start_matching( &tx_gen_thread, - &traffic_destination, &traffic_pattern_list, &mut traffic_pattern_index, ), @@ -232,8 +260,11 @@ fn run(run_args: &mut RunArgs) -> Result<()> { handle_collect_dci(latest_dcis, *traffic_collection) } RntiMatcherState::MatchingProcessDci(traffic_collection) => { - handle_process_dci(*traffic_collection, - &mut cell_rnti_ring_buffer) + handle_process_dci( + *traffic_collection, + &mut cell_rnti_ring_buffer, + log_matching, + ) } RntiMatcherState::MatchingPublishRnti(rnti) => { tx_rnti.broadcast(rnti); @@ -271,7 +302,6 @@ fn collect_dcis(rx_dci: &mut BusReader) -> Vec { fn handle_start_matching( tx_gen_thread: &SyncSender, - traffic_destination: &str, traffic_pattern_list: &[TrafficPattern], traffic_pattern_index: &mut usize, ) -> RntiMatcherState { @@ -297,7 +327,6 @@ fn handle_start_matching( }; let _ = tx_gen_thread.send(LocalGeneratorState::SendPattern( - traffic_destination.to_string(), Box::new(traffic_pattern), )); RntiMatcherState::MatchingCollectDci(Box::new(traffic_collection)) @@ -326,12 +355,15 @@ fn handle_collect_dci( fn handle_process_dci( mut traffic_collection: TrafficCollection, cell_rnti_ring_buffer: &mut CellRntiRingBuffer, + log_traffic: bool, ) -> RntiMatcherState { // Check number of packets plausability: expected ms -> expected dcis let mut message_rnti: MessageRnti = MessageRnti::default(); /* log files */ - let _ = log_rnti_matching_traffic(&traffic_collection); + if log_traffic { + let _ = log_rnti_matching_traffic(&traffic_collection); + } traffic_collection.apply_basic_filter(); @@ -387,9 +419,17 @@ fn finish(run_args: RunArgs) { fn deploy_traffic_generator_thread( rx_local_gen_state: Receiver, local_socket_addr: String, + destination_addr: String, + rx_metric: BusReader, ) -> Result> { let thread = thread::spawn(move || { - let _ = run_traffic_generator(rx_local_gen_state, local_socket_addr); + if let Err(err) = run_traffic_generator( + rx_local_gen_state, + local_socket_addr, + destination_addr, + rx_metric) { + print_info(&format!("[rntimatcher.gen] stopped with error: {:?}", err)) + } }); Ok(thread) } @@ -397,6 +437,8 @@ fn deploy_traffic_generator_thread( fn run_traffic_generator( rx_local_gen_state: Receiver, local_socket_addr: String, + destination_addr: String, + mut rx_metric: BusReader, ) -> Result<()> { let socket = init_udp_socket(&local_socket_addr)?; let mut gen_state: LocalGeneratorState = LocalGeneratorState::Idle; @@ -406,6 +448,7 @@ fn run_traffic_generator( )); let mut last_timemstamp_us: Option = None; + let mut metric_option: Option; loop { match check_rx_state(&rx_local_gen_state) { @@ -416,17 +459,28 @@ fn run_traffic_generator( break; } } + /* If present, metric is sent in both states: Idle and SendPattern */ + metric_option = check_rx_metric(&mut rx_metric)?; match gen_state { LocalGeneratorState::Idle => { - /* Sleep here, because it shall not interfere the sendpattern */ - thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + gen_handle_idle( + &socket, + &destination_addr, + metric_option, + )?; } LocalGeneratorState::Stop => { break; } - LocalGeneratorState::SendPattern(ref destination, ref mut pattern) => { - match gen_handle_send_pattern(&socket, destination, pattern, &mut last_timemstamp_us) { + LocalGeneratorState::SendPattern(ref mut pattern) => { + match gen_handle_send_pattern( + &socket, + &destination_addr, + pattern, + &mut last_timemstamp_us, + metric_option, + ) { Ok(Some(_)) => { /* stay in the state and keep sending */ }, Ok(None) => gen_state = LocalGeneratorState::PatternSent, Err(e) => { @@ -467,11 +521,40 @@ fn check_rx_state( } } +fn check_rx_metric( + rx_metric: &mut BusReader, +) -> Result> { + match rx_metric.try_recv() { + Ok(msg) => Ok(Some(msg.metric)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(anyhow!("[rntimatcher.gen] rx_metric disconnected")), + } +} + +fn gen_handle_idle( + socket: &UdpSocket, + destination: &str, + metric_option: Option +) -> Result<()> { + if let Some(metric) = metric_option { + // add some padding to the total payload + let payload_size = mem::size_of_val(&metric) + METRIC_HEADER_LENGTH * 2; + let mut payload = vec![0xAA; payload_size]; + let _ = prepend_metric_to_payload(&mut payload, metric); + socket.send_to(&payload, destination)?; + } else { + /* nothing to do, sleep */ + thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + } + Ok(()) +} + fn gen_handle_send_pattern( socket: &UdpSocket, destination: &str, pattern: &mut TrafficPattern, last_sent_timemstamp_us: &mut Option, + metric_option: Option ) -> Result> { match pattern.messages.pop_front() { Some(msg) => { @@ -493,10 +576,13 @@ fn gen_handle_send_pattern( } thread::sleep(Duration::from_micros(sleep_us)); - *last_sent_timemstamp_us = Some(chrono::Utc::now().timestamp_micros() as u64); - socket.send_to(&msg.payload, destination)?; + let mut payload = msg.payload.clone(); + if let Some(metric) = metric_option { + let _ = prepend_metric_to_payload(&mut payload, metric); + } + socket.send_to(&payload, destination)?; Ok(Some(())) } @@ -504,6 +590,35 @@ fn gen_handle_send_pattern( } } +fn prepend_metric_to_payload( + payload: &mut [u8], + metric: MetricTypes, +) -> Result<()> { + let MetricTypes::A(metric_data) = metric; + let metric_struct_size = mem::size_of_val(&metric_data); + if payload.len() < (METRIC_HEADER_LENGTH + metric_struct_size) { + return Err(anyhow!("Metric does not fit into payload")); + } + payload[METRIC_INITIAL_INDEX_START..METRIC_INITIAL_INDEX_END] + .copy_from_slice(&METRIC_INITIAL); + payload[METRIC_VERSION_INDEX] = METRIC_VERSION; + + let metric_data_bytes: &[u8] = unsafe { any_as_u8_slice(&metric_data) }; + let metric_in_payload_start = METRIC_INITIAL_INDEX_START; + let metric_in_payload_end = METRIC_INITIAL_INDEX_START + metric_struct_size; + payload[metric_in_payload_start..metric_in_payload_end] + .copy_from_slice(metric_data_bytes); + + Ok(()) +} + +unsafe fn any_as_u8_slice(p: &T) -> &[u8] { + ::core::slice::from_raw_parts( + (p as *const T) as *const u8, + ::core::mem::size_of::(), + ) +} + fn send_final_state(tx_rntimatcher_state: &SyncSender) -> Result<()> { Ok(tx_rntimatcher_state.send(RntiMatcherState::Stopped)?) } diff --git a/src/main.rs b/src/main.rs index 072f83c..95d1398 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use bus::Bus; +use bus::{Bus, BusReader}; use casual_logger::{Level, Log}; use std::collections::HashSet; use std::error::Error; @@ -61,7 +61,8 @@ fn deploy_app( let mut tx_dci: Bus = Bus::::new(BUS_SIZE_DCI); let mut tx_cell_info: Bus = Bus::::new(BUS_SIZE_CELL_INFO); let mut tx_rnti: Bus = Bus::::new(BUS_SIZE_RNTI); - let tx_metric: Bus = Bus::::new(BUS_SIZE_METRIC); + let mut tx_metric: Bus = Bus::::new(BUS_SIZE_METRIC); + let rx_metric: BusReader = tx_metric.add_rx(); let model_args = ModelHandlerArgs { rx_app_state: tx_app_state.add_rx(), @@ -77,6 +78,7 @@ fn deploy_app( app_args: app_args.clone(), rx_dci: tx_dci.add_rx(), tx_rnti, + rx_metric, }; let ngcontrol_args = NgControlArgs { rx_app_state: tx_app_state.add_rx(), diff --git a/src/parse.rs b/src/parse.rs index 150082d..36ed84d 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -122,13 +122,17 @@ pub struct RntiMatchingArgs { #[arg(long, required = false)] pub matching_local_addr: Option, - /// Define which traffic pattern to use to fetch cell data + /// List of traffic patterns (iterates all given patterns) #[arg(long, value_enum, required = false)] pub matching_traffic_pattern: Option>, /// The destination address which the traffic pattern is sent to #[arg(long, required = false)] pub matching_traffic_destination: Option, + + /// Log traffic used for matching (in ./logs/rnti_matching*.jsonl) + #[arg(long, required = false)] + pub matching_log_traffic: Option, } #[derive(Clone, Debug)] @@ -136,6 +140,7 @@ pub struct FlattenedRntiMatchingArgs { pub matching_local_addr: String, pub matching_traffic_pattern: Vec, pub matching_traffic_destination: String, + pub matching_log_traffic: bool, } impl default::Default for Arguments { @@ -163,6 +168,7 @@ impl default::Default for Arguments { matching_local_addr: Some("0.0.0.0:9292".to_string()), matching_traffic_pattern: Some(vec![RntiMatchingTrafficPatternType::A]), matching_traffic_destination: Some("1.1.1.1:53".to_string()), + matching_log_traffic: Some(true), }), } } @@ -270,6 +276,7 @@ impl FlattenedRntiMatchingArgs { matching_local_addr: rnti_args.matching_local_addr.unwrap(), matching_traffic_pattern: rnti_args.matching_traffic_pattern.unwrap(), matching_traffic_destination: rnti_args.matching_traffic_destination.unwrap(), + matching_log_traffic: rnti_args.matching_log_traffic.unwrap(), }) } }