diff --git a/src/logic/mod.rs b/src/logic/mod.rs index 8a5ceb3..9ce9319 100644 --- a/src/logic/mod.rs +++ b/src/logic/mod.rs @@ -13,7 +13,7 @@ use crate::logic::rnti_matcher::TrafficCollection; use crate::ngscope::config::NgScopeConfig; use crate::ngscope::types::NgScopeCellDci; -pub mod cell_sink; +pub mod model_handler; pub mod cell_source; pub mod ngscope_controller; pub mod rnti_matcher; @@ -28,6 +28,7 @@ pub const BUS_SIZE_APP_STATE: usize = 50; pub const BUS_SIZE_DCI: usize = 100000; pub const BUS_SIZE_CELL_INFO: usize = 100; pub const BUS_SIZE_RNTI: usize = 100; +pub const BUS_SIZE_METRIC: usize = 100; pub trait WorkerState: Sized + Clone + Sync + Debug { fn to_general_state(&self) -> GeneralState; @@ -108,21 +109,21 @@ impl WorkerState for MainState { } #[derive(Clone, Copy, Debug, PartialEq)] -pub enum SinkState { +pub enum ModelState { Running, Stopped, SpecialState, } -impl WorkerState for SinkState { +impl WorkerState for ModelState { fn worker_name() -> String { - "sink".to_owned() + "model".to_owned() } fn to_general_state(&self) -> GeneralState { match self { - SinkState::Running => GeneralState::Running, - SinkState::Stopped => GeneralState::Stopped, + ModelState::Running => GeneralState::Running, + ModelState::Stopped => GeneralState::Stopped, _ => GeneralState::Unknown, } } @@ -255,6 +256,15 @@ pub struct MessageRnti { cell_rnti: HashMap, } +#[allow(dead_code)] +#[derive(Clone, Debug, PartialEq, Default)] +pub struct MessageMetric { + /// Timestamp when the metric was calculated (not sent!) + timestamp_us: u64, + /// Fair share send rate [bits/subframe] = [bits/ms] + fair_share_send_rate: u64, +} + /* -------------- */ /* Logic Helper */ /* -------------- */ diff --git a/src/logic/cell_sink.rs b/src/logic/model_handler.rs similarity index 69% rename from src/logic/cell_sink.rs rename to src/logic/model_handler.rs index a48a765..7c8a717 100644 --- a/src/logic/cell_sink.rs +++ b/src/logic/model_handler.rs @@ -4,62 +4,65 @@ use std::thread::{self, JoinHandle}; use std::time::Duration; use anyhow::{anyhow, Result}; -use bus::BusReader; +use bus::{BusReader, Bus}; use crate::logic::{ check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageRnti, - SinkState, DEFAULT_WORKER_SLEEP_US, + MessageMetric, ModelState, DEFAULT_WORKER_SLEEP_US, }; use crate::util::determine_process_id; -pub struct CellSinkArgs { +pub struct ModelHandlerArgs { pub rx_app_state: BusReader, - pub tx_sink_state: SyncSender, + pub tx_model_state: SyncSender, pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, + pub tx_metric: Bus, } -pub fn deploy_cell_sink(mut args: CellSinkArgs) -> Result> { +pub fn deploy_model_handler(mut args: ModelHandlerArgs) -> Result> { let thread = thread::spawn(move || { let _ = run( args.rx_app_state, - args.tx_sink_state, + args.tx_model_state, &mut args.rx_cell_info, &mut args.rx_dci, &mut args.rx_rnti, + &mut args.tx_metric, ); }); Ok(thread) } -fn send_final_state(tx_sink_state: &SyncSender) -> Result<()> { - Ok(tx_sink_state.send(SinkState::Stopped)?) +fn send_final_state(tx_model_state: &SyncSender) -> Result<()> { + Ok(tx_model_state.send(ModelState::Stopped)?) } fn wait_for_running( rx_app_state: &mut BusReader, - tx_sink_state: &SyncSender, + tx_model_state: &SyncSender, ) -> Result<()> { match wait_until_running(rx_app_state) { Ok(_) => Ok(()), _ => { - send_final_state(tx_sink_state)?; - Err(anyhow!("[sink] Main did not send 'Running' message")) + send_final_state(tx_model_state)?; + Err(anyhow!("[model] Main did not send 'Running' message")) } } } fn run( mut rx_app_state: BusReader, - tx_sink_state: SyncSender, + tx_model_state: SyncSender, rx_cell_info: &mut BusReader, rx_dci: &mut BusReader, rx_rnti: &mut BusReader, + _tx_metric: &mut Bus, ) -> Result<()> { - tx_sink_state.send(SinkState::Running)?; - wait_for_running(&mut rx_app_state, &tx_sink_state)?; - print_info(&format!("[sink]: \t\tPID {:?}", determine_process_id())); + tx_model_state.send(ModelState::Running)?; + wait_for_running(&mut rx_app_state, &tx_model_state)?; + print_info(&format!("[model]: \t\tPID {:?}", determine_process_id())); let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); loop { @@ -90,16 +93,16 @@ fn run( if let Some(rnti_msg) = new_rnti { if !rnti_msg.cell_rnti.is_empty() { print_debug(&format!( - "DEBUG [sink] new rnti {:#?}", + "DEBUG [model] new rnti {:#?}", rnti_msg.cell_rnti.get(&0).unwrap() )); } } - // TODO: Consume rx_dci, rx_cell_info, and rx_rnti // TODO: -> Send combined message to some remote + // tx_metrc.send(metrics) } - send_final_state(&tx_sink_state)?; + send_final_state(&tx_model_state)?; Ok(()) } diff --git a/src/main.rs b/src/main.rs index 8b1eb8b..072f83c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,28 +17,28 @@ mod parse; mod util; mod math_util; -use logic::cell_sink::{deploy_cell_sink, CellSinkArgs}; +use logic::model_handler::{deploy_model_handler, ModelHandlerArgs}; use logic::cell_source::{deploy_cell_source, CellSourceArgs}; use logic::ngscope_controller::{deploy_ngscope_controller, NgControlArgs}; use logic::rnti_matcher::{deploy_rnti_matcher, RntiMatcherArgs}; -use logic::WorkerChannel; +use logic::{WorkerChannel, MessageMetric, BUS_SIZE_METRIC}; use logic::{ GeneralState, MainState, MessageCellInfo, MessageDci, MessageRnti, NgControlState, - RntiMatcherState, SinkState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO, + RntiMatcherState, ModelState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO, BUS_SIZE_DCI, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE, WORKER_SLEEP_LONG_MS, }; use parse::Arguments; use util::{determine_process_id, is_notifier, prepare_sigint_notifier, print_info, set_debug}; struct CombinedReceivers { - pub sink: Receiver, + pub model: Receiver, pub source: Receiver, pub rntimatcher: Receiver, pub ngcontrol: Receiver, } struct CombinedSenders { - pub sink: SyncSender, + pub model: SyncSender, pub source: SyncSender, pub rntimatcher: SyncSender, pub ngcontrol: SyncSender, @@ -47,7 +47,7 @@ struct CombinedSenders { impl CombinedReceivers { fn print_worker_messages(&self) { let _ = &self.source.worker_print_on_recv(); - let _ = &self.sink.worker_print_on_recv(); + let _ = &self.model.worker_print_on_recv(); let _ = &self.ngcontrol.worker_print_on_recv(); let _ = &self.rntimatcher.worker_print_on_recv(); } @@ -61,13 +61,15 @@ fn deploy_app( let mut tx_dci: Bus = Bus::::new(BUS_SIZE_DCI); let mut tx_cell_info: Bus = Bus::::new(BUS_SIZE_CELL_INFO); let mut tx_rnti: Bus = Bus::::new(BUS_SIZE_RNTI); + let tx_metric: Bus = Bus::::new(BUS_SIZE_METRIC); - let sink_args = CellSinkArgs { + let model_args = ModelHandlerArgs { rx_app_state: tx_app_state.add_rx(), - tx_sink_state: all_tx_states.sink, + tx_model_state: all_tx_states.model, rx_cell_info: tx_cell_info.add_rx(), rx_dci: tx_dci.add_rx(), rx_rnti: tx_rnti.add_rx(), + tx_metric, }; let rntimatcher_args = RntiMatcherArgs { rx_app_state: tx_app_state.add_rx(), @@ -93,7 +95,7 @@ fn deploy_app( let tasks: Vec> = vec![ deploy_ngscope_controller(ngcontrol_args)?, deploy_cell_source(source_args)?, - deploy_cell_sink(sink_args)?, + deploy_model_handler(model_args)?, deploy_rnti_matcher(rntimatcher_args)?, ]; Ok(tasks) @@ -131,7 +133,7 @@ fn wait_all_running( ) -> Result<()> { print_info("[ ] waiting for all threads to become ready"); - let mut waiting_for: HashSet<&str> = vec!["source", "sink", "rntimatcher", "ngcontrol"] + let mut waiting_for: HashSet<&str> = vec!["source", "model", "rntimatcher", "ngcontrol"] .into_iter() .collect(); @@ -146,9 +148,9 @@ fn wait_all_running( waiting_for.remove("source"); } } - if waiting_for.contains("sink") { - if let Ok(Some(_)) = check_running(&all_rx_states.sink) { - waiting_for.remove("sink"); + if waiting_for.contains("model") { + if let Ok(Some(_)) = check_running(&all_rx_states.model) { + waiting_for.remove("model"); } } if waiting_for.contains("rntimatcher") { @@ -183,18 +185,18 @@ fn main() -> Result<(), Box> { let sigint_notifier = prepare_sigint_notifier()?; let mut tx_app_state = Bus::::new(BUS_SIZE_APP_STATE); - let (sink_tx, sink_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (model_tx, model_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (source_tx, source_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (rntimatcher_tx, rntimatcher_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let (ngcontrol_tx, ngcontrol_rx) = sync_channel::(CHANNEL_SYNC_SIZE); let all_tx_states = CombinedSenders { - sink: sink_tx, + model: model_tx, source: source_tx, rntimatcher: rntimatcher_tx, ngcontrol: ngcontrol_tx, }; let all_rx_states = CombinedReceivers { - sink: sink_rx, + model: model_rx, source: source_rx, rntimatcher: rntimatcher_rx, ngcontrol: ngcontrol_rx,