From 9fc5f1ea2692a2ff8034e737f835b524654d0ec3 Mon Sep 17 00:00:00 2001 From: Bastian Schmidt Date: Mon, 22 Jul 2024 17:00:29 +0200 Subject: [PATCH] Add downloader thread * Add DownloadConfig containing rtt and rnti_share_type * Add messaging between downloader and model * Add parse entries for download config * Add sockopt_get_tcp_info to determine RTT * Log DownloadFinish in .logs/download/*.jsonl Introduce scenario parameter Set the scenario to choose between * TrackUeAndEstimateTransportCapacity * TrackCellDciOnly * PerformMeasurement --- src/logger.rs | 23 ++- src/logic/downloader.rs | 353 +++++++++++++++++++++++++++++++++++++ src/logic/mod.rs | 35 ++++ src/logic/model_handler.rs | 69 ++++++-- src/logic/rnti_matcher.rs | 16 +- src/main.rs | 43 ++++- src/parse.rs | 67 +++++++ src/util.rs | 82 ++++++++- 8 files changed, 655 insertions(+), 33 deletions(-) create mode 100644 src/logic/downloader.rs diff --git a/src/logger.rs b/src/logger.rs index 9347528..4c1169e 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -14,6 +14,7 @@ use arrow::datatypes::{DataType, Field, Fields, Schema}; use arrow::ipc::writer::FileWriter; use arrow::record_batch::RecordBatch; +use crate::logic::downloader::DownloadFinishParameters; use crate::logic::model_handler::LogMetric; use crate::logic::rnti_matcher::TrafficCollection; use crate::ngscope::types::NgScopeRntiDci; @@ -37,8 +38,7 @@ const LOGGER_RELATIVE_PATH_INFO: &str = "stdout/"; const LOGGER_RELATIVE_PATH_DCI: &str = "dci/"; const LOGGER_RELATIVE_PATH_RNTI_MATCHING: &str = "rnti_matching/"; const LOGGER_RELATIVE_PATH_METRIC: &str = "metric/"; -// TODO: Implement saving measurements data -// const LOGGER_RELATIVE_PATH_MEASUREMENT: &str = "measurements/"; +const LOGGER_RELATIVE_PATH_DOWNLOAD: &str = "download/"; #[derive(Clone, Debug, PartialEq)] pub enum LoggerState { @@ -100,8 +100,8 @@ pub enum LogMessage { RntiMatchingTrafficCollection(Box), /// Model Metric Metric(Box), - // Measurement transmission data (RTT) - // LogDataTransmission(Box), + /// Measurement transmission data (RTT) + DownloadStatistics(Box), } /* @@ -241,6 +241,10 @@ pub fn log_dci(dcis: Vec) -> Result<()> { Logger::queue_log_message(LogMessage::NgScopeDci(dcis)) } +pub fn log_download(download: DownloadFinishParameters) -> Result<()> { + Logger::queue_log_message(LogMessage::DownloadStatistics(Box::new(download))) +} + #[allow(unknown_lints)] pub fn get_logger() -> &'static mut Lazy { static mut GLOBAL_LOGGER: Lazy = Lazy::new(|| { @@ -339,6 +343,7 @@ impl LogMessage { LogMessage::NgScopeDci(_) => "ngscope dci", LogMessage::RntiMatchingTrafficCollection(_) => "rnti traffic collection", LogMessage::Metric(_) => "metric", + LogMessage::DownloadStatistics(_) => "download", } .to_string() } @@ -372,6 +377,12 @@ impl LogMessage { LOGGER_RELATIVE_PATH_RNTI_MATCHING, run_timestamp_formatted ) } + LogMessage::DownloadStatistics(_) => { + format!( + "{}run_{}_download.jsonl", + LOGGER_RELATIVE_PATH_DOWNLOAD, run_timestamp_formatted + ) + } }; format!("{}{}", base_dir, message_type_file_path) } @@ -392,6 +403,10 @@ impl LogMessage { let json_string = serde_json::to_string(metric)?; writeln!(file, "{}", json_string)?; } + LogMessage::DownloadStatistics(download) => { + let json_string = serde_json::to_string(download)?; + writeln!(file, "{}", json_string)?; + } } file.flush()?; Ok(()) diff --git a/src/logic/downloader.rs b/src/logic/downloader.rs new file mode 100644 index 0000000..2cf080f --- /dev/null +++ b/src/logic/downloader.rs @@ -0,0 +1,353 @@ +use std::os::unix::io::AsRawFd; +use std::{ + collections::HashMap, + io::{self, Read, Write}, + net::{Shutdown, TcpStream}, + sync::mpsc::SyncSender, + thread::{self, JoinHandle}, + time::Duration, +}; + +use anyhow::{anyhow, Result}; +use bus::{Bus, BusReader}; +use serde_derive::{Deserialize, Serialize}; + +use super::{ + check_not_stopped, wait_until_running, DownloaderState, MainState, MessageDownloadConfig, + DEFAULT_WORKER_SLEEP_US, +}; +use crate::{ + logger::{log_download, log_info}, + parse::{Arguments, FlattenedDownloadArgs, Scenario}, + util::{determine_process_id, init_heap_buffer, print_debug, print_info, sockopt_get_tcp_info}, +}; + +pub const INITIAL_SLEEP_TIME_MS: u64 = 10000; +pub const READILY_WAITING_SLEEP_TIME_MS: u64 = 500; +pub const DOWNLOADING_IDLE_SLEEP_TIME_MS: u64 = 20; +pub const RECOVERY_SLEEP_TIME_MS: u64 = 2000; + +pub const TCP_STREAM_READ_BUFFER_SIZE: usize = 16384; + +#[derive(Clone, Debug, PartialEq)] +pub struct DownloadStreamState { + pub base_addr: String, + pub path: String, + pub rnti_share_type: u8, + pub start_timestamp_us: u64, + timedata: HashMap, +} + +#[derive(Debug)] +pub struct DownloadingParameters<'a> { + pub stream: &'a mut TcpStream, + pub stream_buffer: &'a mut Box<[u8]>, + pub tx_download_config: &'a mut Bus, + pub download_stream_state: &'a mut DownloadStreamState, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct DownloadFinishParameters { + base_addr: String, + path: String, + start_timestamp_us: u64, + finish_timestamp_us: u64, + average_rtt_us: u64, + total_download_bytes: u64, + timedata: HashMap, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct TcpLogStats { + received_bytes: u64, + rtt_us: u64, +} + +pub struct DownloaderArgs { + pub app_args: Arguments, + pub rx_app_state: BusReader, + pub tx_downloader_state: SyncSender, + pub tx_download_config: Bus, +} + +struct RunArgs { + pub app_args: Arguments, + pub rx_app_state: BusReader, + pub tx_downloader_state: SyncSender, + pub tx_download_config: Bus, + pub stream_handle: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct DownloadConfig { + pub rtt_us: u64, + pub rnti_share_type: u8, +} + +pub fn deploy_downloader(args: DownloaderArgs) -> Result> { + let mut run_args = RunArgs { + app_args: args.app_args, + rx_app_state: args.rx_app_state, + tx_downloader_state: args.tx_downloader_state, + tx_download_config: args.tx_download_config, + stream_handle: None, + }; + + let builder = thread::Builder::new().name("[download]".to_string()); + let thread = builder.spawn(move || { + let _ = run(&mut run_args); + finish(run_args); + })?; + Ok(thread) +} + +fn send_final_state(tx_download_state: &SyncSender) -> Result<()> { + Ok(tx_download_state.send(DownloaderState::Stopped)?) +} + +fn is_idle_scenario(scenario: Scenario) -> bool { + match scenario { + Scenario::TrackCellDciOnly => true, + Scenario::TrackUeAndEstimateTransportCapacity => true, + Scenario::PerformMeasurement => false, + } +} + +fn wait_for_running(rx_app_state: &mut BusReader) -> Result<()> { + match wait_until_running(rx_app_state) { + Ok(_) => Ok(()), + _ => Err(anyhow!("[download] Main did not send 'Running' message")), + } +} + +fn run(run_args: &mut RunArgs) -> Result<()> { + let app_args = &run_args.app_args; + let rx_app_state: &mut BusReader = &mut run_args.rx_app_state; + let tx_downloader_state: &mut SyncSender = &mut run_args.tx_downloader_state; + let tx_download_config: &mut Bus = &mut run_args.tx_download_config; + + tx_downloader_state.send(DownloaderState::Ready)?; + wait_for_running(rx_app_state)?; + print_info(&format!("[download]: \t\tPID {:?}", determine_process_id())); + + let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); + let download_args = + FlattenedDownloadArgs::from_unflattened(app_args.clone().download.unwrap())?; + let scenario = app_args.scenario.unwrap(); + let stream_handle = &mut run_args.stream_handle; + let mut stream_buffer = init_heap_buffer(TCP_STREAM_READ_BUFFER_SIZE); + let base_addr = download_args.download_base_addr; + let mut path_list_index = 0; + let paths = download_args.download_paths; + let mut rnti_share_type_index = 0; + let rnti_share_types = download_args.download_rnti_share_types; + + let mut downloader_state = DownloaderState::SleepMs( + INITIAL_SLEEP_TIME_MS, + Box::new(DownloaderState::StartDownload), + ); + let mut current_download: DownloadStreamState = DownloadStreamState { + base_addr: base_addr.clone(), + path: paths[path_list_index].clone(), + rnti_share_type: rnti_share_types[rnti_share_type_index], + start_timestamp_us: 0, + timedata: HashMap::new(), + }; + + loop { + /* */ + thread::sleep(sleep_duration); + if check_not_stopped(rx_app_state).is_err() { + break; + } + if is_idle_scenario(scenario) { + continue; /* keep the thread running, because the Bus-reference must be kept alive for the model */ + } + /* */ + + downloader_state = match downloader_state { + DownloaderState::Stopped => break, + DownloaderState::Ready => DownloaderState::SleepMs( + READILY_WAITING_SLEEP_TIME_MS, + Box::new(DownloaderState::Ready), + ), + DownloaderState::SleepMs(sleep_time_ms, next_state) => { + thread::sleep(Duration::from_millis(sleep_time_ms)); + *next_state + } + DownloaderState::StartDownload => { + current_download = DownloadStreamState { + base_addr: base_addr.clone(), + path: paths[path_list_index].clone(), + rnti_share_type: rnti_share_types[rnti_share_type_index], + start_timestamp_us: 0, + timedata: HashMap::new(), + }; + handle_start_download(&mut current_download, stream_handle) + } + DownloaderState::Downloading => { + let params = DownloadingParameters { + stream: stream_handle.as_mut().unwrap(), + stream_buffer: &mut stream_buffer, + tx_download_config, + download_stream_state: &mut current_download, + }; + handle_downloading(params) + } + DownloaderState::FinishDownload(params) => { + if path_list_index >= (paths.len() - 1) { + // Iterate paths first, and then rnti_share at overflow + path_list_index = 0; + rnti_share_type_index = (rnti_share_type_index + 1) % rnti_share_types.len() + } else { + path_list_index += 1; + } + *stream_handle = None; + handle_finish_download(params) + } + DownloaderState::ErrorStartingDownload(message) => { + print_info(&format!("[download] error during download: {}", message)); + DownloaderState::SleepMs( + RECOVERY_SLEEP_TIME_MS, + Box::new(DownloaderState::StartDownload), + ) + } + } + } + + Ok(()) +} + +fn finish(run_args: RunArgs) { + if let Some(stream) = run_args.stream_handle { + let _ = stream.shutdown(Shutdown::Both); + } + let _ = send_final_state(&run_args.tx_downloader_state); +} + +fn handle_finish_download(finish_parameters: DownloadFinishParameters) -> DownloaderState { + if let Err(e) = log_download(finish_parameters) { + let _ = log_info(&format!( + "[download] error occured while logging download statistics: {:?}", + e + )); + } + DownloaderState::StartDownload +} + +fn handle_start_download( + download_stream_state: &mut DownloadStreamState, + stream_option: &mut Option, +) -> DownloaderState { + match create_download_stream( + &download_stream_state.base_addr, + &download_stream_state.path, + ) { + Ok(stream) => { + download_stream_state.start_timestamp_us = chrono::Utc::now().timestamp_micros() as u64; + *stream_option = Some(stream); + DownloaderState::Downloading + } + Err(e) => { + DownloaderState::ErrorStartingDownload(format!("Error starting download: {:?}", e)) + } + } +} + +fn handle_downloading(params: DownloadingParameters) -> DownloaderState { + let DownloadingParameters { + stream, + stream_buffer, + tx_download_config, + download_stream_state: + DownloadStreamState { + base_addr, + path, + rnti_share_type, + start_timestamp_us, + timedata, + }, + } = params; + + match stream.read(stream_buffer) { + Ok(chunk_size) => { + if chunk_size == 0 { + // End of stream + let download_finish_timestamp_us = chrono::Utc::now().timestamp_micros() as u64; + let total_download_bytes = + timedata.iter().map(|(_, v)| v.received_bytes).sum::(); + let average_rtt_us = (timedata.iter().map(|(_, v)| v.rtt_us).sum::() as f64 + / timedata.len() as f64) as u64; + DownloaderState::FinishDownload(DownloadFinishParameters { + base_addr: base_addr.to_string(), + path: path.to_string(), + start_timestamp_us: *start_timestamp_us, + finish_timestamp_us: download_finish_timestamp_us, + average_rtt_us, + total_download_bytes, + timedata: timedata.clone(), + }) + } else { + let now_us = chrono::Utc::now().timestamp_micros() as u64; + match determine_socket_rtt(stream) { + Ok(rtt_us) => { + timedata.entry(now_us).or_insert(TcpLogStats { + received_bytes: chunk_size as u64, + rtt_us, + }); + tx_download_config.broadcast(MessageDownloadConfig { + config: DownloadConfig { + rtt_us, + rnti_share_type: *rnti_share_type, + }, + }); + } + Err(e) => { + print_debug(&format!( + "[download] error occured while logging RTT: {:?}. Keep downloading..", + e + )); + } + } + DownloaderState::Downloading + } + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + // No data available yet, handle as needed (e.g., sleep or continue looping) + DownloaderState::SleepMs( + DOWNLOADING_IDLE_SLEEP_TIME_MS, + Box::new(DownloaderState::Downloading), + ) + } + Err(e) => { + DownloaderState::ErrorStartingDownload(format!("Error starting download: {:?}", e)) + } + } +} + +fn create_download_stream(base_addr: &str, path: &str) -> Result { + let mut stream = TcpStream::connect(base_addr)?; + + print_debug(&format!( + "DEBUG [download] create_download_stream.path: {}", + path + )); + + // Send HTTP GET request + let request = format!( + "GET {} HTTP/1.1\r\n\ + Host: example.com\r\n\ + Connection: close\r\n\r\n", + path + ); + stream.write_all(request.as_bytes())?; + stream.set_nonblocking(true)?; + + Ok(stream) +} + +fn determine_socket_rtt(stream: &mut TcpStream) -> Result { + let socket_file_descriptor: i32 = stream.as_raw_fd(); + let tcp_info = sockopt_get_tcp_info(socket_file_descriptor)?; + Ok(tcp_info.tcpi_rtt.into()) +} diff --git a/src/logic/mod.rs b/src/logic/mod.rs index 7e64115..e12ef23 100644 --- a/src/logic/mod.rs +++ b/src/logic/mod.rs @@ -13,7 +13,10 @@ use crate::logic::rnti_matcher::TrafficCollection; use crate::ngscope::config::NgScopeConfig; use crate::ngscope::types::NgScopeCellDci; +use self::downloader::{DownloadConfig, DownloadFinishParameters}; + pub mod cell_source; +pub mod downloader; pub mod model_handler; pub mod ngscope_controller; pub mod rnti_matcher; @@ -233,6 +236,32 @@ impl WorkerState for NgControlState { } } +#[derive(Clone, Debug, PartialEq)] +pub enum DownloaderState { + Stopped, + Ready, + SleepMs(u64, Box), + StartDownload, + ErrorStartingDownload(String), + Downloading, + FinishDownload(DownloadFinishParameters), +} + +impl WorkerState for DownloaderState { + fn worker_name() -> String { + "download".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + DownloaderState::Ready => GeneralState::Running, + DownloaderState::SleepMs(_, _) => GeneralState::Running, + DownloaderState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } +} + /* -------------- */ /* Worker Messaging */ /* -------------- */ @@ -268,6 +297,12 @@ pub enum MetricTypes { A(MetricA), } +#[allow(dead_code)] +#[derive(Clone, Debug, PartialEq)] +pub struct MessageDownloadConfig { + config: DownloadConfig, +} + #[repr(C)] #[derive(Clone, Copy, Debug, PartialEq)] pub struct MetricA { diff --git a/src/logic/model_handler.rs b/src/logic/model_handler.rs index 72a7851..4d935ba 100644 --- a/src/logic/model_handler.rs +++ b/src/logic/model_handler.rs @@ -1,7 +1,7 @@ use crate::cell_info::CellInfo; use crate::logger::log_metric; use crate::ngscope::types::{NgScopeCellDci, NgScopeRntiDci}; -use crate::parse::{Arguments, DynamicValue, FlattenedModelArgs}; +use crate::parse::{Arguments, DynamicValue, FlattenedModelArgs, Scenario}; use crate::util::{print_debug, print_info}; use std::collections::HashSet; use std::sync::mpsc::{SyncSender, TryRecvError}; @@ -12,7 +12,7 @@ use anyhow::{anyhow, Result}; use bus::{Bus, BusReader}; use serde_derive::{Deserialize, Serialize}; -use super::{MetricA, MetricTypes}; +use super::{MessageDownloadConfig, MetricA, MetricTypes}; use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageMetric, MessageRnti, ModelState, DEFAULT_WORKER_SLEEP_US, @@ -112,6 +112,7 @@ pub struct ModelHandlerArgs { pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, + pub rx_download_config: BusReader, pub tx_metric: Bus, } @@ -122,6 +123,7 @@ struct RunArgs { pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, + pub rx_download_config: BusReader, pub tx_metric: Bus, } @@ -137,8 +139,9 @@ struct RunParametersSendingBehavior<'a> { metric_sending_interval_us: &'a mut u64, metric_smoothing_size_ms: &'a mut u64, last_metric_timestamp_us: &'a mut u64, - model_args: &'a FlattenedModelArgs, + rnti_share_type: &'a u8, last_rtt_us: &'a Option, + model_args: &'a FlattenedModelArgs, } pub fn deploy_model_handler(args: ModelHandlerArgs) -> Result> { @@ -149,6 +152,7 @@ pub fn deploy_model_handler(args: ModelHandlerArgs) -> Result> { rx_cell_info: args.rx_cell_info, rx_dci: args.rx_dci, rx_rnti: args.rx_rnti, + rx_download_config: args.rx_download_config, tx_metric: args.tx_metric, }; @@ -178,6 +182,8 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let rx_cell_info: &mut BusReader = &mut run_args.rx_cell_info; let rx_dci: &mut BusReader = &mut run_args.rx_dci; let rx_rnti: &mut BusReader = &mut run_args.rx_rnti; + let rx_download_config: &mut BusReader = + &mut run_args.rx_download_config; let tx_metric: &mut Bus = &mut run_args.tx_metric; tx_model_state.send(ModelState::Running)?; @@ -186,14 +192,15 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); let model_args = FlattenedModelArgs::from_unflattened(app_args.clone().model.unwrap())?; + let scenario = app_args.scenario.unwrap(); let is_log_metric: bool = model_args.model_log_metric; let mut last_metric_timestamp_us: u64 = chrono::Utc::now().timestamp_micros() as u64; let mut dci_buffer = DciRingBuffer::new(); let mut last_rnti: Option = None; let mut last_cell_info: Option = None; - let last_rtt_us: Option = Some(40000); // TODO: Replace test-RTT with actual RTT and make - // it mutable + let mut last_rnti_share_type: u8 = RNTI_SHARE_TYPE_ALL; + let mut last_rtt_us: Option = Some(40000); let mut metric_sending_interval_us: u64 = determine_sending_interval(&model_args, &last_rtt_us); let mut metric_smoothing_size_ms: u64 = determine_smoothing_size(&model_args, &last_rtt_us); @@ -203,10 +210,6 @@ fn run(run_args: &mut RunArgs) -> Result<()> { if check_not_stopped(rx_app_state).is_err() { break; } - /* */ - - /* unpack dci, cell_info, rnti at every iteration to keep the queue "empty"! */ - // TODO: Unpack the RTT too match rx_dci.try_recv() { Ok(dci) => { dci_buffer.push(dci.ngscope_dci); @@ -232,6 +235,18 @@ fn run(run_args: &mut RunArgs) -> Result<()> { Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => break, }; + match rx_download_config.try_recv() { + Ok(download_config) => { + last_rtt_us = Some(download_config.config.rtt_us); + last_rnti_share_type = download_config.config.rnti_share_type; + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => break, + }; + if is_idle_scenario(scenario) { + continue; + } + /* */ if let (Some(rnti), Some(cell_info)) = (last_rnti, last_cell_info.clone()) { let delta_last_metric_sent_us = @@ -249,6 +264,7 @@ fn run(run_args: &mut RunArgs) -> Result<()> { metric_sending_interval_us: &mut metric_sending_interval_us, metric_smoothing_size_ms: &mut metric_smoothing_size_ms, last_metric_timestamp_us: &mut last_metric_timestamp_us, + rnti_share_type: &last_rnti_share_type, model_args: &model_args, last_rtt_us: &last_rtt_us, }; @@ -260,6 +276,14 @@ fn run(run_args: &mut RunArgs) -> Result<()> { Ok(()) } +fn is_idle_scenario(scenario: Scenario) -> bool { + match scenario { + Scenario::TrackCellDciOnly => true, + Scenario::TrackUeAndEstimateTransportCapacity => false, + Scenario::PerformMeasurement => false, + } +} + fn finish(run_args: RunArgs) { let _ = send_final_state(&run_args.tx_model_state); } @@ -281,6 +305,7 @@ fn handle_calculate_metric( metric_sending_interval_us, metric_smoothing_size_ms, last_metric_timestamp_us, + rnti_share_type, model_args, last_rtt_us, } = sending_behavior; @@ -288,9 +313,13 @@ fn handle_calculate_metric( let buffer_slice_size: usize = **metric_smoothing_size_ms as usize; let buffer_slice = dci_buffer.slice(buffer_slice_size); if !buffer_slice.is_empty() { - if let Ok(metric_wrapper) = - calculate_capacity(*rnti, cell_info, buffer_slice, is_log_metric) - { + if let Ok(metric_wrapper) = calculate_capacity( + *rnti, + cell_info, + buffer_slice, + is_log_metric, + rnti_share_type, + ) { let transport_capacity = metric_wrapper .result .transport_fair_share_capacity_bit_per_ms; @@ -325,9 +354,10 @@ fn calculate_capacity( cell_info: &CellInfo, dci_list: &[NgScopeCellDci], is_log_metric: &bool, + rnti_share_type: &u8, ) -> Result { let metric_wrapper = - calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list, RNTI_SHARE_TYPE_ALL)?; + calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list, rnti_share_type)?; if *is_log_metric { let _ = log_metric(metric_wrapper.clone()); } @@ -371,7 +401,7 @@ fn calculate_pbe_cc_capacity( target_rnti: u16, cell_info: &CellInfo, dci_list: &[NgScopeCellDci], - rnti_share_type: u8, + rnti_share_type: &u8, ) -> Result { let nof_dci: u64 = dci_list.len() as u64; if nof_dci == 0 { @@ -503,7 +533,7 @@ fn calculate_pbe_cc_capacity( nof_dci, p_cell, nof_rnti_shared, - rnti_share_type, + rnti_share_type: *rnti_share_type, p_alloc, p_alloc_no_tbs, tbs_alloc_bit, @@ -594,8 +624,13 @@ mod tests { ..Default::default() }], }; - let metric_params = - calculate_capacity(dummy_rnti, &dummy_cell_info, &dummy_dci_slice(), &false)?; + let metric_params = calculate_capacity( + dummy_rnti, + &dummy_cell_info, + &dummy_dci_slice(), + &false, + &RNTI_SHARE_TYPE_ALL, + )?; assert_eq!( metric_params.result.physical_fair_share_capacity_bit_per_ms, 33621 diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index 14a0372..83300eb 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -19,7 +19,7 @@ use crate::logic::{ RntiMatchingErrorType, CHANNEL_SYNC_SIZE, DEFAULT_WORKER_SLEEP_MS, }; use crate::ngscope::types::NgScopeCellDci; -use crate::parse::{Arguments, FlattenedRntiMatchingArgs}; +use crate::parse::{Arguments, FlattenedRntiMatchingArgs, Scenario}; use crate::util::{determine_process_id, print_debug, print_info, CellRntiRingBuffer}; @@ -204,6 +204,8 @@ fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { let matching_args = FlattenedRntiMatchingArgs::from_unflattened(app_args.clone().rntimatching.unwrap())?; + let scenario = app_args.scenario.unwrap(); + let mut cell_rnti_ring_buffer: CellRntiRingBuffer = CellRntiRingBuffer::new(RNTI_RING_BUFFER_SIZE); let traffic_destination = matching_args.matching_traffic_destination; @@ -237,6 +239,9 @@ fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { } /* unpack dci at every iteration to keep the queue "empty"! */ let latest_dcis = collect_dcis(rx_dci); + if is_idle_scenario(scenario) { + continue; + } /* */ matcher_state = match matcher_state { @@ -250,7 +255,6 @@ fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { &mut traffic_pattern_index, ), RntiMatcherState::MatchingCollectDci(traffic_collection) => { - // TODO: Use all dcis here and let this thread sleep again! handle_collect_dci(latest_dcis, *traffic_collection) } RntiMatcherState::MatchingProcessDci(traffic_collection) => handle_process_dci( @@ -403,6 +407,14 @@ fn handle_matching_error( ) } +fn is_idle_scenario(scenario: Scenario) -> bool { + match scenario { + Scenario::TrackCellDciOnly => true, + Scenario::TrackUeAndEstimateTransportCapacity => false, + Scenario::PerformMeasurement => false, + } +} + fn finish(run_args: RunArgs) { let _ = run_args .tx_rntimatcher_state diff --git a/src/main.rs b/src/main.rs index da945cf..cc43cb6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use bus::{Bus, BusReader}; use casual_logger::{Level, Log}; use logger::{deploy_logger, LoggerArgs, LoggerState}; +use logic::downloader::{deploy_downloader, DownloaderArgs}; use std::collections::HashSet; use std::error::Error; use std::fs; @@ -24,9 +25,10 @@ use logic::model_handler::{deploy_model_handler, ModelHandlerArgs}; use logic::ngscope_controller::{deploy_ngscope_controller, NgControlArgs}; use logic::rnti_matcher::{deploy_rnti_matcher, RntiMatcherArgs}; use logic::{ - GeneralState, MainState, MessageCellInfo, MessageDci, MessageRnti, ModelState, NgControlState, - RntiMatcherState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO, - BUS_SIZE_DCI, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE, WORKER_SLEEP_LONG_MS, + DownloaderState, GeneralState, MainState, MessageCellInfo, MessageDci, MessageDownloadConfig, + MessageRnti, ModelState, NgControlState, RntiMatcherState, SourceState, WorkerState, + BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO, BUS_SIZE_DCI, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE, + WORKER_SLEEP_LONG_MS, }; use logic::{MessageMetric, WorkerChannel, BUS_SIZE_METRIC}; use parse::Arguments; @@ -38,6 +40,7 @@ struct CombinedReceivers { pub rntimatcher: Receiver, pub ngcontrol: Receiver, pub logger: Receiver, + pub downloader: Receiver, } struct CombinedSenders { @@ -46,6 +49,7 @@ struct CombinedSenders { pub rntimatcher: SyncSender, pub ngcontrol: SyncSender, pub logger: SyncSender, + pub downloader: SyncSender, } impl CombinedReceivers { @@ -55,6 +59,7 @@ impl CombinedReceivers { let _ = &self.ngcontrol.worker_print_on_recv(); let _ = &self.rntimatcher.worker_print_on_recv(); let _ = &self.logger.worker_print_on_recv(); + let _ = &self.downloader.worker_print_on_recv(); } } @@ -67,6 +72,8 @@ fn deploy_app( let mut tx_cell_info: Bus = Bus::::new(BUS_SIZE_CELL_INFO); let mut tx_rnti: Bus = Bus::::new(BUS_SIZE_RNTI); let mut tx_metric: Bus = Bus::::new(BUS_SIZE_METRIC); + let mut tx_download_config: Bus = + Bus::::new(BUS_SIZE_METRIC); let rx_metric: BusReader = tx_metric.add_rx(); let logger_args = LoggerArgs { @@ -82,6 +89,7 @@ fn deploy_app( rx_dci: tx_dci.add_rx(), rx_rnti: tx_rnti.add_rx(), tx_metric, + rx_download_config: tx_download_config.add_rx(), }; let rntimatcher_args = RntiMatcherArgs { app_args: app_args.clone(), @@ -104,6 +112,12 @@ fn deploy_app( tx_source_state: all_tx_states.source, tx_cell_info, }; + let downloader_args = DownloaderArgs { + app_args: app_args.clone(), + rx_app_state: tx_app_state.add_rx(), + tx_downloader_state: all_tx_states.downloader, + tx_download_config, + }; let tasks: Vec> = vec![ deploy_ngscope_controller(ngcontrol_args)?, @@ -111,6 +125,7 @@ fn deploy_app( deploy_model_handler(model_args)?, deploy_rnti_matcher(rntimatcher_args)?, deploy_logger(logger_args)?, + deploy_downloader(downloader_args)?, ]; Ok(tasks) } @@ -147,10 +162,16 @@ fn wait_all_running( ) -> Result<()> { print_info("[ ] waiting for all threads to become ready"); - let mut waiting_for: HashSet<&str> = - vec!["source", "model", "rntimatcher", "ngcontrol", "logger"] - .into_iter() - .collect(); + let mut waiting_for: HashSet<&str> = vec![ + "source", + "model", + "rntimatcher", + "ngcontrol", + "logger", + "downloader", + ] + .into_iter() + .collect(); while !waiting_for.is_empty() { if is_notifier(sigint_notifier) { @@ -183,6 +204,11 @@ fn wait_all_running( waiting_for.remove("logger"); } } + if waiting_for.contains("downloader") { + if let Ok(Some(_)) = check_running(&all_rx_states.downloader) { + waiting_for.remove("downloader"); + } + } } print_info("[✓] waiting for all threads to become ready"); @@ -210,12 +236,14 @@ fn main() -> Result<(), Box> { let (rntimatcher_tx, rntimatcher_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (ngcontrol_tx, ngcontrol_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (logger_tx, logger_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (downloader_tx, downloader_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let all_tx_states = CombinedSenders { model: model_tx, source: source_tx, rntimatcher: rntimatcher_tx, ngcontrol: ngcontrol_tx, logger: logger_tx, + downloader: downloader_tx, }; let all_rx_states = CombinedReceivers { model: model_rx, @@ -223,6 +251,7 @@ fn main() -> Result<(), Box> { rntimatcher: rntimatcher_rx, ngcontrol: ngcontrol_rx, logger: logger_rx, + downloader: downloader_rx, }; let tasks = deploy_app(&mut tx_app_state, &args, all_tx_states)?; diff --git a/src/parse.rs b/src/parse.rs index 515e1e3..2047a33 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -6,12 +6,26 @@ use std::{default, error::Error, path::PathBuf}; use crate::{logic::traffic_patterns::RntiMatchingTrafficPatternType, util::print_debug}; +pub const DEFAULT_SCENARIO: Scenario = Scenario::TrackUeAndEstimateTransportCapacity; pub const DEFAULT_LOG_BASE_DIR: &str = "./.logs/"; +pub const DEFAULT_DOWNLOAD_BASE_ADDR: &str = "http://some.addr"; +pub const DEFAULT_DOWNLOAD_PATHS: &[&str] = &[ + ":9393/cubic", + ":9393/bbr", + ":9393/pbe/init", + ":9393/pbe/init_and_upper", + ":9393/pbe/direct", +]; +pub const DEFAULT_DOWNLOAD_RNTI_SHARE_TYPES: &[u8] = &[0, 1]; #[derive(Debug, Clone, PartialEq, Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None, next_line_help = true)] #[command(propagate_version = true)] pub struct Arguments { + /// The scenario to run + #[arg(long, value_enum, required = false)] + pub scenario: Option, + /// Define which API to use to fetch cell data #[arg(short('a'), value_enum, required = false)] pub cellapi: Option, @@ -36,11 +50,24 @@ pub struct Arguments { #[command(flatten)] pub log: Option, + #[command(flatten)] + pub download: Option, + /// Print additional information in the terminal #[arg(short('v'), long, required = false)] pub verbose: Option, } +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug, Serialize, Deserialize)] +pub enum Scenario { + /// Track UE and send estimated capacity + TrackUeAndEstimateTransportCapacity, + /// Do not send anything or try to identify the UE's traffic - just collect the cell's DCI data + TrackCellDciOnly, + /// Perform a measurement by downloading data and collecting connection information + PerformMeasurement, +} + #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug, Serialize, Deserialize)] pub enum CellApiConfig { /// Use a Milesight router as cell data API @@ -211,9 +238,27 @@ pub struct FlattenedLogArgs { pub log_base_dir: String, } +#[derive(Args, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct DownloadArgs { + /// Base target address inluding host and port + pub download_base_addr: Option, + /// List of paths to call on the base address + pub download_paths: Option>, + /// List of RNTI Share Type configurations to run + pub download_rnti_share_types: Option>, +} + +#[derive(Clone, Debug)] +pub struct FlattenedDownloadArgs { + pub download_base_addr: String, + pub download_paths: Vec, + pub download_rnti_share_types: Vec, +} + impl default::Default for Arguments { fn default() -> Self { Arguments { + scenario: Some(DEFAULT_SCENARIO), verbose: Some(true), cellapi: Some(CellApiConfig::Milesight), milesight: Some(MilesightArgs { @@ -250,6 +295,16 @@ impl default::Default for Arguments { log: Some(LogArgs { log_base_dir: Some(DEFAULT_LOG_BASE_DIR.to_string()), }), + download: Some(DownloadArgs { + download_base_addr: Some(DEFAULT_DOWNLOAD_BASE_ADDR.to_string()), + download_paths: Some( + DEFAULT_DOWNLOAD_PATHS + .iter() + .map(|path| path.to_string()) + .collect(), + ), + download_rnti_share_types: Some(DEFAULT_DOWNLOAD_RNTI_SHARE_TYPES.to_vec()), + }), } } } @@ -287,7 +342,9 @@ impl Arguments { self.rntimatching = self.rntimatching.or(config_file.rntimatching); self.model = self.model.or(config_file.model); self.log = self.log.or(config_file.log); + self.download = self.download.or(config_file.download); self.verbose = self.verbose.or(config_file.verbose); + self.scenario = self.scenario.or(config_file.scenario); Ok(self) } @@ -386,3 +443,13 @@ impl FlattenedLogArgs { }) } } + +impl FlattenedDownloadArgs { + pub fn from_unflattened(download_args: DownloadArgs) -> Result { + Ok(FlattenedDownloadArgs { + download_base_addr: download_args.download_base_addr.unwrap(), + download_paths: download_args.download_paths.unwrap(), + download_rnti_share_types: download_args.download_rnti_share_types.unwrap(), + }) + } +} diff --git a/src/util.rs b/src/util.rs index 85f3b03..d8dcc80 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,8 +1,5 @@ #![allow(dead_code)] -use anyhow::{anyhow, Result}; -use casual_logger::{Level, Log}; -use lazy_static::lazy_static; use std::collections::{HashMap, VecDeque}; use std::fs::{File, OpenOptions}; use std::hash::Hash; @@ -10,6 +7,12 @@ use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use anyhow::{anyhow, Result}; +use casual_logger::{Level, Log}; +use lazy_static::lazy_static; +use libc::{c_void, getsockopt, socklen_t, TCP_INFO}; +use std::mem; + use crate::logger::log_info; use crate::logic::rnti_matcher::TrafficCollection; @@ -28,6 +31,51 @@ pub struct CellRntiRingBuffer { size: usize, } +#[repr(C)] +#[derive(Debug, Default)] +pub struct StockTcpInfo { + pub tcpi_state: u8, + pub tcpi_ca_state: u8, + pub tcpi_retransmits: u8, + pub tcpi_probes: u8, + pub tcpi_backoff: u8, + pub tcpi_options: u8, + pub tcpi_snd_wscale: u8, + pub tcpi_rcv_wscale: u8, + + pub tcpi_rto: u32, + pub tcpi_ato: u32, + pub tcpi_snd_mss: u32, + pub tcpi_rcv_mss: u32, + + pub tcpi_unacked: u32, + pub tcpi_sacked: u32, + pub tcpi_lost: u32, + pub tcpi_retrans: u32, + pub tcpi_fackets: u32, + + // Times + pub tcpi_last_data_sent: u32, + pub tcpi_last_ack_sent: u32, + pub tcpi_last_data_recv: u32, + pub tcpi_last_ack_recv: u32, + + // Metrics + pub tcpi_pmtu: u32, + pub tcpi_rcv_ssthresh: u32, + pub tcpi_rtt: u32, + pub tcpi_rttvar: u32, + pub tcpi_snd_ssthresh: u32, + pub tcpi_snd_cwnd: u32, + pub tcpi_advmss: u32, + pub tcpi_reordering: u32, + + pub tcpi_rcv_rtt: u32, + pub tcpi_rcv_space: u32, + + pub tcpi_total_retrans: u32, +} + impl RingBuffer where T: Eq + Hash + Clone, @@ -203,3 +251,31 @@ pub fn set_debug(level: bool) { pub fn is_debug() -> bool { IS_DEBUG.load(Ordering::SeqCst) } + +pub fn sockopt_get_tcp_info(socket_file_descriptor: i32) -> Result { + let mut tcp_info: StockTcpInfo = StockTcpInfo::default(); + let mut tcp_info_len = mem::size_of::() as socklen_t; + + let ret = unsafe { + getsockopt( + socket_file_descriptor, + libc::IPPROTO_TCP, + TCP_INFO, + &mut tcp_info as *mut _ as *mut c_void, + &mut tcp_info_len, + ) + }; + + // Check if getsockopt was successful + if ret != 0 { + return Err(anyhow!("An error occured running libc::getsockopt")); + } + Ok(tcp_info) +} + +pub fn init_heap_buffer(size: usize) -> Box<[u8]> { + let mut vec: Vec = Vec::::with_capacity(size); + /* Fill the vector with zeros */ + vec.resize_with(size, || 0); + vec.into_boxed_slice() +}