diff --git a/src/logger.rs b/src/logger.rs index 45e3c25..9347528 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -2,13 +2,15 @@ use std::fs::{create_dir_all, File, OpenOptions}; use std::io::Write; use std::path::Path; use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; -use std::sync::{Mutex, Arc}; +use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; use std::time::Duration; use anyhow::{anyhow, Result}; -use arrow::array::{UInt64Builder, StructBuilder, UInt8Builder, ListBuilder, ArrayRef, UInt16Builder, UInt32Builder}; -use arrow::datatypes::{Schema, Field, DataType, Fields}; +use arrow::array::{ + ArrayRef, ListBuilder, StructBuilder, UInt16Builder, UInt32Builder, UInt64Builder, UInt8Builder, +}; +use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; @@ -192,7 +194,10 @@ fn handle_log_message(msg: LogMessage) -> Result<()> { } let msg_type_name = msg.type_name(); if Logger::write_log_message(msg).is_err() { - print_info(&format!("[logger] error: could not log message ({})", msg_type_name)) + print_info(&format!( + "[logger] error: could not log message ({})", + msg_type_name + )) } Ok(()) } @@ -223,7 +228,9 @@ pub fn log_info(info: &str) -> Result<()> { } pub fn log_traffic_collection(traffic_collection: TrafficCollection) -> Result<()> { - Logger::queue_log_message(LogMessage::RntiMatchingTrafficCollection(Box::new(traffic_collection))) + Logger::queue_log_message(LogMessage::RntiMatchingTrafficCollection(Box::new( + traffic_collection, + ))) } pub fn log_metric(metric: LogMetric) -> Result<()> { @@ -251,7 +258,9 @@ pub fn get_logger() -> &'static mut Lazy { } }); #[allow(static_mut_refs)] - unsafe { &mut GLOBAL_LOGGER } + unsafe { + &mut GLOBAL_LOGGER + } } impl Logger { @@ -330,7 +339,8 @@ impl LogMessage { LogMessage::NgScopeDci(_) => "ngscope dci", LogMessage::RntiMatchingTrafficCollection(_) => "rnti traffic collection", LogMessage::Metric(_) => "metric", - }.to_string() + } + .to_string() } pub fn file_path(&self, base_dir: &str, run_timestamp: &DateTime) -> String { @@ -388,7 +398,6 @@ impl LogMessage { } } - /* * Helpers for writing Vec as Apache Arrow to disk * */ @@ -396,11 +405,9 @@ impl LogMessage { fn create_rnti_fields() -> Fields { Fields::from(vec![ Field::new("rnti", DataType::UInt16, true), - Field::new("dl_tbs_bit", DataType::UInt32, true), Field::new("dl_prb", DataType::UInt8, true), Field::new("dl_no_tbs_prb", DataType::UInt8, true), - Field::new("ul_tbs_bit", DataType::UInt32, true), Field::new("ul_prb", DataType::UInt8, true), Field::new("ul_no_tbs_prb", DataType::UInt8, true), @@ -408,24 +415,24 @@ fn create_rnti_fields() -> Fields { } fn create_schema() -> Arc { - let rnti_struct = DataType::Struct(create_rnti_fields()); Arc::new(Schema::new(vec![ Field::new("timestamp", DataType::UInt64, false), Field::new("nof_rnti", DataType::UInt8, false), - Field::new("rnti_list", DataType::List(Arc::new(Field::new("item", rnti_struct, true))), true), + Field::new( + "rnti_list", + DataType::List(Arc::new(Field::new("item", rnti_struct, true))), + true, + ), ])) } -fn write_arrow_ipc( - schema: Arc, - data: Vec, - file: &File -) -> Result<()> { +fn write_arrow_ipc(schema: Arc, data: Vec, file: &File) -> Result<()> { let mut timestamp_builder = UInt64Builder::with_capacity(data.len()); let mut nof_rntis_builder = UInt8Builder::with_capacity(data.len()); - let mut rnti_list_builder = ListBuilder::new(StructBuilder::from_fields(create_rnti_fields(), data.len())); + let mut rnti_list_builder = + ListBuilder::new(StructBuilder::from_fields(create_rnti_fields(), data.len())); for cell_dci in &data { timestamp_builder.append_value(cell_dci.time_stamp); @@ -435,7 +442,11 @@ fn write_arrow_ipc( rnti_list_builder.append(false); // Append null for an empty list } else { let rnti_struct_builder = rnti_list_builder.values(); - append_rnti_list_to_struct(rnti_struct_builder, &cell_dci.rnti_list[0..cell_dci.nof_rnti as usize]); rnti_list_builder.append(true); + append_rnti_list_to_struct( + rnti_struct_builder, + &cell_dci.rnti_list[0..cell_dci.nof_rnti as usize], + ); + rnti_list_builder.append(true); } } @@ -455,19 +466,42 @@ fn write_arrow_ipc( Ok(()) } -fn append_rnti_list_to_struct(rnti_struct_builder: &mut StructBuilder, rnti_list: &[NgScopeRntiDci]) { +fn append_rnti_list_to_struct( + rnti_struct_builder: &mut StructBuilder, + rnti_list: &[NgScopeRntiDci], +) { for rnti_dci in rnti_list.iter() { - rnti_struct_builder.field_builder::(0).unwrap().append_value(rnti_dci.rnti); - - rnti_struct_builder.field_builder::(1).unwrap().append_value(rnti_dci.dl_tbs_bit); - rnti_struct_builder.field_builder::(2).unwrap().append_value(rnti_dci.dl_prb); - rnti_struct_builder.field_builder::(3).unwrap().append_value(rnti_dci.dl_no_tbs_prb); - - rnti_struct_builder.field_builder::(4).unwrap().append_value(rnti_dci.ul_tbs_bit); - rnti_struct_builder.field_builder::(5).unwrap().append_value(rnti_dci.ul_prb); - rnti_struct_builder.field_builder::(6).unwrap().append_value(rnti_dci.ul_no_tbs_prb); + rnti_struct_builder + .field_builder::(0) + .unwrap() + .append_value(rnti_dci.rnti); + + rnti_struct_builder + .field_builder::(1) + .unwrap() + .append_value(rnti_dci.dl_tbs_bit); + rnti_struct_builder + .field_builder::(2) + .unwrap() + .append_value(rnti_dci.dl_prb); + rnti_struct_builder + .field_builder::(3) + .unwrap() + .append_value(rnti_dci.dl_no_tbs_prb); + + rnti_struct_builder + .field_builder::(4) + .unwrap() + .append_value(rnti_dci.ul_tbs_bit); + rnti_struct_builder + .field_builder::(5) + .unwrap() + .append_value(rnti_dci.ul_prb); + rnti_struct_builder + .field_builder::(6) + .unwrap() + .append_value(rnti_dci.ul_no_tbs_prb); rnti_struct_builder.append(true); } } - diff --git a/src/logic/model_handler.rs b/src/logic/model_handler.rs index a806a5d..72a7851 100644 --- a/src/logic/model_handler.rs +++ b/src/logic/model_handler.rs @@ -9,16 +9,15 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use anyhow::{anyhow, Result}; -use serde_derive::{Deserialize, Serialize}; use bus::{Bus, BusReader}; +use serde_derive::{Deserialize, Serialize}; - +use super::{MetricA, MetricTypes}; use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageMetric, MessageRnti, ModelState, DEFAULT_WORKER_SLEEP_US, }; use crate::util::determine_process_id; -use super::{MetricA, MetricTypes}; pub const MAX_DCI_ARRAY_SIZE: usize = 10000; pub const MAX_DCI_SLICE_SIZE: usize = 1000; @@ -97,6 +96,7 @@ pub struct MetricBasis { pub tbs_alloc_rnti_bit: u64, pub p_alloc_rnti: u64, pub p_alloc_no_tbs_rnti: u64, + pub p_alloc_rnti_suggested: u64, } #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] @@ -264,7 +264,6 @@ fn finish(run_args: RunArgs) { let _ = send_final_state(&run_args.tx_model_state); } - fn handle_calculate_metric( run_params: &mut RunParameters, sending_behavior: &mut RunParametersSendingBehavior, @@ -289,9 +288,12 @@ fn handle_calculate_metric( let buffer_slice_size: usize = **metric_smoothing_size_ms as usize; let buffer_slice = dci_buffer.slice(buffer_slice_size); if !buffer_slice.is_empty() { - - if let Ok(metric_wrapper) = calculate_capacity(*rnti, cell_info, buffer_slice, is_log_metric) { - let transport_capacity = metric_wrapper.result.transport_fair_share_capacity_bit_per_ms; + if let Ok(metric_wrapper) = + calculate_capacity(*rnti, cell_info, buffer_slice, is_log_metric) + { + let transport_capacity = metric_wrapper + .result + .transport_fair_share_capacity_bit_per_ms; let physical_rate_flag = metric_wrapper.result.physical_rate_coarse_flag; let physical_rate = metric_wrapper.result.physical_rate_bit_per_prb; let no_tbs_prb_ratio = metric_wrapper.result.no_tbs_prb_ratio; @@ -311,8 +313,7 @@ fn handle_calculate_metric( }); } **last_metric_timestamp_us = chrono::Utc::now().timestamp_micros() as u64; - **metric_sending_interval_us = - determine_sending_interval(model_args, last_rtt_us); + **metric_sending_interval_us = determine_sending_interval(model_args, last_rtt_us); **metric_smoothing_size_ms = determine_smoothing_size(model_args, last_rtt_us); } else { print_debug("DEBUG [model] skipping metric calculation, dci slice is 0"); @@ -325,25 +326,38 @@ fn calculate_capacity( dci_list: &[NgScopeCellDci], is_log_metric: &bool, ) -> Result { - let metric_wrapper = calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list, RNTI_SHARE_TYPE_ALL)?; + let metric_wrapper = + calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list, RNTI_SHARE_TYPE_ALL)?; if *is_log_metric { let _ = log_metric(metric_wrapper.clone()); } print_debug(&format!( - "DEBUG [model] model: + "DEBUG [model] model: c_t: \t{:6?} bit/ms | {:3.3?} Mbit/s c_p: \t{:6?} bit/ms | {:3.3?} Mbit/s phy rate: \t{:6?} bit/PRB phy flag: \t{:?} no_tbs %: \t{:?}", - metric_wrapper.result.transport_fair_share_capacity_bit_per_ms, - metric_wrapper.result.transport_fair_share_capacity_bit_per_ms as f64 * 1000.0 / (1024.0 * 1024.0), - metric_wrapper.result.physical_fair_share_capacity_bit_per_ms, - metric_wrapper.result.physical_fair_share_capacity_bit_per_ms as f64 * 1000.0 / (1024.0 * 1024.0), - metric_wrapper.result.physical_rate_bit_per_prb, - metric_wrapper.result.physical_rate_coarse_flag, - metric_wrapper.result.no_tbs_prb_ratio, - )); + metric_wrapper + .result + .transport_fair_share_capacity_bit_per_ms, + metric_wrapper + .result + .transport_fair_share_capacity_bit_per_ms as f64 + * 1000.0 + / (1024.0 * 1024.0), + metric_wrapper + .result + .physical_fair_share_capacity_bit_per_ms, + metric_wrapper + .result + .physical_fair_share_capacity_bit_per_ms as f64 + * 1000.0 + / (1024.0 * 1024.0), + metric_wrapper.result.physical_rate_bit_per_prb, + metric_wrapper.result.physical_rate_coarse_flag, + metric_wrapper.result.no_tbs_prb_ratio, + )); Ok(metric_wrapper) } @@ -364,7 +378,8 @@ fn calculate_pbe_cc_capacity( return Err(anyhow!("Cannot calculate capacity with 0 DCI")); } // Total number of PRBs in a subframe, that the cell can offer - let p_cell: u64 = STANDARD_NOF_PRB_SLOT_TO_SUBFRAME * cell_info.cells[0].nof_prb as u64 * nof_dci; + let p_cell: u64 = + STANDARD_NOF_PRB_SLOT_TO_SUBFRAME * cell_info.cells[0].nof_prb as u64 * nof_dci; let nof_rnti: u64 = dci_list .iter() @@ -379,14 +394,13 @@ fn calculate_pbe_cc_capacity( .len() as u64; /* TOOD: Evaluate rnti_share_type */ - let nof_rnti_shared = if nof_rnti == 0 { - 1 - } else { - nof_rnti - }; + let nof_rnti_shared: u64 = if nof_rnti == 0 { 1 } else { nof_rnti }; let p_alloc: u64 = dci_list.iter().map(|dci| dci.total_dl_prb as u64).sum(); - let p_alloc_no_tbs: u64 = dci_list.iter().map(|dci| dci.total_dl_no_tbs_prb as u64).sum(); + let p_alloc_no_tbs: u64 = dci_list + .iter() + .map(|dci| dci.total_dl_no_tbs_prb as u64) + .sum(); let tbs_alloc_bit: u64 = dci_list.iter().map(|dci| dci.total_dl_tbs_bit).sum(); @@ -401,7 +415,6 @@ fn calculate_pbe_cc_capacity( }) .collect::>(); - let tbs_alloc_rnti_bit: u64 = target_rnti_dci_list .iter() .map(|target_rnti_dci| target_rnti_dci.dl_tbs_bit as u64) @@ -417,22 +430,6 @@ fn calculate_pbe_cc_capacity( .map(|target_rnti_dci| target_rnti_dci.dl_no_tbs_prb as u64) .sum::(); - print_debug(&format!( - "DEBUG [model] parameters: - nof_dci: {:?} - p_cell: {:?} - nof_rnti: {:?} - p_alloc: {:?} - p_alloc_no_tbs: {:?} - p_alloc_rnti: {:?} - p_alloc_rnti_no_tbs: {:?} - tbs_alloc_rnti: {:?} - RNTI MB/s: {:3.3?}, - RNTI Mb/s: {:3.3?}", - nof_dci, p_cell, nof_rnti, p_alloc, p_alloc_no_tbs, p_alloc_rnti, p_alloc_no_tbs_rnti, tbs_alloc_rnti_bit, - (tbs_alloc_rnti_bit as f64 / (8.0 * nof_dci as f64)) * 1000.0 / (1024.0 * 1024.0), - (tbs_alloc_rnti_bit as f64 / (nof_dci as f64)) * 1000.0 / (1024.0 * 1024.0), - )); let mut r_w_coarse_flag: u8 = 0; let p_alloc_total = p_alloc + p_alloc_no_tbs; // [bit/PRB] @@ -448,12 +445,15 @@ fn calculate_pbe_cc_capacity( } else { tbs_alloc_rnti_bit / p_alloc_rnti }; - let p_idle: f64 = p_cell as f64 - p_alloc_total as f64; - if p_idle < 0.0 { - return Err(anyhow!("error in calculate PBE capacity: p_idle < 0! (probably more p_alloc_no_tbs than p_cell)")); - } + let p_idle: u64 = match p_cell.checked_sub(p_alloc_total) { + Some(result_p_idle) => result_p_idle, + None => return Err(anyhow!("error in calculate PBE capacity: p_idle < 0! (probably more p_alloc_no_tbs than p_cell)")), + }; + + let p_alloc_rnti_suggested: u64 = + p_alloc_rnti + ((p_idle + nof_rnti_shared - 1) / nof_rnti_shared); let c_p: u64 = - ((r_w as f64 * (p_alloc_rnti as f64 + (p_idle / nof_rnti_shared as f64))) / nof_dci as f64) as u64; + ((r_w as f64 * (p_alloc_rnti + (p_alloc_rnti_suggested)) as f64) / nof_dci as f64) as u64; let c_t = translate_physcial_to_transport_simple(c_p); let mut no_tbs_prb_ratio = 0.0; @@ -461,6 +461,36 @@ fn calculate_pbe_cc_capacity( no_tbs_prb_ratio = p_alloc_total as f64 / p_alloc_no_tbs as f64; } + print_debug(&format!( + "DEBUG [model] parameters: + nof_dci: {:8?} + p_cell: {:8?} + nof_rnti: {:8?} + p_alloc: {:8?} | {:3?} PRB/DCI + p_alloc_no_tbs: {:8?} | {:3?} PRB/DCI + p_alloc_rnti: {:8?} | {:3?} PRB/DCI + p_alloc_rnti_suggested: {:8?} | {:3?} PRB/DCI + p_alloc_rnti_no_tbs: {:8?} + tbs_alloc_rnti: {:8?} + RNTI MB/s: {:8.3?} + RNTI Mb/s: {:8.3?}", + nof_dci, + p_cell, + nof_rnti, + p_alloc, + p_alloc / nof_dci, + p_alloc_no_tbs, + p_alloc_no_tbs / nof_dci, + p_alloc_rnti, + p_alloc_rnti / nof_dci, + p_alloc_rnti_suggested, + p_alloc_rnti_suggested / nof_dci, + p_alloc_no_tbs_rnti, + tbs_alloc_rnti_bit, + (tbs_alloc_rnti_bit as f64 / (8.0 * nof_dci as f64)) * 1000.0 / (1024.0 * 1024.0), + (tbs_alloc_rnti_bit as f64 / (nof_dci as f64)) * 1000.0 / (1024.0 * 1024.0), + )); + Ok(LogMetric { result: MetricResult { physical_fair_share_capacity_bit_per_ms: c_p, @@ -480,8 +510,9 @@ fn calculate_pbe_cc_capacity( target_rnti, tbs_alloc_rnti_bit, p_alloc_rnti, + p_alloc_rnti_suggested, p_alloc_no_tbs_rnti, - } + }, }) } @@ -563,8 +594,12 @@ mod tests { ..Default::default() }], }; - let metric_params = calculate_capacity(dummy_rnti, &dummy_cell_info, &dummy_dci_slice(), &false)?; - assert_eq!(metric_params.result.physical_fair_share_capacity_bit_per_ms, 33280); + let metric_params = + calculate_capacity(dummy_rnti, &dummy_cell_info, &dummy_dci_slice(), &false)?; + assert_eq!( + metric_params.result.physical_fair_share_capacity_bit_per_ms, + 33621 + ); assert_eq!(metric_params.result.physical_rate_bit_per_prb, 512); assert_eq!(metric_params.result.physical_rate_coarse_flag, 0); assert_eq!(metric_params.result.no_tbs_prb_ratio, 0.0); diff --git a/src/logic/ngscope_controller.rs b/src/logic/ngscope_controller.rs index cb4aae3..6595c9d 100644 --- a/src/logic/ngscope_controller.rs +++ b/src/logic/ngscope_controller.rs @@ -290,7 +290,8 @@ fn run_dci_fetcher( determine_process_id() )); - let mut log_dci_buffer: Vec = Vec::with_capacity(2 * log_dci_batch_size as usize); + let mut log_dci_buffer: Vec = + Vec::with_capacity(2 * log_dci_batch_size as usize); let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); let mut last_dci_timestamp_us: u64 = 0; @@ -327,7 +328,13 @@ fn run_dci_fetcher( }; } LocalDciState::ListenForDci => { - check_ngscope_message(&socket, &mut tx_dci, &mut last_dci_timestamp_us, &is_log_dci, &mut log_dci_buffer); + check_ngscope_message( + &socket, + &mut tx_dci, + &mut last_dci_timestamp_us, + &is_log_dci, + &mut log_dci_buffer, + ); check_log_dci(&is_log_dci, &mut log_dci_buffer, &log_dci_batch_size); } } @@ -402,7 +409,7 @@ fn check_ngscope_message( fn check_log_dci( is_log_dci: &bool, log_dci_buffer: &mut Vec, - log_dci_batch_size: &u64 + log_dci_batch_size: &u64, ) { if *is_log_dci && log_dci_buffer.len() >= *log_dci_batch_size as usize { let _ = log_dci(log_dci_buffer.clone()); @@ -410,7 +417,6 @@ fn check_log_dci( } } - /* -------------- */ /* Helpers */ /* -------------- */ diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index 1ea95af..14a0372 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -21,9 +21,7 @@ use crate::logic::{ use crate::ngscope::types::NgScopeCellDci; use crate::parse::{Arguments, FlattenedRntiMatchingArgs}; -use crate::util::{ - determine_process_id, print_debug, print_info, CellRntiRingBuffer, -}; +use crate::util::{determine_process_id, print_debug, print_info, CellRntiRingBuffer}; use crate::math_util::{ calculate_mean_variance, calculate_median, calculate_weighted_euclidean_distance, @@ -813,7 +811,7 @@ impl TrafficCollection { let pattern_feature_vec = &self.traffic_pattern_features.std_feature_vec; let num_features = pattern_std_vec.len(); let weightings_vector = DVector::from_row_slice(&MATCHING_WEIGHTINGS); - + self.cell_traffic .iter() .map(|(&cell_id, cell_traffic)| { @@ -826,15 +824,19 @@ impl TrafficCollection { .map_err(|e| anyhow!(e)) }) .collect::>>>()?; - + let num_vectors = standardized_feature_vecs.len(); - let data: Vec = standardized_feature_vecs.clone().into_iter().flatten().collect(); + let data: Vec = standardized_feature_vecs + .clone() + .into_iter() + .flatten() + .collect(); let feature_matrix: DMatrix = DMatrix::from_row_slice(num_vectors, num_features, &data); - + // Uncomment and implement debug print if needed // print_debug(&format!("DEBUG [rntimatcher] feature_matrix: {:.2}", feature_matrix)); - + let pattern_feature_matrix = DMatrix::from_fn(num_vectors, num_features, |_, r| pattern_feature_vec[r]); let euclidean_distances = calculate_weighted_euclidean_distance_matrix( @@ -842,20 +844,20 @@ impl TrafficCollection { &feature_matrix, &weightings_vector, ); - + // Uncomment and implement debug print if needed print_debug(&format!( "DEBUG [rntimatcher] distances: {:.2}", euclidean_distances )); - + let mut rnti_and_distance: Vec<(u16, f64)> = cell_traffic .traffic .keys() .cloned() .zip(euclidean_distances.iter().cloned()) .collect(); - + rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); self.feature_distance_statistics = Some(FeatureDistanceStatistics { @@ -866,7 +868,7 @@ impl TrafficCollection { rnti_features: standardized_feature_vecs.clone(), rnti_distances: euclidean_distances.column(0).iter().cloned().collect(), }); - + Ok((cell_id, rnti_and_distance.first().unwrap().0)) }) .collect::>>()