diff --git a/scripts/api_test_device_publisher.py b/scripts/api_test_device_publisher.py index 61562c1..0787793 100755 --- a/scripts/api_test_device_publisher.py +++ b/scripts/api_test_device_publisher.py @@ -1,23 +1,41 @@ #!/usr/bin/env python3 +import argparse from flask import Flask, jsonify +CELL_DATA_A = { + "cid": None, + "enodeB": 100344, + "pci": 62, + "type": "LTE", + "arfcn": 3350, + "band": "7", + "rssi": 110.7, + "rsrp": 90.7, + "rsrq": 11.3, +} +CELL_DATA_B = { + "cid": None, + "enodeB": 105059, + "pci": 5, + "type": "LTE", + "arfcn": 500, + "band": "1", + "rssi": 110.7, + "rsrp": 90.7, + "rsrq": 11.3, +} +DEFAULT_CELL_DATA_CHOICES = { + "A": CELL_DATA_A, + "B": CELL_DATA_B, +} + +def default_cell_data() -> str: + return list(DEFAULT_CELL_DATA_CHOICES.keys())[0] + app = Flask(__name__) +cell_data = [] -# Predefined response -cell_data = [ - { - "cid": None, - "enodeB": 100344, - "pci": 62, - "type": "LTE", - "arfcn": 3350, - "band": "7", - "rssi": 110.7, - "rsrp": 90.7, - "rsrq": 11.3, - }, -] @app.route('/api/v1/celldata') def get_cell_data_all(): @@ -27,5 +45,23 @@ def get_cell_data_all(): def get_cell_data_connected_all(): return jsonify(cell_data) -if __name__ == '__main__': + +def parse_application_args(): + parser = argparse.ArgumentParser(description='Test API to simulate DevicePublisher.') + parser.add_argument('--cell-data', + type=str, + choices=list(DEFAULT_CELL_DATA_CHOICES.keys()), + default=default_cell_data(), + help=f'Column to visualize (default: {default_cell_data()})') + return parser.parse_args() + +def main(): + args = parse_application_args() + + global cell_data + cell_data = [ DEFAULT_CELL_DATA_CHOICES[args.cell_data] ] + app.run(debug=True, port=7353) + +if __name__ == '__main__': + main() diff --git a/src/cell_info.rs b/src/cell_info.rs index d4ab915..9020216 100644 --- a/src/cell_info.rs +++ b/src/cell_info.rs @@ -39,7 +39,6 @@ pub struct SingleCell { pub cell_id: u64, pub cell_type: CellularType, /// Number of PRB per slot - pub nof_prb: u16, pub frequency: u64, pub rssi: f64, pub rsrp: f64, @@ -313,7 +312,6 @@ impl CellInfo { let mut single_cell: SingleCell = SingleCell { cell_id: 0, cell_type: CellularType::LTE, - nof_prb: 100, frequency: 0, rssi: 0.0, rsrp: 0.0, @@ -329,7 +327,6 @@ impl CellInfo { single_cell.rssi = cgi_response_extract_rssi(response_value)?; single_cell.rsrq = cgi_response_extract_rsrp(response_value)?; single_cell.rsrp = cgi_response_extract_rsrq(response_value)?; - single_cell.nof_prb = prb_from_cell_id(single_cell.cell_id); // TODO: Evaluate: Add estimated bandwidth? Ok(CellInfo { cells: [single_cell].to_vec(), @@ -343,7 +340,6 @@ impl CellInfo { cell_id: cell.safe_id(), cell_type: CellularType::from_str(&cell.r#type)?, frequency: 0, - nof_prb: 100, rssi: cell.rssi, rsrp: cell.rsrp, rsrq: cell.rsrq, @@ -351,7 +347,6 @@ impl CellInfo { ul_est: cell.estimatedUpBandwidth, }; single_cell.frequency = arfcn_to_frequency(cell.arfcn, &single_cell.cell_type)?; - single_cell.nof_prb = prb_from_cell_id(single_cell.cell_id); cell_info.cells.push(single_cell); } @@ -359,24 +354,6 @@ impl CellInfo { } } -// Quick fix for setting the nof PRB. -fn prb_from_cell_id(cell_id: u64) -> u16 { - match cell_id { - /* O2 */ - 21 => 50, - 41 => 100, - 51 => 100, - 61 => 100, - 63 => 100, - /* Telekom */ - 6 => 50, - 11 => 50, // LTE 800 DD (EARFCN 6500) - 7 => 100, - 8 => 100, - _ => 100, - } -} - /* --------------------------- */ /* Milesight cgi helpers */ /* --------------------------- */ diff --git a/src/logic/downloader.rs b/src/logic/downloader.rs index d6582d9..df39303 100644 --- a/src/logic/downloader.rs +++ b/src/logic/downloader.rs @@ -30,6 +30,7 @@ pub const DOWNLOADING_IDLE_SLEEP_TIME_MS: u64 = 20; pub const RECOVERY_SLEEP_TIME_MS: u64 = 2_000; pub const BETWEEN_DOWNLOADS_SLEEP_TIME_MS: u64 = 1_000; pub const RESTART_TIMEOUT_US: u64 = 2_000_000; +pub const POST_DOWNLOAD_TIME_US: u64 = 2_000_000; pub const TCP_STREAM_READ_BUFFER_SIZE: usize = 100_000; @@ -40,6 +41,7 @@ pub struct DownloadStreamState { pub rnti_share_type: u8, pub last_rtt_us: Option, pub start_timestamp_us: u64, + pub finish_timestamp_us: Option, pub timedata: HashMap, pub dci_total_dl_bit: u64, pub dci_rnti_dl_bit: u64, @@ -229,6 +231,9 @@ fn run(run_args: &mut RunArgs) -> Result<()> { Box::new(DownloaderState::StartDownload), ) } + DownloaderState::PostDownload => { + handle_post_download(&mut current_download) + } } } @@ -262,19 +267,27 @@ fn unpack_all_dci_messages( downloader_state: &DownloaderState, rnti_option: Option ) -> Result<()> { - - loop { - match rx_dci.try_recv() { - Ok(dci) => { - if DownloaderState::Downloading == *downloader_state && - dci.ngscope_dci.time_stamp >= download_stream_state.start_timestamp_us { - download_stream_state.add_ngscope_dci(dci.ngscope_dci, rnti_option); + while let Ok(dci) = rx_dci.try_recv() { + if let DownloaderState::Downloading | DownloaderState::PostDownload = downloader_state { + if let MessageDci::CellDci(ngscope_dci) = dci { + if ngscope_dci.time_stamp >= download_stream_state.start_timestamp_us { + if let Some(finish_timestamp_us) = download_stream_state.finish_timestamp_us { + if ngscope_dci.time_stamp > finish_timestamp_us { + continue; + } else { + print_debug("DEBUG [download] Collecting DCI in PostDownload state!!!"); + } + } + download_stream_state.add_ngscope_dci(*ngscope_dci, rnti_option); } } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => return Err(anyhow!("[download] error: rx_dci disconnected")) - }; + } + } + + if let Err(TryRecvError::Disconnected) = rx_dci.try_recv() { + return Err(anyhow!("[download] error: rx_dci disconnected")); } + Ok(()) } @@ -326,6 +339,7 @@ fn handle_downloading(params: DownloadingParameters) -> DownloaderState { rnti_share_type, last_rtt_us, start_timestamp_us, + finish_timestamp_us, timedata, dci_total_dl_bit, dci_rnti_dl_bit, @@ -340,26 +354,11 @@ fn handle_downloading(params: DownloadingParameters) -> DownloaderState { Ok(chunk_size) => { if chunk_size == 0 { // End of stream - let download_finish_timestamp_us = chrono::Local::now().timestamp_micros() as u64; - let total_download_bytes = determine_average_download_bytes(timedata); - let average_rtt_us = determine_average_rtt_us(timedata); - DownloaderState::FinishDownload(DownloadFinishParameters { - base_addr: base_addr.to_string(), - path: path.to_string(), - start_timestamp_us: *start_timestamp_us, - finish_timestamp_us: download_finish_timestamp_us, - average_rtt_us, - total_download_bytes, - dci_total_dl_bit: *dci_total_dl_bit, - dci_total_dl_prb_with_tbs: *dci_total_dl_prb_with_tbs, - dci_total_dl_prb_no_tbs: *dci_total_dl_prb_no_tbs, - dci_rnti_dl_bit: *dci_rnti_dl_bit, - dci_rnti_dl_prb_with_tbs: *dci_rnti_dl_prb_with_tbs, - dci_rnti_dl_prb_no_tbs: *dci_rnti_dl_prb_no_tbs, - }) + *finish_timestamp_us = Some(chrono::Local::now().timestamp_micros() as u64); + DownloaderState::PostDownload + } else { let now_us = chrono::Local::now().timestamp_micros() as u64; - if let Some(rtt_us) = try_to_decode_rtt(&stream_buffer[0..chunk_size], last_rtt_us) { timedata.entry(now_us).or_insert(TcpLogStats { received_bytes: chunk_size as u64, @@ -414,6 +413,47 @@ fn handle_downloading(params: DownloadingParameters) -> DownloaderState { } } +fn handle_post_download(download_stream_state: &mut DownloadStreamState) -> DownloaderState { + let DownloadStreamState { + base_addr, + path, + start_timestamp_us, + finish_timestamp_us, + timedata, + dci_total_dl_bit, + dci_rnti_dl_bit, + dci_total_dl_prb_with_tbs, + dci_total_dl_prb_no_tbs, + dci_rnti_dl_prb_with_tbs, + dci_rnti_dl_prb_no_tbs, + .. + } = download_stream_state; + + let now_us = chrono::Local::now().timestamp_micros() as u64; + if now_us < (finish_timestamp_us.unwrap() + POST_DOWNLOAD_TIME_US) { + // Stay in PostDownload and keep collecting DCI + DownloaderState::PostDownload + } else { + let total_download_bytes = determine_average_download_bytes(timedata); + let average_rtt_us = determine_average_rtt_us(timedata); + DownloaderState::FinishDownload(DownloadFinishParameters { + base_addr: base_addr.to_string(), + path: path.to_string(), + start_timestamp_us: *start_timestamp_us, + finish_timestamp_us: finish_timestamp_us.unwrap(), + average_rtt_us, + total_download_bytes, + dci_total_dl_bit: *dci_total_dl_bit, + dci_total_dl_prb_with_tbs: *dci_total_dl_prb_with_tbs, + dci_total_dl_prb_no_tbs: *dci_total_dl_prb_no_tbs, + dci_rnti_dl_bit: *dci_rnti_dl_bit, + dci_rnti_dl_prb_with_tbs: *dci_rnti_dl_prb_with_tbs, + dci_rnti_dl_prb_no_tbs: *dci_rnti_dl_prb_no_tbs, + }) + } + +} + fn create_download_stream(base_addr: &str, path: &str) -> Result { let mut stream = TcpStream::connect(base_addr)?; diff --git a/src/logic/mod.rs b/src/logic/mod.rs index e12ef23..c01813c 100644 --- a/src/logic/mod.rs +++ b/src/logic/mod.rs @@ -11,7 +11,7 @@ use bus::BusReader; use crate::cell_info::CellInfo; use crate::logic::rnti_matcher::TrafficCollection; use crate::ngscope::config::NgScopeConfig; -use crate::ngscope::types::NgScopeCellDci; +use crate::ngscope::types::{NgScopeCellDci, NgScopeCellConfig}; use self::downloader::{DownloadConfig, DownloadFinishParameters}; @@ -244,6 +244,7 @@ pub enum DownloaderState { StartDownload, ErrorStartingDownload(String), Downloading, + PostDownload, FinishDownload(DownloadFinishParameters), } @@ -268,8 +269,9 @@ impl WorkerState for DownloaderState { #[allow(dead_code)] #[derive(Clone, Debug)] -pub struct MessageDci { - ngscope_dci: NgScopeCellDci, +pub enum MessageDci { + CellDci(Box), + CellConfig(Box), } #[allow(dead_code)] diff --git a/src/logic/model_handler.rs b/src/logic/model_handler.rs index f708428..2650d68 100644 --- a/src/logic/model_handler.rs +++ b/src/logic/model_handler.rs @@ -1,4 +1,3 @@ -use crate::cell_info::CellInfo; use crate::logger::log_metric; use crate::ngscope::types::{NgScopeCellDci, NgScopeRntiDci}; use crate::parse::{Arguments, DynamicValue, FlattenedModelArgs, Scenario}; @@ -14,7 +13,7 @@ use serde_derive::{Deserialize, Serialize}; use super::{MessageDownloadConfig, MetricA, MetricTypes}; use crate::logic::{ - check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageMetric, + check_not_stopped, wait_until_running, MainState, MessageDci, MessageMetric, MessageRnti, ModelState, DEFAULT_WORKER_SLEEP_US, }; use crate::util::determine_process_id; @@ -72,6 +71,18 @@ impl DciRingBuffer { let delta_index = self.dci_next - slice_size; &self.dci_array[delta_index..self.dci_next] } + + fn pop(&mut self, wanted_slice_size: usize) -> &[NgScopeCellDci] { + if wanted_slice_size == 0 || self.dci_next == 0 { + return &[]; + } + + let slice_size = usize::min(wanted_slice_size, self.dci_next); + let delta_index = self.dci_next - slice_size; + let old_dci_next = self.dci_next; + self.dci_next = 0; + &self.dci_array[delta_index..old_dci_next] + } } #[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] @@ -112,7 +123,6 @@ pub struct ModelHandlerArgs { pub app_args: Arguments, pub rx_app_state: BusReader, pub tx_model_state: SyncSender, - pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, pub rx_download_config: BusReader, @@ -123,7 +133,6 @@ struct RunArgs { pub app_args: Arguments, pub rx_app_state: BusReader, pub tx_model_state: SyncSender, - pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, pub rx_download_config: BusReader, @@ -132,9 +141,9 @@ struct RunArgs { struct RunParameters<'a> { tx_metric: &'a mut Bus, - dci_buffer: &'a DciRingBuffer, + dci_buffer: &'a mut DciRingBuffer, rnti: u16, - cell_info: &'a CellInfo, + cell_capacity_prb_per_slot: u16, is_log_metric: &'a bool, } @@ -152,7 +161,6 @@ pub fn deploy_model_handler(args: ModelHandlerArgs) -> Result> { app_args: args.app_args, rx_app_state: args.rx_app_state, tx_model_state: args.tx_model_state, - rx_cell_info: args.rx_cell_info, rx_dci: args.rx_dci, rx_rnti: args.rx_rnti, rx_download_config: args.rx_download_config, @@ -182,7 +190,6 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let app_args = &run_args.app_args; let rx_app_state: &mut BusReader = &mut run_args.rx_app_state; let tx_model_state: &mut SyncSender = &mut run_args.tx_model_state; - let rx_cell_info: &mut BusReader = &mut run_args.rx_cell_info; let rx_dci: &mut BusReader = &mut run_args.rx_dci; let rx_rnti: &mut BusReader = &mut run_args.rx_rnti; let rx_download_config: &mut BusReader = @@ -201,7 +208,7 @@ fn run(run_args: &mut RunArgs) -> Result<()> { let mut last_metric_timestamp_us: u64 = chrono::Local::now().timestamp_micros() as u64; let mut dci_buffer = DciRingBuffer::new(); let mut last_rnti: Option = None; - let mut last_cell_info: Option = None; + let mut last_cell_capacity: Option = None; let mut last_rnti_share_type: u8 = RNTI_SHARE_TYPE_ALL; let mut last_rtt_us: Option = Some(40000); let mut metric_sending_interval_us: u64 = determine_sending_interval(&model_args, &last_rtt_us); @@ -213,18 +220,7 @@ fn run(run_args: &mut RunArgs) -> Result<()> { if check_not_stopped(rx_app_state).is_err() { break; } - match rx_dci.try_recv() { - Ok(dci) => { - dci_buffer.push(dci.ngscope_dci); - } - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => break, - }; - match rx_cell_info.try_recv() { - Ok(cell_info) => last_cell_info = Some(cell_info.cell_info.clone()), - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => break, - }; + unpack_all_dci_messages(rx_dci, &mut dci_buffer, &mut last_cell_capacity)?; match rx_rnti.try_recv() { Ok(rnti_msg) => { if let Some(rnti) = rnti_msg.cell_rnti.values().copied().next() { @@ -251,15 +247,15 @@ fn run(run_args: &mut RunArgs) -> Result<()> { } /* */ - if let (Some(rnti), Some(cell_info)) = (last_rnti, last_cell_info.clone()) { + if let (Some(rnti), Some(cell_capacity_prb_per_slot)) = (last_rnti, last_cell_capacity) { let delta_last_metric_sent_us = chrono::Local::now().timestamp_micros() as u64 - last_metric_timestamp_us; if delta_last_metric_sent_us > metric_sending_interval_us { let mut run_params = RunParameters { tx_metric, - dci_buffer: &dci_buffer, + dci_buffer: &mut dci_buffer, rnti, - cell_info: &cell_info, + cell_capacity_prb_per_slot, is_log_metric: &is_log_metric, }; @@ -291,6 +287,33 @@ fn finish(run_args: RunArgs) { let _ = send_final_state(&run_args.tx_model_state); } +fn unpack_all_dci_messages( + rx_dci: &mut BusReader, + dci_buffer: &mut DciRingBuffer, + last_cell_capacity: &mut Option, +) -> Result<()> { + + loop { + match rx_dci.try_recv() { + Ok(dci) => { + match dci { + MessageDci::CellDci(ngscope_dci) => { + dci_buffer.push(*ngscope_dci) + } + MessageDci::CellConfig(ngscope_cell_config) => { + if ngscope_cell_config.nof_cell == 1 { + *last_cell_capacity = Some(ngscope_cell_config.cell_prb[0]) + } + } + } + }, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Err(anyhow!("[model] error: rx_dci disconnected")) + } + } + Ok(()) +} + fn handle_calculate_metric( run_params: &mut RunParameters, sending_behavior: &mut RunParametersSendingBehavior, @@ -300,7 +323,7 @@ fn handle_calculate_metric( tx_metric, dci_buffer, rnti, - cell_info, + cell_capacity_prb_per_slot, is_log_metric, } = run_params; @@ -318,7 +341,7 @@ fn handle_calculate_metric( if !buffer_slice.is_empty() { if let Ok(metric_wrapper) = calculate_capacity( *rnti, - cell_info, + *cell_capacity_prb_per_slot, buffer_slice, is_log_metric, rnti_share_type, @@ -354,13 +377,13 @@ fn handle_calculate_metric( fn calculate_capacity( target_rnti: u16, - cell_info: &CellInfo, + cell_capacity_prb_per_slot: u16, dci_list: &[NgScopeCellDci], is_log_metric: &bool, rnti_share_type: &u8, ) -> Result { let metric_wrapper = - calculate_pbe_cc_capacity(target_rnti, cell_info, dci_list, rnti_share_type)?; + calculate_pbe_cc_capacity(target_rnti, cell_capacity_prb_per_slot, dci_list, rnti_share_type)?; if *is_log_metric { let _ = log_metric(metric_wrapper.clone()); } @@ -402,7 +425,7 @@ fn calculate_capacity( * */ fn calculate_pbe_cc_capacity( target_rnti: u16, - cell_info: &CellInfo, + cell_capacity_prb_per_slot: u16, dci_list: &[NgScopeCellDci], rnti_share_type: &u8, ) -> Result { @@ -416,7 +439,7 @@ fn calculate_pbe_cc_capacity( * */ // Total number of PRBs in a subframe, that the cell can offer let p_cell: u64 = - STANDARD_NOF_PRB_SLOT_TO_SUBFRAME * cell_info.cells[0].nof_prb as u64 * nof_dci; + STANDARD_NOF_PRB_SLOT_TO_SUBFRAME * cell_capacity_prb_per_slot as u64 * nof_dci; // Total number of unique RNTIs let nof_rnti: u64 = dci_list @@ -644,10 +667,7 @@ fn determine_smoothing_size(model_args: &FlattenedModelArgs, last_rtt_us: &Optio #[cfg(test)] mod tests { use super::*; - use crate::{ - cell_info::SingleCell, - ngscope::types::{NgScopeRntiDci, NGSCOPE_MAX_NOF_RNTI}, - }; + use crate:: ngscope::types::{NgScopeRntiDci, NGSCOPE_MAX_NOF_RNTI}; fn dummy_rnti_dci(nof_rnti: u8) -> [NgScopeRntiDci; NGSCOPE_MAX_NOF_RNTI] { let mut rnti_list = [NgScopeRntiDci::default(); NGSCOPE_MAX_NOF_RNTI]; @@ -686,15 +706,10 @@ mod tests { #[test] fn test_capacity() -> Result<()> { let dummy_rnti: u16 = 123; - let dummy_cell_info = CellInfo { - cells: vec![SingleCell { - nof_prb: 100, - ..Default::default() - }], - }; + let dummy_cell_capacity_prb: u16 = 100; let metric_params = calculate_capacity( dummy_rnti, - &dummy_cell_info, + dummy_cell_capacity_prb, &dummy_dci_slice(), &false, &RNTI_SHARE_TYPE_ALL, diff --git a/src/logic/ngscope_controller.rs b/src/logic/ngscope_controller.rs index daf9bb9..2cc728f 100644 --- a/src/logic/ngscope_controller.rs +++ b/src/logic/ngscope_controller.rs @@ -317,7 +317,11 @@ fn run_dci_fetcher( } LocalDciState::WaitForServerAuth(successful_auths) => { dci_state = match ngscope_validate_server_check(&socket)? { - Some(_) => { + Some(msg) => { + if let Message::Config(ngscope_cell_config) = msg { + print_info(&format!("[ngcontrol] {:?}", ngscope_cell_config)); + broadcast_message_dci(&mut tx_dci, MessageDci::CellConfig(Box::new(ngscope_cell_config))); + } if successful_auths >= 1 { LocalDciState::SuccessfulAuth } else { @@ -342,6 +346,19 @@ fn run_dci_fetcher( Ok(()) } +fn broadcast_message_dci( + tx_dci: &mut Bus, + message_dci: MessageDci, +) { + match tx_dci.try_broadcast(message_dci) { + Ok(_) => {} + Err(msg) => { + print_info("ERROR [ngcontrol] DCI bus is full!!"); + tx_dci.broadcast(msg) + } + } +} + fn check_ngscope_message( socket: &UdpSocket, tx_dci: &mut Bus, @@ -373,19 +390,11 @@ fn check_ngscope_message( *last_dci_timestamp_us = cell_dci.time_stamp; } /* check bus size */ - let message_dci = MessageDci { - ngscope_dci: *cell_dci, - }; if *is_log_dci { log_dci_buffer.push(*cell_dci.clone()); } - match tx_dci.try_broadcast(message_dci) { - Ok(_) => {} - Err(msg) => { - print_info("ERROR [ngcontrol] DCI bus is full!!"); - tx_dci.broadcast(msg) - } - } + let message_dci = MessageDci::CellDci(Box::new(*cell_dci)); + broadcast_message_dci(tx_dci, message_dci) } Message::Dci(ue_dci) => { // TODO: Evaluate how to handle this @@ -394,6 +403,8 @@ fn check_ngscope_message( Message::Config(cell_config) => { // TODO: Evaluate how to handle this print_info(&format!("[ngcontrol] {:?}", cell_config)); + let message_dci = MessageDci::CellConfig(Box::new(cell_config)); + broadcast_message_dci(tx_dci, message_dci) } // TODO: Evaluate how to handle Start andExit Message::Start => {} diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index 726e68b..a561083 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -41,7 +41,7 @@ pub const COLLECT_DCI_MAX_TIMESTAMP_DELTA_US: u64 = 50000; pub const BASIC_FILTER_MAX_TOTAL_UL_FACTOR: f64 = 200.0; pub const BASIC_FILTER_MIN_TOTAL_UL_FACTOR: f64 = 0.005; pub const BASIC_FILTER_MAX_UL_PER_DCI: u64 = 5_000_000; -pub const BASIC_FILTER_MIN_OCCURENCES_FACTOR: f64 = 0.05; +pub const BASIC_FILTER_MIN_OCCURENCES_FACTOR: f64 = 0.01; pub const RNTI_RING_BUFFER_SIZE: usize = 10; @@ -343,8 +343,10 @@ fn handle_collect_dci( let start_timestamp_ms_bound = traffic_collection.start_timestamp_ms * TIME_MS_TO_US_FACTOR; for dci in dci_list.iter() { - if dci.ngscope_dci.time_stamp >= start_timestamp_ms_bound { - traffic_collection.update_from_cell_dci(&dci.ngscope_dci); + if let MessageDci::CellDci(ngscope_dci) = dci { + if ngscope_dci.time_stamp >= start_timestamp_ms_bound { + traffic_collection.update_from_cell_dci(ngscope_dci); + } } } RntiMatcherState::MatchingCollectDci(Box::new(traffic_collection)) diff --git a/src/main.rs b/src/main.rs index bd7316b..5bfdea9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -83,7 +83,6 @@ fn deploy_app( app_args: app_args.clone(), rx_app_state: tx_app_state.add_rx(), tx_model_state: all_tx_states.model, - rx_cell_info: tx_cell_info.add_rx(), rx_dci: tx_dci.add_rx(), rx_rnti: tx_rnti.add_rx(), tx_metric, diff --git a/src/ngscope/mod.rs b/src/ngscope/mod.rs index 019bcf2..e835284 100644 --- a/src/ngscope/mod.rs +++ b/src/ngscope/mod.rs @@ -61,16 +61,6 @@ pub fn ngscope_recv_single_message_type(socket: &UdpSocket) -> Result<(MessageTy } } -pub fn ngscope_recv_single_message_type_non_blocking( - socket: &UdpSocket, -) -> Result<(MessageType, Vec)> { - let mut buf = [0u8; types::NGSCOPE_REMOTE_BUFFER_SIZE]; - match socket.recv_from(&mut buf) { - Ok((nof_recv, _)) => types::ngscope_extract_packet(&buf[..nof_recv]), - Err(err) => Err(err.into()), - } -} - pub fn ngscope_recv_single_message(socket: &UdpSocket) -> Result { let mut buf = [0u8; types::NGSCOPE_REMOTE_BUFFER_SIZE]; let (nof_recv, _) = socket.recv_from(&mut buf)?; @@ -114,16 +104,17 @@ pub fn ngscope_validate_server_send_initial(socket: &UdpSocket, server_addr: &st Ok(()) } -pub fn ngscope_validate_server_check(socket: &UdpSocket) -> Result> { - let msg_type = ngscope_recv_single_message_type_non_blocking(socket); - match msg_type { - Ok((msg_type, _)) => match msg_type { - MessageType::Start | MessageType::Dci | MessageType::CellDci | MessageType::Config => { - Ok(Some(())) - } - MessageType::Exit => Err(anyhow!( +pub fn ngscope_validate_server_check(socket: &UdpSocket) -> Result> { + let msg = ngscope_recv_single_message(socket); + match msg { + Ok(msg) => match msg { + Message::Exit => Err(anyhow!( "Received Exit from ngscope server during validation" )), + Message::Start | + Message::Dci(_) | + Message::CellDci(_) | + Message::Config(_) => { Ok(Some(msg)) } }, Err(_) => Ok(None), } diff --git a/src/ngscope/types.rs b/src/ngscope/types.rs index 8bfc800..e50a0e3 100644 --- a/src/ngscope/types.rs +++ b/src/ngscope/types.rs @@ -14,7 +14,7 @@ pub const NGSCOPE_MESSAGE_VERSION_POSITION: usize = 4; pub const NGSCOPE_MESSAGE_CONTENT_POSITION: usize = 5; pub const NGSCOPE_STRUCT_SIZE_DCI: usize = 40; pub const NGSCOPE_STRUCT_SIZE_CELL_DCI: usize = 856; -pub const NGSCOPE_STRUCT_SIZE_CONFIG: usize = 12; // TODO: Determine this actually +pub const NGSCOPE_STRUCT_SIZE_CONFIG: usize = 12; // IMPORTANT: // - when receiving messages, check the timestamp - due to UDP, messages might arrive out of order @@ -70,9 +70,8 @@ impl Message { )?)) } MessageType::Config => { - check_size(bytes.len(), NGSCOPE_MESSAGE_VERSION_POSITION + 1)?; - let _version_byte: u8 = bytes[NGSCOPE_MESSAGE_VERSION_POSITION]; - let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_CONTENT_POSITION..]; + let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_VERSION_POSITION..]; + check_size(content_bytes.len(), NGSCOPE_STRUCT_SIZE_CONFIG)?; Message::Config(NgScopeCellConfig::from_bytes(content_bytes.try_into()?)?) } MessageType::Exit => Message::Exit,