diff --git a/Cargo.toml b/Cargo.toml index 81e72a0..b52f04a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] anyhow = "1.0.80" +arrow = "52.1.0" bus = "2.4.1" casual_logger = "0.6.5" chrono = "0.4.38" @@ -21,6 +22,7 @@ lazy_static = "1.4.0" libc = "0.2.155" nalgebra = "0.32.6" nix = "0.28.0" +once_cell = "1.19.0" regex = "1.10.4" reqwest = { version = "0.12.3", features = ["json"] } serde = { version = "1.0.197", features = ["derive"] } diff --git a/src/cell_info.rs b/src/cell_info.rs index d6e7717..63b3f3d 100644 --- a/src/cell_info.rs +++ b/src/cell_info.rs @@ -38,6 +38,7 @@ pub struct CellInfo { pub struct SingleCell { pub cell_id: u64, pub cell_type: CellularType, + /// Number of PRB per slot pub nof_prb: u16, pub frequency: u64, pub rssi: f64, diff --git a/src/logger.rs b/src/logger.rs new file mode 100644 index 0000000..c5b3ab6 --- /dev/null +++ b/src/logger.rs @@ -0,0 +1,474 @@ +use std::fs::{create_dir_all, File, OpenOptions}; +use std::io::Write; +use std::path::Path; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::sync::{Mutex, Arc}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use anyhow::{anyhow, Result}; +use arrow::array::{UInt64Builder, StructBuilder, UInt8Builder, ListBuilder, ArrayRef, UInt16Builder, UInt32Builder}; +use arrow::datatypes::{Schema, Field, DataType, Fields}; +use arrow::ipc::writer::FileWriter; +use arrow::record_batch::RecordBatch; + +use crate::logic::model_handler::LogMetric; +use crate::logic::rnti_matcher::TrafficCollection; +use crate::ngscope::types::NgScopeRntiDci; +use crate::{ + logic::{ + check_not_stopped, wait_until_running, GeneralState, MainState, WorkerState, + DEFAULT_WORKER_SLEEP_US, + }, + ngscope::types::NgScopeCellDci, + parse::{Arguments, FlattenedLogArgs, DEFAULT_LOG_BASE_DIR}, + util::{determine_process_id, print_info}, +}; +use bus::BusReader; +use chrono::{DateTime, Utc}; +use once_cell::sync::Lazy; + +const LOGGER_CAPACITY: usize = 1000; +const LOGGER_SENDER_POOL_SIZE: usize = 5; +const LOGGER_STOP_TIME_DELAY_MS: i64 = 5000; +const LOGGER_RELATIVE_PATH_INFO: &str = "stdout/"; +const LOGGER_RELATIVE_PATH_DCI: &str = "dci/"; +const LOGGER_RELATIVE_PATH_RNTI_MATCHING: &str = "rnti_matching/"; +const LOGGER_RELATIVE_PATH_METRIC: &str = "metric/"; +// TODO: Implement saving measurements data +// const LOGGER_RELATIVE_PATH_MEASUREMENT: &str = "measurements/"; + +#[derive(Clone, Debug, PartialEq)] +pub enum LoggerState { + Running, + Stopped, + InitStopLoggingSoon, + StopLoggingSoon, +} + +impl WorkerState for LoggerState { + fn worker_name() -> String { + "logger".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + LoggerState::Running => GeneralState::Running, + LoggerState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } +} + +pub struct LoggerArgs { + pub app_args: Arguments, + pub rx_app_state: BusReader, + pub tx_logger_state: SyncSender, +} + +struct RunArgs { + app_args: Arguments, + rx_app_state: BusReader, + tx_logger_state: SyncSender, +} + +#[derive(Debug)] +pub struct LogFile { + pub path: String, + pub last_write: DateTime, + pub file_handle: Option, +} + +#[derive(Debug)] +pub struct Logger { + pub base_dir: String, + pub rx: Mutex>, + pub tx_vec: Vec>>, + pub stdout_file: Option, + pub run_timestamp: chrono::DateTime, +} + +#[derive(Debug)] +pub enum LogMessage { + /// Log a simple string + Info(String), + /// NgScope cell dci + NgScopeDci(Vec), + /// RNTI matching traffic collection + RntiMatchingTrafficCollection(Box), + /// Model Metric + Metric(Box), + // Measurement transmission data (RTT) + // LogDataTransmission(Box), +} + +/* + * Logger thread state functions + * */ + +pub fn deploy_logger(args: LoggerArgs) -> Result> { + let mut run_args: RunArgs = RunArgs { + app_args: args.app_args, + rx_app_state: args.rx_app_state, + tx_logger_state: args.tx_logger_state, + }; + + let builder = thread::Builder::new().name("[logger]".to_string()); + let thread = builder.spawn(move || { + let _ = run(&mut run_args); + finish(run_args); + })?; + Ok(thread) +} + +fn run(run_args: &mut RunArgs) -> Result<()> { + let rx_app_state = &mut run_args.rx_app_state; + let tx_logger_state = &mut run_args.tx_logger_state; + let app_args = &run_args.app_args; + let log_args = FlattenedLogArgs::from_unflattened(app_args.clone().log.unwrap())?; + Logger::set_base_dir(log_args.log_base_dir); + + tx_logger_state.send(LoggerState::Running)?; + wait_for_running(rx_app_state, tx_logger_state)?; + print_info(&format!("[logger]: \t\tPID {:?}", determine_process_id())); + + let sleep_duration = Duration::from_millis(DEFAULT_WORKER_SLEEP_US); + let mut logger_state = LoggerState::Running; + let mut stop_time_init_option: Option = None; + + loop { + /* */ + thread::sleep(sleep_duration); + if check_not_stopped(rx_app_state).is_err() { + logger_state = LoggerState::InitStopLoggingSoon; + } + + match logger_state { + LoggerState::Running => handle_running()?, + LoggerState::Stopped => break, + LoggerState::InitStopLoggingSoon => { + stop_time_init_option = Some(chrono::Utc::now().timestamp_millis()); + logger_state = LoggerState::StopLoggingSoon; + } + LoggerState::StopLoggingSoon => { + if let Some(stop_time_init) = stop_time_init_option { + if chrono::Utc::now().timestamp_millis() - stop_time_init + >= LOGGER_STOP_TIME_DELAY_MS + { + break; + } + } + handle_running()?; + } + } + } + + Ok(()) +} + +fn finish(run_args: RunArgs) { + let _ = send_final_state(&run_args.tx_logger_state); +} + +fn handle_running() -> Result<()> { + /* Check logger for pending message */ + loop { + match get_logger().rx.lock().unwrap().try_recv() { + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Err(anyhow!( + "[logger] error: log Logger.rx is channel disconnected!" + )) + } + Ok(log_message) => handle_log_message(log_message)?, + } + } + Ok(()) +} + +fn handle_log_message(msg: LogMessage) -> Result<()> { + if let LogMessage::Info(ref content) = msg { + println!("{}", content); + } + let msg_type_name = msg.type_name(); + if Logger::write_log_message(msg).is_err() { + print_info(&format!("[logger] error: could not log message ({})", msg_type_name)) + } + Ok(()) +} + +fn wait_for_running( + rx_app_state: &mut BusReader, + tx_logger_state: &SyncSender, +) -> Result<()> { + match wait_until_running(rx_app_state) { + Ok(_) => Ok(()), + _ => { + send_final_state(tx_logger_state)?; + Err(anyhow!("[logger] Main did not send 'Running' message")) + } + } +} + +fn send_final_state(tx_logger_state: &SyncSender) -> Result<()> { + Ok(tx_logger_state.send(LoggerState::Stopped)?) +} + +/* + * Logger functions + * */ + +pub fn log_info(info: &str) -> Result<()> { + Logger::queue_log_message(LogMessage::Info(info.to_string())) +} + +pub fn log_traffic_collection(traffic_collection: TrafficCollection) -> Result<()> { + Logger::queue_log_message(LogMessage::RntiMatchingTrafficCollection(Box::new(traffic_collection))) +} + +pub fn log_metric(metric: LogMetric) -> Result<()> { + Logger::queue_log_message(LogMessage::Metric(Box::new(metric))) +} + +pub fn log_dci(dcis: Vec) -> Result<()> { + Logger::queue_log_message(LogMessage::NgScopeDci(dcis)) +} + +#[allow(unknown_lints)] +pub fn get_logger() -> &'static mut Lazy { + static mut GLOBAL_LOGGER: Lazy = Lazy::new(|| { + let (tx, rx) = sync_channel::(LOGGER_CAPACITY); + let tx_vec: Vec>> = (0..LOGGER_SENDER_POOL_SIZE) + .map(|_| Mutex::new(tx.clone())) + .collect(); + + Logger { + base_dir: DEFAULT_LOG_BASE_DIR.to_string(), + rx: Mutex::new(rx), + tx_vec, + stdout_file: None, + run_timestamp: chrono::Utc::now(), + } + }); + #[allow(static_mut_refs)] + unsafe { &mut GLOBAL_LOGGER } +} + +impl Logger { + /* Find an unused Sender (use last if none is lockable) */ + fn lockable_sender(&self) -> &Mutex> { + for sender in self.tx_vec.iter().take(self.tx_vec.len() - 1) { + if sender.try_lock().is_ok() { + return sender; + } + } + return self.tx_vec.last().unwrap(); + } + + pub fn set_base_dir(new_base_dir: String) { + get_logger().base_dir = new_base_dir; + } + + pub fn queue_log_message(msg: LogMessage) -> Result<()> { + let logger = get_logger(); + let sender: &Mutex> = logger.lockable_sender(); + sender.lock().unwrap().send(msg)?; + Ok(()) + } + + pub fn write_log_message(msg: LogMessage) -> Result<()> { + let logger: &mut Logger = get_logger(); + + /* check open file handle for stdout */ + if let LogMessage::Info(_) = msg { + let stdout_file_handle = { + let stdout_file = logger.stdout_file.get_or_insert_with(|| { + let stdout_file_path = msg.file_path(&logger.base_dir, &logger.run_timestamp); + LogFile { + path: stdout_file_path, + last_write: Utc::now(), + file_handle: None, + } + }); + + if let Some(parent) = Path::new(&stdout_file.path).parent() { + create_dir_all(parent)?; + } + stdout_file.file_handle.get_or_insert_with(|| { + OpenOptions::new() + .create(true) + .append(true) + .open(&stdout_file.path) + .expect("Failed to open log file") + }) + }; + msg.write_to_file(stdout_file_handle)?; + logger.stdout_file.as_mut().unwrap().last_write = chrono::Utc::now(); + return Ok(()); + } + + /* open new file handle for others */ + let file_path = msg.file_path(&logger.base_dir, &logger.run_timestamp); + if let Some(parent) = Path::new(&file_path).parent() { + create_dir_all(parent)?; + } + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(file_path) + .expect("Failed to open log file"); + msg.write_to_file(&mut file)?; + + Ok(()) + } +} + +impl LogMessage { + pub fn type_name(&self) -> String { + match self { + LogMessage::Info(_) => "info", + LogMessage::NgScopeDci(_) => "ngscope dci", + LogMessage::RntiMatchingTrafficCollection(_) => "rnti traffic collection", + LogMessage::Metric(_) => "metric", + }.to_string() + } + + pub fn file_path(&self, base_dir: &str, run_timestamp: &DateTime) -> String { + let run_timestamp_formatted = run_timestamp.format("%Y_%m_%d-%H_%M_%S").to_string(); + let now_formatted = Utc::now().format("%d-%H_%M_%S%.3f").to_string(); + + let message_type_file_path: String = match self { + LogMessage::Info(_) => { + format!( + "{}run_{}.log", + LOGGER_RELATIVE_PATH_INFO, run_timestamp_formatted + ) + } + LogMessage::NgScopeDci(_) => { + format!( + "{}run_{}_cell_data_{}.arrow", + LOGGER_RELATIVE_PATH_DCI, run_timestamp_formatted, now_formatted + ) + } + LogMessage::Metric(_) => { + format!( + "{}run_{}_metric.jsonl", + LOGGER_RELATIVE_PATH_METRIC, run_timestamp_formatted + ) + } + LogMessage::RntiMatchingTrafficCollection(_) => { + format!( + "{}run_{}_traffic_collection.jsonl", + LOGGER_RELATIVE_PATH_RNTI_MATCHING, run_timestamp_formatted + ) + } + }; + format!("{}{}", base_dir, message_type_file_path) + } + + pub fn write_to_file(&self, file: &mut File) -> Result<()> { + match self { + LogMessage::Info(info_msg) => { + writeln!(file, "{}", info_msg)?; + } + LogMessage::RntiMatchingTrafficCollection(traffic_collection) => { + let json_string = serde_json::to_string(traffic_collection)?; + writeln!(file, "{}", json_string)?; + } + LogMessage::NgScopeDci(ngscope_dci_list) => { + write_arrow_ipc(create_schema(), ngscope_dci_list.to_vec(), file)? + } + LogMessage::Metric(metric) => { + let json_string = serde_json::to_string(metric)?; + writeln!(file, "{}", json_string)?; + } + } + file.flush()?; + Ok(()) + } +} + + +/* + * Helpers for writing Vec as Apache Arrow to disk + * */ + +fn create_rnti_fields() -> Fields { + Fields::from(vec![ + Field::new("rnti", DataType::UInt16, true), + + Field::new("dl_prb", DataType::UInt8, true), + Field::new("dl_tbs", DataType::UInt32, true), + Field::new("dl_retx", DataType::UInt8, true), + + Field::new("ul_prb", DataType::UInt8, true), + Field::new("ul_tbs", DataType::UInt32, true), + Field::new("ul_retx", DataType::UInt8, true), + ]) +} + +fn create_schema() -> Arc { + + let rnti_struct = DataType::Struct(create_rnti_fields()); + + Arc::new(Schema::new(vec![ + Field::new("timestamp", DataType::UInt64, false), + Field::new("nof_rnti", DataType::UInt8, false), + Field::new("rnti_list", DataType::List(Arc::new(Field::new("item", rnti_struct, true))), true), + ])) +} + +fn write_arrow_ipc( + schema: Arc, + data: Vec, + file: &File +) -> Result<()> { + let mut timestamp_builder = UInt64Builder::with_capacity(data.len()); + let mut nof_rntis_builder = UInt8Builder::with_capacity(data.len()); + let mut rnti_list_builder = ListBuilder::new(StructBuilder::from_fields(create_rnti_fields(), data.len())); + + for cell_dci in &data { + timestamp_builder.append_value(cell_dci.time_stamp); + nof_rntis_builder.append_value(cell_dci.nof_rnti); + + if cell_dci.nof_rnti == 0 { + rnti_list_builder.append(false); // Append null for an empty list + } else { + let rnti_struct_builder = rnti_list_builder.values(); + append_rnti_list_to_struct(rnti_struct_builder, &cell_dci.rnti_list[0..cell_dci.nof_rnti as usize]); + rnti_list_builder.append(true); + } + } + + let timestamp_array = Arc::new(timestamp_builder.finish()) as ArrayRef; + let nof_rntis_array = Arc::new(nof_rntis_builder.finish()) as ArrayRef; + let rnti_list_array = Arc::new(rnti_list_builder.finish()) as ArrayRef; + + let batch = RecordBatch::try_new( + schema, + vec![timestamp_array, nof_rntis_array, rnti_list_array], + )?; + + let mut writer = FileWriter::try_new(file, &batch.schema())?; + writer.write(&batch)?; + writer.finish()?; + + Ok(()) +} + +fn append_rnti_list_to_struct(rnti_struct_builder: &mut StructBuilder, rnti_list: &[NgScopeRntiDci]) { + for rnti_dci in rnti_list.iter() { + rnti_struct_builder.field_builder::(0).unwrap().append_value(rnti_dci.rnti); + + rnti_struct_builder.field_builder::(1).unwrap().append_value(rnti_dci.dl_prb); + rnti_struct_builder.field_builder::(2).unwrap().append_value(rnti_dci.dl_tbs); + rnti_struct_builder.field_builder::(3).unwrap().append_value(rnti_dci.dl_reTx); + + rnti_struct_builder.field_builder::(4).unwrap().append_value(rnti_dci.ul_prb); + rnti_struct_builder.field_builder::(5).unwrap().append_value(rnti_dci.ul_tbs); + rnti_struct_builder.field_builder::(6).unwrap().append_value(rnti_dci.ul_reTx); + + rnti_struct_builder.append(true); + } +} + diff --git a/src/logic/model_handler.rs b/src/logic/model_handler.rs index e726b75..c959ac9 100644 --- a/src/logic/model_handler.rs +++ b/src/logic/model_handler.rs @@ -1,4 +1,5 @@ use crate::cell_info::CellInfo; +use crate::logger::log_metric; use crate::ngscope::types::{NgScopeCellDci, NgScopeRntiDci}; use crate::parse::{Arguments, DynamicValue, FlattenedModelArgs}; use crate::util::{print_debug, print_info}; @@ -8,13 +9,16 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use anyhow::{anyhow, Result}; +use serde_derive::{Deserialize, Serialize}; use bus::{Bus, BusReader}; + use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageMetric, MessageRnti, ModelState, DEFAULT_WORKER_SLEEP_US, }; use crate::util::determine_process_id; +use super::{MetricA, MetricTypes}; pub const MAX_DCI_ARRAY_SIZE: usize = 10000; pub const MAX_DCI_SLICE_SIZE: usize = 1000; @@ -22,6 +26,7 @@ pub const MAX_DCI_SLICE_INDEX: usize = MAX_DCI_ARRAY_SIZE - MAX_DCI_SLICE_SIZE; // Parameter gamma from [p. 456] PBE-CC: https://dl.acm.org/doi/abs/10.1145/3387514.3405880 pub const PHYSICAL_TO_TRANSPORT_OVERHEAD: f64 = 0.068; pub const PHYSICAL_TO_TRANSPORT_FACTOR: f64 = 1.0 - PHYSICAL_TO_TRANSPORT_OVERHEAD; +pub const STANDARD_NOF_PRB_SLOT_TO_SUBFRAME: u64 = 2; struct DciRingBuffer { dci_array: Box<[NgScopeCellDci]>, @@ -63,7 +68,34 @@ impl DciRingBuffer { } } -use super::{MetricA, MetricTypes}; +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct MetricResult { + pub transport_fair_share_capacity: u64, + pub physical_fair_share_capacity: u64, + pub physical_rate_bit_per_prb: u64, + /// If 1, all RNTIs are used to determine the bit/PRB rate (more general) + /// If 0, only the target-RNTI PRBs are used to determine the bit/PRB rate (more specific) + pub physical_rate_coarse_flag: u8, + pub nof_retransmissions: u64, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct MetricBasis { + pub nof_dci: u64, + pub p_cell: u64, + pub nof_rnti: u64, + pub p_alloc: u64, + pub tbs_alloc: u64, + pub target_rnti: u16, + pub tbs_alloc_rnti: u64, + pub p_alloc_rnti: u64, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct LogMetric { + result: MetricResult, + basis: MetricBasis, +} pub struct ModelHandlerArgs { pub app_args: Arguments, @@ -85,6 +117,22 @@ struct RunArgs { pub tx_metric: Bus, } +struct RunParameters<'a> { + tx_metric: &'a mut Bus, + dci_buffer: &'a DciRingBuffer, + rnti: u16, + cell_info: &'a CellInfo, + is_log_metric: &'a bool, +} + +struct RunParametersSendingBehavior<'a> { + metric_sending_interval_us: &'a mut u64, + metric_smoothing_size_ms: &'a mut u64, + last_metric_timestamp_us: &'a mut u64, + model_args: &'a FlattenedModelArgs, + last_rtt_us: &'a Option, +} + pub fn deploy_model_handler(args: ModelHandlerArgs) -> Result> { let mut run_args = RunArgs { app_args: args.app_args, @@ -108,14 +156,10 @@ fn send_final_state(tx_model_state: &SyncSender) -> Result<()> { Ok(tx_model_state.send(ModelState::Stopped)?) } -fn wait_for_running( - rx_app_state: &mut BusReader, -) -> Result<()> { +fn wait_for_running(rx_app_state: &mut BusReader) -> Result<()> { match wait_until_running(rx_app_state) { Ok(_) => Ok(()), - _ => { - Err(anyhow!("[model] Main did not send 'Running' message")) - } + _ => Err(anyhow!("[model] Main did not send 'Running' message")), } } @@ -135,6 +179,7 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let model_args = FlattenedModelArgs::from_unflattened(app_args.clone().model.unwrap())?; + let is_log_metric: bool = model_args.model_log_metric; let mut last_metric_timestamp_us: u64 = chrono::Utc::now().timestamp_micros() as u64; let mut dci_buffer = DciRingBuffer::new(); let mut last_rnti: Option = None; @@ -183,40 +228,23 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let delta_last_metric_sent_us = chrono::Utc::now().timestamp_micros() as u64 - last_metric_timestamp_us; if delta_last_metric_sent_us > metric_sending_interval_us { - let buffer_slice_size: usize = metric_smoothing_size_ms as usize; - let buffer_slice = dci_buffer.slice(buffer_slice_size); - if !buffer_slice.is_empty() { - if let Ok((capacity_physical, phy_rate, phy_rate_flag, re_tx)) = calculate_capacity(rnti, &cell_info, buffer_slice) { - let capacity_transport = translate_physcial_to_transport_simple(capacity_physical); - print_debug(&format!("DEBUG [model] model: - capacity: \t{:?} - phy rate: \t{:?} - phy flag: \t{:?} - re-tx: \t{:?}", capacity_physical, phy_rate, phy_rate_flag, re_tx)); - let now_us = chrono::Utc::now().timestamp_micros() as u64; - tx_metric.broadcast(MessageMetric { - metric: MetricTypes::A(MetricA { - timestamp_us: now_us, - fair_share_send_rate: capacity_transport, - latest_dci_timestamp_us: buffer_slice.first().unwrap().time_stamp, - oldest_dci_timestamp_us: buffer_slice.last().unwrap().time_stamp, - nof_dci: buffer_slice.len() as u16, - nof_re_tx: re_tx as u16, - flag_phy_rate_all_rnti: phy_rate_flag, - phy_rate, // TODO: Check whether it is reasonable to make this - // u16/u32 - }), - }); - } - last_metric_timestamp_us = chrono::Utc::now().timestamp_micros() as u64; - metric_sending_interval_us = - determine_sending_interval(&model_args, &last_rtt_us); - metric_smoothing_size_ms = determine_smoothing_size(&model_args, &last_rtt_us); - } else { - print_debug("DEBUG [model] skipping metric calculation, dci slice is 0"); - } + let mut run_params = RunParameters { + tx_metric, + dci_buffer: &dci_buffer, + rnti, + cell_info: &cell_info, + is_log_metric: &is_log_metric, + }; + + let mut sending_behavior = RunParametersSendingBehavior { + metric_sending_interval_us: &mut metric_sending_interval_us, + metric_smoothing_size_ms: &mut metric_smoothing_size_ms, + last_metric_timestamp_us: &mut last_metric_timestamp_us, + model_args: &model_args, + last_rtt_us: &last_rtt_us, + }; + handle_calculate_metric(&mut run_params, &mut sending_behavior); } - // TODO: Log the else case. } } @@ -227,24 +255,106 @@ fn finish(run_args: RunArgs) { let _ = send_final_state(&run_args.tx_model_state); } + +fn handle_calculate_metric( + run_params: &mut RunParameters, + sending_behavior: &mut RunParametersSendingBehavior, +) { + // Use the fields from the structs + let RunParameters { + tx_metric, + dci_buffer, + rnti, + cell_info, + is_log_metric, + } = run_params; + + let RunParametersSendingBehavior { + metric_sending_interval_us, + metric_smoothing_size_ms, + last_metric_timestamp_us, + model_args, + last_rtt_us, + } = sending_behavior; + + let buffer_slice_size: usize = **metric_smoothing_size_ms as usize; + let buffer_slice = dci_buffer.slice(buffer_slice_size); + if !buffer_slice.is_empty() { + + if let Ok(metric_wrapper) = calculate_capacity(*rnti, cell_info, buffer_slice, is_log_metric) { + let transport_capacity = metric_wrapper.result.transport_fair_share_capacity; + let physical_rate_flag = metric_wrapper.result.physical_rate_coarse_flag; + let physical_rate = metric_wrapper.result.physical_rate_bit_per_prb; + let re_transmissions = metric_wrapper.result.nof_retransmissions; + let now_us = chrono::Utc::now().timestamp_micros() as u64; + + tx_metric.broadcast(MessageMetric { + metric: MetricTypes::A(MetricA { + timestamp_us: now_us, + fair_share_send_rate: transport_capacity, + latest_dci_timestamp_us: buffer_slice.first().unwrap().time_stamp, + oldest_dci_timestamp_us: buffer_slice.last().unwrap().time_stamp, + nof_dci: buffer_slice.len() as u16, + nof_re_tx: re_transmissions as u16, + flag_phy_rate_all_rnti: physical_rate_flag, + phy_rate: physical_rate, + }), + }); + } + **last_metric_timestamp_us = chrono::Utc::now().timestamp_micros() as u64; + **metric_sending_interval_us = + determine_sending_interval(model_args, last_rtt_us); + **metric_smoothing_size_ms = determine_smoothing_size(model_args, last_rtt_us); + } else { + print_debug("DEBUG [model] skipping metric calculation, dci slice is 0"); + } +} + +fn calculate_capacity( + target_rnti: u16, + cell_info: &CellInfo, + dci_list: &[NgScopeCellDci], + is_log_metric: &bool, +) -> Result { + let metric_wrapper = calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list)?; + if *is_log_metric { + let _ = log_metric(metric_wrapper.clone()); + } + print_debug(&format!( + "DEBUG [model] model: + c_t: \t{:?} + c_p: \t{:?} + phy rate: \t{:?} + phy flag: \t{:?} + re-tx: \t{:?}", + metric_wrapper.result.transport_fair_share_capacity, + metric_wrapper.result.physical_fair_share_capacity, + metric_wrapper.result.physical_rate_bit_per_prb, + metric_wrapper.result.physical_rate_coarse_flag, + metric_wrapper.result.nof_retransmissions, + )); + Ok(metric_wrapper) +} + /* * PHY-Layer fair-share capacity in bit/ms in downlink * (capacity, bit/PRB ratio, re-transmissions) * * According to PBE-CC: https://dl.acm.org/doi/abs/10.1145/3387514.3405880 * */ -fn calculate_capacity( +fn calculate_pbe_cc_capacity( target_rnti: u16, cell_info: &CellInfo, - dci_list: &[NgScopeCellDci] -) -> Result<(u64, u64, u8, u64)> { + dci_list: &[NgScopeCellDci], +) -> Result { let nof_dci: u64 = dci_list.len() as u64; - if nof_dci == 0 { + if nof_dci == 0 { return Err(anyhow!("Cannot calculate capacity with 0 DCI")); } - let p_cell: f64 = cell_info.cells[0].nof_prb as f64 * nof_dci as f64; + // Total number of PRBs in a subframe, that the cell can offer + let p_cell: u64 = STANDARD_NOF_PRB_SLOT_TO_SUBFRAME * cell_info.cells[0].nof_prb as u64 * nof_dci; - let nof_rnti: f64 = dci_list + let nof_rnti: u64 = dci_list .iter() .flat_map(|dci| { dci.rnti_list @@ -254,45 +364,40 @@ fn calculate_capacity( .map(|rnti_dci| rnti_dci.rnti) }) .collect::>() - .len() as f64; + .len() as u64; - let p_alloc: u64 = dci_list - .iter() - .map(|dci| dci.total_dl_prb as u64) - .sum(); + let p_alloc: u64 = dci_list.iter().map(|dci| dci.total_dl_prb as u64).sum(); - let tbs_alloc: u64 = dci_list - .iter() - .map(|dci| dci.total_dl_tbs ) - .sum(); + let tbs_alloc: u64 = dci_list.iter().map(|dci| dci.total_dl_tbs).sum(); let target_rnti_dci_list: Vec<&NgScopeRntiDci> = dci_list .iter() .flat_map(|dci| { dci.rnti_list - .iter() - .take(dci.nof_rnti as usize) - .filter(|rnti_dci| rnti_dci.rnti == target_rnti) - .filter(|rnti_dci| rnti_dci.dl_prb > 0) + .iter() + .take(dci.nof_rnti as usize) + .filter(|rnti_dci| rnti_dci.rnti == target_rnti) + .filter(|rnti_dci| rnti_dci.dl_prb > 0) }) .collect::>(); let re_tx: u64 = target_rnti_dci_list .iter() - .map(|target_rnti_dci| target_rnti_dci.dl_reTx as u64 ) + .map(|target_rnti_dci| target_rnti_dci.dl_reTx as u64) .sum::(); let tbs_alloc_rnti: u64 = target_rnti_dci_list .iter() - .map(|target_rnti_dci| target_rnti_dci.dl_tbs as u64 ) + .map(|target_rnti_dci| target_rnti_dci.dl_tbs as u64) .sum::(); let p_alloc_rnti: u64 = target_rnti_dci_list .iter() - .map(|target_rnti_dci| target_rnti_dci.dl_prb as u64 ) + .map(|target_rnti_dci| target_rnti_dci.dl_prb as u64) .sum::(); - print_debug(&format!("DEBUG [model] parameters: + print_debug(&format!( + "DEBUG [model] parameters: nof_dci: \t\t{:?} p_cell: \t\t{:?} nof_rnti: \t\t{:?} @@ -300,18 +405,40 @@ fn calculate_capacity( p_alloc_rnti: \t\t{:?} tbs_alloc_rnti: \t{:?} re_tx: \t\t{:?}", - nof_dci, p_cell, nof_rnti, p_alloc, p_alloc_rnti, tbs_alloc_rnti, re_tx)); - let mut flag_phy_rate_all_rnti: u8 = 0; + nof_dci, p_cell, nof_rnti, p_alloc, p_alloc_rnti, tbs_alloc_rnti, re_tx + )); + let mut r_w_coarse_flag: u8 = 0; // [bit/PRB] let r_w: u64 = if p_alloc_rnti == 0 { - flag_phy_rate_all_rnti = 1; + r_w_coarse_flag = 1; 8 * tbs_alloc / p_alloc } else { 8 * tbs_alloc_rnti / p_alloc_rnti }; - let p_idle: f64 = p_cell - p_alloc as f64; - let c_p: u64 = ((r_w as f64 * (p_alloc_rnti as f64 + (p_idle / nof_rnti))) / nof_dci as f64) as u64; - Ok((c_p, r_w, flag_phy_rate_all_rnti, re_tx)) + let p_idle: f64 = p_cell as f64 - p_alloc as f64; + let c_p: u64 = + ((r_w as f64 * (p_alloc_rnti as f64 + (p_idle / nof_rnti as f64))) / nof_dci as f64) as u64; + let c_t = translate_physcial_to_transport_simple(c_p); + + Ok(LogMetric { + result: MetricResult { + physical_fair_share_capacity: c_p, + transport_fair_share_capacity: c_t, + physical_rate_bit_per_prb: r_w, + physical_rate_coarse_flag: r_w_coarse_flag, + nof_retransmissions: re_tx, + }, + basis: MetricBasis { + nof_dci, + p_cell, + nof_rnti, + p_alloc, + tbs_alloc, + target_rnti, + tbs_alloc_rnti, + p_alloc_rnti, + } + }) } fn translate_physcial_to_transport_simple(c_physical: u64) -> u64 { @@ -331,7 +458,8 @@ fn determine_smoothing_size(model_args: &FlattenedModelArgs, last_rtt_us: &Optio let unbound_slice = match model_args.model_metric_smoothing_size_type { DynamicValue::FixedMs => model_args.model_metric_smoothing_size_value as u64, DynamicValue::RttFactor => { - (last_rtt_us.unwrap() as f64 * model_args.model_metric_smoothing_size_value / 1000.0) as u64 + (last_rtt_us.unwrap() as f64 * model_args.model_metric_smoothing_size_value / 1000.0) + as u64 } }; if unbound_slice > MAX_DCI_SLICE_SIZE as u64 { @@ -342,8 +470,11 @@ fn determine_smoothing_size(model_args: &FlattenedModelArgs, last_rtt_us: &Optio #[cfg(test)] mod tests { - use crate::{ngscope::types::{NgScopeRntiDci, NGSCOPE_MAX_NOF_RNTI}, cell_info::SingleCell}; use super::*; + use crate::{ + cell_info::SingleCell, + ngscope::types::{NgScopeRntiDci, NGSCOPE_MAX_NOF_RNTI}, + }; fn dummy_rnti_dci(nof_rnti: u8) -> [NgScopeRntiDci; NGSCOPE_MAX_NOF_RNTI] { let mut rnti_list = [NgScopeRntiDci::default(); NGSCOPE_MAX_NOF_RNTI]; @@ -383,15 +514,16 @@ mod tests { fn test_capacity() -> Result<()> { let dummy_rnti: u16 = 123; let dummy_cell_info = CellInfo { - cells: vec![ - SingleCell { - nof_prb: 100, - ..Default::default() - } - ] + cells: vec![SingleCell { + nof_prb: 100, + ..Default::default() + }], }; - let actual_capacity = calculate_capacity(dummy_rnti, &dummy_cell_info, &dummy_dci_slice())?; - assert_eq!(actual_capacity, (129706, 512 * 8, 0, 0)); + let metric_params = calculate_capacity(dummy_rnti, &dummy_cell_info, &dummy_dci_slice(), &false)?; + assert_eq!(metric_params.result.physical_fair_share_capacity, 266240); + assert_eq!(metric_params.result.physical_rate_bit_per_prb, 512 * 8); + assert_eq!(metric_params.result.physical_rate_coarse_flag, 0); + assert_eq!(metric_params.result.nof_retransmissions, 0); Ok(()) } } diff --git a/src/logic/ngscope_controller.rs b/src/logic/ngscope_controller.rs index 40d21b7..cb4aae3 100644 --- a/src/logic/ngscope_controller.rs +++ b/src/logic/ngscope_controller.rs @@ -9,13 +9,14 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use crate::cell_info::CellInfo; +use crate::logger::log_dci; use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, NgControlState, CHANNEL_SYNC_SIZE, DEFAULT_WORKER_SLEEP_MS, DEFAULT_WORKER_SLEEP_US, }; use crate::ngscope; use crate::ngscope::config::NgScopeConfig; -use crate::ngscope::types::Message; +use crate::ngscope::types::{Message, NgScopeCellDci}; use crate::ngscope::{ ngscope_validate_server_check, ngscope_validate_server_send_initial, start_ngscope, stop_ngscope, @@ -105,6 +106,8 @@ fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { tx_dci, ng_args.ng_local_addr.to_string(), ng_args.ng_server_addr.to_string(), + ng_args.ng_log_dci, + ng_args.ng_log_dci_batch_size, )?); run_args.tx_dci_thread_handle = Some(tx_dci_thread.clone()); @@ -254,6 +257,8 @@ fn deploy_dci_fetcher_thread( tx_dci: Bus, local_socket_addr: String, ng_server_addr: String, + is_log_dci: bool, + log_dci_batch_size: u64, ) -> Result> { let thread = thread::spawn(move || { let _ = run_dci_fetcher( @@ -262,6 +267,8 @@ fn deploy_dci_fetcher_thread( tx_dci, local_socket_addr, ng_server_addr, + is_log_dci, + log_dci_batch_size, ); }); Ok(thread) @@ -273,6 +280,8 @@ fn run_dci_fetcher( mut tx_dci: Bus, local_socket_addr: String, ng_server_addr: String, + is_log_dci: bool, + log_dci_batch_size: u64, ) -> Result<()> { let socket = init_dci_server(&local_socket_addr)?; let mut dci_state: LocalDciState = LocalDciState::ListenForDci; @@ -281,6 +290,7 @@ fn run_dci_fetcher( determine_process_id() )); + let mut log_dci_buffer: Vec = Vec::with_capacity(2 * log_dci_batch_size as usize); let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); let mut last_dci_timestamp_us: u64 = 0; @@ -317,7 +327,8 @@ fn run_dci_fetcher( }; } LocalDciState::ListenForDci => { - check_ngscope_message(&socket, &mut tx_dci, &mut last_dci_timestamp_us) + check_ngscope_message(&socket, &mut tx_dci, &mut last_dci_timestamp_us, &is_log_dci, &mut log_dci_buffer); + check_log_dci(&is_log_dci, &mut log_dci_buffer, &log_dci_batch_size); } } } @@ -328,6 +339,8 @@ fn check_ngscope_message( socket: &UdpSocket, tx_dci: &mut Bus, last_dci_timestamp_us: &mut u64, + is_log_dci: &bool, + log_dci_buffer: &mut Vec, ) { match ngscope::ngscope_recv_single_message(socket) { Ok(msg) => { @@ -356,6 +369,9 @@ fn check_ngscope_message( let message_dci = MessageDci { ngscope_dci: *cell_dci, }; + if *is_log_dci { + log_dci_buffer.push(*cell_dci.clone()); + } match tx_dci.try_broadcast(message_dci) { Ok(_) => {} Err(msg) => { @@ -383,6 +399,18 @@ fn check_ngscope_message( } } +fn check_log_dci( + is_log_dci: &bool, + log_dci_buffer: &mut Vec, + log_dci_batch_size: &u64 +) { + if *is_log_dci && log_dci_buffer.len() >= *log_dci_batch_size as usize { + let _ = log_dci(log_dci_buffer.clone()); + log_dci_buffer.clear() + } +} + + /* -------------- */ /* Helpers */ /* -------------- */ diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index 72f198b..043d3ff 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -12,6 +12,7 @@ use bus::{Bus, BusReader}; use nalgebra::{DMatrix, DVector}; use serde_derive::{Deserialize, Serialize}; +use crate::logger::log_traffic_collection; use crate::logic::traffic_patterns::{TrafficPattern, TrafficPatternFeatures}; use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageDci, MessageRnti, RntiMatcherState, @@ -21,7 +22,7 @@ use crate::ngscope::types::NgScopeCellDci; use crate::parse::{Arguments, FlattenedRntiMatchingArgs}; use crate::util::{ - determine_process_id, log_rnti_matching_traffic, print_debug, print_info, CellRntiRingBuffer, + determine_process_id, print_debug, print_info, CellRntiRingBuffer, }; use crate::math_util::{ @@ -138,6 +139,8 @@ pub struct TrafficCollection { pub start_timestamp_ms: u64, pub finish_timestamp_ms: u64, pub traffic_pattern_features: TrafficPatternFeatures, + pub basic_filter_statistics: Option, + pub feature_distance_statistics: Option, } #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] @@ -317,6 +320,8 @@ fn handle_start_matching( start_timestamp_ms, finish_timestamp_ms, traffic_pattern_features, + basic_filter_statistics: None, + feature_distance_statistics: None, }; let _ = tx_gen_thread.send(LocalGeneratorState::SendPattern(Box::new(traffic_pattern))); @@ -351,13 +356,9 @@ fn handle_process_dci( // Check number of packets plausability: expected ms -> expected dcis let mut message_rnti: MessageRnti = MessageRnti::default(); - /* log files */ - if log_traffic { - let _ = log_rnti_matching_traffic(&traffic_collection); - } - + /* First processing step: Reduce RNTIs */ traffic_collection.apply_basic_filter(); - + /* Second processing step: Determine distances */ let best_matches = match traffic_collection.find_best_matching_rnti() { Ok(matches) => matches, Err(e) => { @@ -370,6 +371,9 @@ fn handle_process_dci( ); } }; + if log_traffic { + let _ = log_traffic_collection(traffic_collection.clone()); + } cell_rnti_ring_buffer.update(&best_matches); print_debug(&format!( "DEBUG [rntimatcher] cell_rnti_ring_buffer: {:#?}", @@ -622,7 +626,7 @@ fn wait_for_running( Ok(_) => Ok(()), _ => { send_final_state(tx_rntimtacher_state)?; - Err(anyhow!("[sink] Main did not send 'Running' message")) + Err(anyhow!("[rntimatcher] Main did not send 'Running' message")) } } } @@ -675,7 +679,7 @@ impl TrafficCollection { * MEDIAN UL * * */ - fn apply_basic_filter(&mut self) -> BasicFilterStatistics { + fn apply_basic_filter(&mut self) { let mut stats: BasicFilterStatistics = Default::default(); let max_total_ul = (BASIC_FILTER_MAX_TOTAL_UL_FACTOR * self.traffic_pattern_features.total_ul_bytes as f64) @@ -764,19 +768,108 @@ impl TrafficCollection { .retain(|key, _| rntis_to_keep.contains(key)); } - stats + self.basic_filter_statistics = Some(stats) } /* * cell_id -> { (rnti, distance ) } * * */ - pub fn find_best_matching_rnti(&self) -> Result> { - let pattern_std_vec = &self.traffic_pattern_features.std_vec; - let pattern_feature_vec = &self.traffic_pattern_features.std_feature_vec; + pub fn find_best_matching_rnti(&mut self) -> Result> { /* Change this to use the functional approach */ // feature_distance_functional(&self.cell_traffic, pattern_std_vec, pattern_feature_vec); - feature_distance_matrices(&self.cell_traffic, pattern_std_vec, pattern_feature_vec) + self.feature_distance_matrices() + } + + fn feature_distance_functional(&self) -> Result> { + let pattern_std_vec = &self.traffic_pattern_features.std_vec; + let pattern_feature_vec = &self.traffic_pattern_features.std_feature_vec; + self.cell_traffic + .iter() + .map(|(&cell_id, cell_traffic)| { + let rnti_and_distance: Result> = cell_traffic + .traffic + .iter() + .map(|(&rnti, ue_traffic)| { + let std_feature_vec = + ue_traffic.generate_standardized_feature_vec(pattern_std_vec)?; + let distance = calculate_weighted_euclidean_distance( + pattern_feature_vec, + &std_feature_vec, + &MATCHING_WEIGHTINGS, + ); + Ok((rnti, distance)) + }) + .collect::>>(); + let mut rnti_and_distance = rnti_and_distance?; + rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); + Ok((cell_id, rnti_and_distance.first().unwrap().0)) + }) + .collect::>>() + } + + fn feature_distance_matrices(&mut self) -> Result> { + let pattern_std_vec = &self.traffic_pattern_features.std_vec; + let pattern_feature_vec = &self.traffic_pattern_features.std_feature_vec; + let num_features = pattern_std_vec.len(); + let weightings_vector = DVector::from_row_slice(&MATCHING_WEIGHTINGS); + + self.cell_traffic + .iter() + .map(|(&cell_id, cell_traffic)| { + let standardized_feature_vecs: Vec> = cell_traffic + .traffic + .values() + .map(|ue_traffic| { + ue_traffic + .generate_standardized_feature_vec(pattern_std_vec) + .map_err(|e| anyhow!(e)) + }) + .collect::>>>()?; + + let num_vectors = standardized_feature_vecs.len(); + let data: Vec = standardized_feature_vecs.clone().into_iter().flatten().collect(); + let feature_matrix: DMatrix = + DMatrix::from_row_slice(num_vectors, num_features, &data); + + // Uncomment and implement debug print if needed + // print_debug(&format!("DEBUG [rntimatcher] feature_matrix: {:.2}", feature_matrix)); + + let pattern_feature_matrix = + DMatrix::from_fn(num_vectors, num_features, |_, r| pattern_feature_vec[r]); + let euclidean_distances = calculate_weighted_euclidean_distance_matrix( + &pattern_feature_matrix, + &feature_matrix, + &weightings_vector, + ); + + // Uncomment and implement debug print if needed + print_debug(&format!( + "DEBUG [rntimatcher] distances: {:.2}", + euclidean_distances + )); + + let mut rnti_and_distance: Vec<(u16, f64)> = cell_traffic + .traffic + .keys() + .cloned() + .zip(euclidean_distances.iter().cloned()) + .collect(); + + rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); + + self.feature_distance_statistics = Some(FeatureDistanceStatistics { + weightings: MATCHING_WEIGHTINGS.to_vec(), + pattern_standardization: pattern_std_vec.clone(), + pattern_features: pattern_feature_vec.clone(), + rntis: cell_traffic.traffic.keys().cloned().collect(), + rnti_features: standardized_feature_vecs.clone(), + rnti_distances: euclidean_distances.column(0).iter().cloned().collect(), + }); + + Ok((cell_id, rnti_and_distance.first().unwrap().0)) + }) + .collect::>>() } } @@ -846,99 +939,21 @@ impl UeTraffic { } } -#[derive(Clone, Debug, Default)] -struct BasicFilterStatistics { +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct BasicFilterStatistics { pub max_total_ul: u64, pub min_total_ul: u64, pub max_ul_per_dci: u64, pub min_occurences: u64, - pub zero_ul_median: u64, -} - -fn feature_distance_functional( - traffic: &HashMap, - pattern_std_vec: &[(f64, f64)], - pattern_feature_vec: &[f64], -) -> Result> { - traffic - .iter() - .map(|(&cell_id, cell_traffic)| { - let rnti_and_distance: Result> = cell_traffic - .traffic - .iter() - .map(|(&rnti, ue_traffic)| { - let std_feature_vec = - ue_traffic.generate_standardized_feature_vec(pattern_std_vec)?; - let distance = calculate_weighted_euclidean_distance( - pattern_feature_vec, - &std_feature_vec, - &MATCHING_WEIGHTINGS, - ); - Ok((rnti, distance)) - }) - .collect::>>(); - let mut rnti_and_distance = rnti_and_distance?; - rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); - Ok((cell_id, rnti_and_distance.first().unwrap().0)) - }) - .collect::>>() } -fn feature_distance_matrices( - traffic: &HashMap, - pattern_std_vec: &[(f64, f64)], - pattern_feature_vec: &[f64], -) -> Result> { - let num_features = pattern_std_vec.len(); - let weightings_vector = DVector::from_row_slice(&MATCHING_WEIGHTINGS); - - traffic - .iter() - .map(|(&cell_id, cell_traffic)| { - let standardized_feature_vecs: Result>> = cell_traffic - .traffic - .values() - .map(|ue_traffic| { - ue_traffic - .generate_standardized_feature_vec(pattern_std_vec) - .map_err(|e| anyhow!(e)) - }) - .collect(); - - let standardized_feature_vecs = standardized_feature_vecs?; - let num_vectors = standardized_feature_vecs.len(); - - let data: Vec = standardized_feature_vecs.into_iter().flatten().collect(); - let feature_matrix: DMatrix = - DMatrix::from_row_slice(num_vectors, num_features, &data); - - // Uncomment and implement debug print if needed - // print_debug(&format!("DEBUG [rntimatcher] feature_matrix: {:.2}", feature_matrix)); - - let pattern_feature_matrix = - DMatrix::from_fn(num_vectors, num_features, |_, r| pattern_feature_vec[r]); - let euclidean_distances = calculate_weighted_euclidean_distance_matrix( - &pattern_feature_matrix, - &feature_matrix, - &weightings_vector, - ); - - // Uncomment and implement debug print if needed - print_debug(&format!( - "DEBUG [rntimatcher] distances: {:.2}", - euclidean_distances - )); - - let mut rnti_and_distance: Vec<(u16, f64)> = cell_traffic - .traffic - .keys() - .cloned() - .zip(euclidean_distances.iter().cloned()) - .collect(); - - rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); - - Ok((cell_id, rnti_and_distance.first().unwrap().0)) - }) - .collect::>>() +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct FeatureDistanceStatistics { + /* TODO: Implement fields */ + pub weightings: Vec, + pub pattern_standardization: Vec<(f64, f64)>, + pub pattern_features: Vec, + pub rntis: Vec, + pub rnti_features: Vec>, + pub rnti_distances: Vec, } diff --git a/src/main.rs b/src/main.rs index 9ad8f47..da945cf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ use anyhow::{anyhow, Result}; use bus::{Bus, BusReader}; use casual_logger::{Level, Log}; +use logger::{deploy_logger, LoggerArgs, LoggerState}; use std::collections::HashSet; use std::error::Error; use std::fs; @@ -11,6 +12,7 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; mod cell_info; +mod logger; mod logic; mod math_util; mod ngscope; @@ -35,6 +37,7 @@ struct CombinedReceivers { pub source: Receiver, pub rntimatcher: Receiver, pub ngcontrol: Receiver, + pub logger: Receiver, } struct CombinedSenders { @@ -42,6 +45,7 @@ struct CombinedSenders { pub source: SyncSender, pub rntimatcher: SyncSender, pub ngcontrol: SyncSender, + pub logger: SyncSender, } impl CombinedReceivers { @@ -50,6 +54,7 @@ impl CombinedReceivers { let _ = &self.model.worker_print_on_recv(); let _ = &self.ngcontrol.worker_print_on_recv(); let _ = &self.rntimatcher.worker_print_on_recv(); + let _ = &self.logger.worker_print_on_recv(); } } @@ -64,6 +69,11 @@ fn deploy_app( let mut tx_metric: Bus = Bus::::new(BUS_SIZE_METRIC); let rx_metric: BusReader = tx_metric.add_rx(); + let logger_args = LoggerArgs { + app_args: app_args.clone(), + rx_app_state: tx_app_state.add_rx(), + tx_logger_state: all_tx_states.logger, + }; let model_args = ModelHandlerArgs { app_args: app_args.clone(), rx_app_state: tx_app_state.add_rx(), @@ -100,6 +110,7 @@ fn deploy_app( deploy_cell_source(source_args)?, deploy_model_handler(model_args)?, deploy_rnti_matcher(rntimatcher_args)?, + deploy_logger(logger_args)?, ]; Ok(tasks) } @@ -136,9 +147,10 @@ fn wait_all_running( ) -> Result<()> { print_info("[ ] waiting for all threads to become ready"); - let mut waiting_for: HashSet<&str> = vec!["source", "model", "rntimatcher", "ngcontrol"] - .into_iter() - .collect(); + let mut waiting_for: HashSet<&str> = + vec!["source", "model", "rntimatcher", "ngcontrol", "logger"] + .into_iter() + .collect(); while !waiting_for.is_empty() { if is_notifier(sigint_notifier) { @@ -166,6 +178,11 @@ fn wait_all_running( waiting_for.remove("ngcontrol"); } } + if waiting_for.contains("logger") { + if let Ok(Some(_)) = check_running(&all_rx_states.logger) { + waiting_for.remove("logger"); + } + } } print_info("[✓] waiting for all threads to become ready"); @@ -192,17 +209,20 @@ fn main() -> Result<(), Box> { let (source_tx, source_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (rntimatcher_tx, rntimatcher_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (ngcontrol_tx, ngcontrol_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (logger_tx, logger_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let all_tx_states = CombinedSenders { model: model_tx, source: source_tx, rntimatcher: rntimatcher_tx, ngcontrol: ngcontrol_tx, + logger: logger_tx, }; let all_rx_states = CombinedReceivers { model: model_rx, source: source_rx, rntimatcher: rntimatcher_rx, ngcontrol: ngcontrol_rx, + logger: logger_rx, }; let tasks = deploy_app(&mut tx_app_state, &args, all_tx_states)?; diff --git a/src/ngscope/types.rs b/src/ngscope/types.rs index 6573aaf..f593f82 100644 --- a/src/ngscope/types.rs +++ b/src/ngscope/types.rs @@ -7,13 +7,13 @@ pub const NOF_VALIDATE_SUCCESS: usize = 2; pub const NGSCOPE_REMOTE_BUFFER_SIZE: usize = 1400; // ngscope sending buffer size // taken from: ngscope/hdr/dciLib/dci_sink_def.h pub const NGSCOPE_MAX_NOF_CELL: usize = 4; // ngscope max nof cells per station -pub const NGSCOPE_MAX_NOF_RNTI: usize = 20; +pub const NGSCOPE_MAX_NOF_RNTI: usize = 40; pub const NGSCOPE_MESSAGE_TYPE_SIZE: usize = 4; pub const NGSCOPE_MESSAGE_VERSION_POSITION: usize = 4; pub const NGSCOPE_MESSAGE_CONTENT_POSITION: usize = 5; pub const NGSCOPE_STRUCT_SIZE_DCI: usize = 40; -pub const NGSCOPE_STRUCT_SIZE_CELL_DCI: usize = 448; +pub const NGSCOPE_STRUCT_SIZE_CELL_DCI: usize = 848; pub const NGSCOPE_STRUCT_SIZE_CONFIG: usize = 12; // TODO: Determine this actually // IMPORTANT: @@ -144,7 +144,7 @@ pub struct NgScopeRntiDci { } #[repr(C)] -#[derive(Copy, Clone, Debug, Default)] +#[derive(Copy, Clone, Debug)] #[allow(non_snake_case)] pub struct NgScopeCellDci { pub cell_id: u8, @@ -160,6 +160,24 @@ pub struct NgScopeCellDci { pub rnti_list: [NgScopeRntiDci; NGSCOPE_MAX_NOF_RNTI], } +impl Default for NgScopeCellDci { + fn default() -> Self { + NgScopeCellDci { + cell_id: 0, + time_stamp: 0, + tti: 0, + total_dl_tbs: 0, + total_ul_tbs: 0, + total_dl_prb: 0, + total_ul_prb: 0, + total_dl_reTx: 0, + total_ul_reTx: 0, + nof_rnti: 0, + rnti_list: [NgScopeRntiDci::default(); NGSCOPE_MAX_NOF_RNTI], + } + } +} + impl NgScopeCellDci { pub fn from_bytes(bytes: [u8; NGSCOPE_STRUCT_SIZE_CELL_DCI]) -> Result { let cell_dci: &NgScopeCellDci = unsafe { &*bytes.as_ptr().cast() }; diff --git a/src/parse.rs b/src/parse.rs index 85c831a..515e1e3 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -6,6 +6,8 @@ use std::{default, error::Error, path::PathBuf}; use crate::{logic::traffic_patterns::RntiMatchingTrafficPatternType, util::print_debug}; +pub const DEFAULT_LOG_BASE_DIR: &str = "./.logs/"; + #[derive(Debug, Clone, PartialEq, Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None, next_line_help = true)] #[command(propagate_version = true)] @@ -31,6 +33,9 @@ pub struct Arguments { #[command(flatten)] pub model: Option, + #[command(flatten)] + pub log: Option, + /// Print additional information in the terminal #[arg(short('v'), long, required = false)] pub verbose: Option, @@ -108,6 +113,14 @@ pub struct NgScopeArgs { /// If true, UE Cell Tracker starts its own NG-Scope instance #[arg(long, required = false)] pub ng_start_process: Option, + + /// Log DCI and general cell data information + #[arg(long, required = false)] + pub ng_log_dci: Option, + + /// Determine the number of DCIs contained in a single log file + #[arg(long, required = false)] + pub ng_log_dci_batch_size: Option, } #[derive(Clone, Debug)] @@ -117,6 +130,8 @@ pub struct FlattenedNgScopeArgs { pub ng_server_addr: String, pub ng_log_file: Option, pub ng_start_process: bool, + pub ng_log_dci: bool, + pub ng_log_dci_batch_size: u64, } #[derive(Args, Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -133,7 +148,7 @@ pub struct RntiMatchingArgs { #[arg(long, required = false)] pub matching_traffic_destination: Option, - /// Log traffic used for matching (in ./logs/rnti_matching*.jsonl) + /// Log RNTI matching traffic and features #[arg(long, required = false)] pub matching_log_traffic: Option, } @@ -169,6 +184,10 @@ pub struct ModelArgs { /// Metric smoothing type (Rtt-factor or fixed) #[arg(long, value_enum, required = false)] pub model_metric_smoothing_size_type: Option, + + /// Log Metric and calculation basis + #[arg(long, required = false)] + pub model_log_metric: Option, } #[derive(Clone, Debug)] @@ -177,6 +196,19 @@ pub struct FlattenedModelArgs { pub model_send_metric_interval_type: DynamicValue, pub model_metric_smoothing_size_value: f64, pub model_metric_smoothing_size_type: DynamicValue, + pub model_log_metric: bool, +} + +#[derive(Args, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LogArgs { + /// Base directory for logging + #[arg(long, required = false)] + pub log_base_dir: Option, +} + +#[derive(Clone, Debug)] +pub struct FlattenedLogArgs { + pub log_base_dir: String, } impl default::Default for Arguments { @@ -199,6 +231,8 @@ impl default::Default for Arguments { ng_server_addr: Some("0.0.0.0:6767".to_string()), ng_log_file: Some("./.ng_scope_log.txt".to_string()), ng_start_process: Some(true), + ng_log_dci: Some(true), + ng_log_dci_batch_size: Some(60000), }), rntimatching: Some(RntiMatchingArgs { matching_local_addr: Some("0.0.0.0:9292".to_string()), @@ -211,6 +245,10 @@ impl default::Default for Arguments { model_send_metric_interval_type: Some(DynamicValue::RttFactor), model_metric_smoothing_size_value: Some(1.0), model_metric_smoothing_size_type: Some(DynamicValue::RttFactor), + model_log_metric: Some(true), + }), + log: Some(LogArgs { + log_base_dir: Some(DEFAULT_LOG_BASE_DIR.to_string()), }), } } @@ -248,6 +286,7 @@ impl Arguments { self.ngscope = self.ngscope.or(config_file.ngscope); self.rntimatching = self.rntimatching.or(config_file.rntimatching); self.model = self.model.or(config_file.model); + self.log = self.log.or(config_file.log); self.verbose = self.verbose.or(config_file.verbose); Ok(self) @@ -309,6 +348,8 @@ impl FlattenedNgScopeArgs { ng_server_addr: ng_args.ng_server_addr.unwrap(), ng_start_process: ng_args.ng_start_process.unwrap(), ng_log_file: ng_args.ng_log_file, + ng_log_dci: ng_args.ng_log_dci.unwrap(), + ng_log_dci_batch_size: ng_args.ng_log_dci_batch_size.unwrap(), }) } } @@ -333,6 +374,15 @@ impl FlattenedModelArgs { .model_metric_smoothing_size_value .unwrap(), model_metric_smoothing_size_type: model_args.model_metric_smoothing_size_type.unwrap(), + model_log_metric: model_args.model_log_metric.unwrap(), + }) + } +} + +impl FlattenedLogArgs { + pub fn from_unflattened(log_args: LogArgs) -> Result { + Ok(FlattenedLogArgs { + log_base_dir: log_args.log_base_dir.unwrap(), }) } } diff --git a/src/util.rs b/src/util.rs index d2dcf1d..ae7ff01 100644 --- a/src/util.rs +++ b/src/util.rs @@ -10,6 +10,7 @@ use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use crate::logger::log_info; use crate::logic::rnti_matcher::TrafficCollection; pub const MATCHING_LOG_FILE_PREFIX: &str = "./.logs/rnti_matching_pattern_"; @@ -123,7 +124,7 @@ pub fn determine_process_id() -> u64 { } pub fn print_dci(dci: crate::ngscope::types::NgScopeCellDci) { - Log::print_info(&format!( + print_info(&format!( "DEBUG: {:?} | {:03?} | {:03?} | {:08?} | {:08?} | {:03?} | {:03?}", dci.nof_rnti, dci.total_dl_prb, @@ -136,12 +137,14 @@ pub fn print_dci(dci: crate::ngscope::types::NgScopeCellDci) { } pub fn print_info(s: &str) { - Log::print_info(s) + let _ = log_info(s); + // Log::print_info(s) } pub fn print_debug(s: &str) { if is_debug() { - Log::print_debug(s) + let _ = log_info(s); + // Log::print_debug(s) } }