Skip to content

Commit

Permalink
Prepare ModelHandler: [model]
Browse files Browse the repository at this point in the history
* Rename CellSink to ModelHandler
* Add Bus in [model] to publish metric
  • Loading branch information
bastian-src committed Jun 22, 2024
1 parent a00da67 commit 997d5d9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
22 changes: 16 additions & 6 deletions src/logic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::logic::rnti_matcher::TrafficCollection;
use crate::ngscope::config::NgScopeConfig;
use crate::ngscope::types::NgScopeCellDci;

pub mod cell_sink;
pub mod model_handler;
pub mod cell_source;
pub mod ngscope_controller;
pub mod rnti_matcher;
Expand All @@ -28,6 +28,7 @@ pub const BUS_SIZE_APP_STATE: usize = 50;
pub const BUS_SIZE_DCI: usize = 100000;
pub const BUS_SIZE_CELL_INFO: usize = 100;
pub const BUS_SIZE_RNTI: usize = 100;
pub const BUS_SIZE_METRIC: usize = 100;

pub trait WorkerState: Sized + Clone + Sync + Debug {
fn to_general_state(&self) -> GeneralState;
Expand Down Expand Up @@ -108,21 +109,21 @@ impl WorkerState for MainState {
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SinkState {
pub enum ModelState {
Running,
Stopped,
SpecialState,
}

impl WorkerState for SinkState {
impl WorkerState for ModelState {
fn worker_name() -> String {
"sink".to_owned()
"model".to_owned()
}

fn to_general_state(&self) -> GeneralState {
match self {
SinkState::Running => GeneralState::Running,
SinkState::Stopped => GeneralState::Stopped,
ModelState::Running => GeneralState::Running,
ModelState::Stopped => GeneralState::Stopped,
_ => GeneralState::Unknown,
}
}
Expand Down Expand Up @@ -255,6 +256,15 @@ pub struct MessageRnti {
cell_rnti: HashMap<u64, u16>,
}

#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Default)]
pub struct MessageMetric {
/// Timestamp when the metric was calculated (not sent!)
timestamp_us: u64,
/// Fair share send rate [bits/subframe] = [bits/ms]
fair_share_send_rate: u64,
}

/* -------------- */
/* Logic Helper */
/* -------------- */
Expand Down
39 changes: 21 additions & 18 deletions src/logic/cell_sink.rs → src/logic/model_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,62 +4,65 @@ use std::thread::{self, JoinHandle};
use std::time::Duration;

use anyhow::{anyhow, Result};
use bus::BusReader;
use bus::{BusReader, Bus};

use crate::logic::{
check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageRnti,
SinkState, DEFAULT_WORKER_SLEEP_US,
MessageMetric, ModelState, DEFAULT_WORKER_SLEEP_US,
};
use crate::util::determine_process_id;

pub struct CellSinkArgs {
pub struct ModelHandlerArgs {
pub rx_app_state: BusReader<MainState>,
pub tx_sink_state: SyncSender<SinkState>,
pub tx_model_state: SyncSender<ModelState>,
pub rx_cell_info: BusReader<MessageCellInfo>,
pub rx_dci: BusReader<MessageDci>,
pub rx_rnti: BusReader<MessageRnti>,
pub tx_metric: Bus<MessageMetric>,
}

pub fn deploy_cell_sink(mut args: CellSinkArgs) -> Result<JoinHandle<()>> {
pub fn deploy_model_handler(mut args: ModelHandlerArgs) -> Result<JoinHandle<()>> {
let thread = thread::spawn(move || {
let _ = run(
args.rx_app_state,
args.tx_sink_state,
args.tx_model_state,
&mut args.rx_cell_info,
&mut args.rx_dci,
&mut args.rx_rnti,
&mut args.tx_metric,
);
});
Ok(thread)
}

fn send_final_state(tx_sink_state: &SyncSender<SinkState>) -> Result<()> {
Ok(tx_sink_state.send(SinkState::Stopped)?)
fn send_final_state(tx_model_state: &SyncSender<ModelState>) -> Result<()> {
Ok(tx_model_state.send(ModelState::Stopped)?)
}

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_sink_state: &SyncSender<SinkState>,
tx_model_state: &SyncSender<ModelState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_sink_state)?;
Err(anyhow!("[sink] Main did not send 'Running' message"))
send_final_state(tx_model_state)?;
Err(anyhow!("[model] Main did not send 'Running' message"))
}
}
}

fn run(
mut rx_app_state: BusReader<MainState>,
tx_sink_state: SyncSender<SinkState>,
tx_model_state: SyncSender<ModelState>,
rx_cell_info: &mut BusReader<MessageCellInfo>,
rx_dci: &mut BusReader<MessageDci>,
rx_rnti: &mut BusReader<MessageRnti>,
_tx_metric: &mut Bus<MessageMetric>,
) -> Result<()> {
tx_sink_state.send(SinkState::Running)?;
wait_for_running(&mut rx_app_state, &tx_sink_state)?;
print_info(&format!("[sink]: \t\tPID {:?}", determine_process_id()));
tx_model_state.send(ModelState::Running)?;
wait_for_running(&mut rx_app_state, &tx_model_state)?;
print_info(&format!("[model]: \t\tPID {:?}", determine_process_id()));
let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US);

loop {
Expand Down Expand Up @@ -90,16 +93,16 @@ fn run(
if let Some(rnti_msg) = new_rnti {
if !rnti_msg.cell_rnti.is_empty() {
print_debug(&format!(
"DEBUG [sink] new rnti {:#?}",
"DEBUG [model] new rnti {:#?}",
rnti_msg.cell_rnti.get(&0).unwrap()
));
}
}

// TODO: Consume rx_dci, rx_cell_info, and rx_rnti
// TODO: -> Send combined message to some remote
// tx_metrc.send(metrics)
}

send_final_state(&tx_sink_state)?;
send_final_state(&tx_model_state)?;
Ok(())
}
34 changes: 18 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ mod parse;
mod util;
mod math_util;

use logic::cell_sink::{deploy_cell_sink, CellSinkArgs};
use logic::model_handler::{deploy_model_handler, ModelHandlerArgs};
use logic::cell_source::{deploy_cell_source, CellSourceArgs};
use logic::ngscope_controller::{deploy_ngscope_controller, NgControlArgs};
use logic::rnti_matcher::{deploy_rnti_matcher, RntiMatcherArgs};
use logic::WorkerChannel;
use logic::{WorkerChannel, MessageMetric, BUS_SIZE_METRIC};
use logic::{
GeneralState, MainState, MessageCellInfo, MessageDci, MessageRnti, NgControlState,
RntiMatcherState, SinkState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO,
RntiMatcherState, ModelState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO,
BUS_SIZE_DCI, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE, WORKER_SLEEP_LONG_MS,
};
use parse::Arguments;
use util::{determine_process_id, is_notifier, prepare_sigint_notifier, print_info, set_debug};

struct CombinedReceivers {
pub sink: Receiver<SinkState>,
pub model: Receiver<ModelState>,
pub source: Receiver<SourceState>,
pub rntimatcher: Receiver<RntiMatcherState>,
pub ngcontrol: Receiver<NgControlState>,
}

struct CombinedSenders {
pub sink: SyncSender<SinkState>,
pub model: SyncSender<ModelState>,
pub source: SyncSender<SourceState>,
pub rntimatcher: SyncSender<RntiMatcherState>,
pub ngcontrol: SyncSender<NgControlState>,
Expand All @@ -47,7 +47,7 @@ struct CombinedSenders {
impl CombinedReceivers {
fn print_worker_messages(&self) {
let _ = &self.source.worker_print_on_recv();
let _ = &self.sink.worker_print_on_recv();
let _ = &self.model.worker_print_on_recv();
let _ = &self.ngcontrol.worker_print_on_recv();
let _ = &self.rntimatcher.worker_print_on_recv();
}
Expand All @@ -61,13 +61,15 @@ fn deploy_app(
let mut tx_dci: Bus<MessageDci> = Bus::<MessageDci>::new(BUS_SIZE_DCI);
let mut tx_cell_info: Bus<MessageCellInfo> = Bus::<MessageCellInfo>::new(BUS_SIZE_CELL_INFO);
let mut tx_rnti: Bus<MessageRnti> = Bus::<MessageRnti>::new(BUS_SIZE_RNTI);
let tx_metric: Bus<MessageMetric> = Bus::<MessageMetric>::new(BUS_SIZE_METRIC);

let sink_args = CellSinkArgs {
let model_args = ModelHandlerArgs {
rx_app_state: tx_app_state.add_rx(),
tx_sink_state: all_tx_states.sink,
tx_model_state: all_tx_states.model,
rx_cell_info: tx_cell_info.add_rx(),
rx_dci: tx_dci.add_rx(),
rx_rnti: tx_rnti.add_rx(),
tx_metric,
};
let rntimatcher_args = RntiMatcherArgs {
rx_app_state: tx_app_state.add_rx(),
Expand All @@ -93,7 +95,7 @@ fn deploy_app(
let tasks: Vec<JoinHandle<()>> = vec![
deploy_ngscope_controller(ngcontrol_args)?,
deploy_cell_source(source_args)?,
deploy_cell_sink(sink_args)?,
deploy_model_handler(model_args)?,
deploy_rnti_matcher(rntimatcher_args)?,
];
Ok(tasks)
Expand Down Expand Up @@ -131,7 +133,7 @@ fn wait_all_running(
) -> Result<()> {
print_info("[ ] waiting for all threads to become ready");

let mut waiting_for: HashSet<&str> = vec!["source", "sink", "rntimatcher", "ngcontrol"]
let mut waiting_for: HashSet<&str> = vec!["source", "model", "rntimatcher", "ngcontrol"]
.into_iter()
.collect();

Expand All @@ -146,9 +148,9 @@ fn wait_all_running(
waiting_for.remove("source");
}
}
if waiting_for.contains("sink") {
if let Ok(Some(_)) = check_running(&all_rx_states.sink) {
waiting_for.remove("sink");
if waiting_for.contains("model") {
if let Ok(Some(_)) = check_running(&all_rx_states.model) {
waiting_for.remove("model");
}
}
if waiting_for.contains("rntimatcher") {
Expand Down Expand Up @@ -183,18 +185,18 @@ fn main() -> Result<(), Box<dyn Error>> {
let sigint_notifier = prepare_sigint_notifier()?;

let mut tx_app_state = Bus::<MainState>::new(BUS_SIZE_APP_STATE);
let (sink_tx, sink_rx) = sync_channel::<SinkState>(CHANNEL_SYNC_SIZE);
let (model_tx, model_rx) = sync_channel::<ModelState>(CHANNEL_SYNC_SIZE);
let (source_tx, source_rx) = sync_channel::<SourceState>(CHANNEL_SYNC_SIZE);
let (rntimatcher_tx, rntimatcher_rx) = sync_channel::<RntiMatcherState>(CHANNEL_SYNC_SIZE);
let (ngcontrol_tx, ngcontrol_rx) = sync_channel::<NgControlState>(CHANNEL_SYNC_SIZE);
let all_tx_states = CombinedSenders {
sink: sink_tx,
model: model_tx,
source: source_tx,
rntimatcher: rntimatcher_tx,
ngcontrol: ngcontrol_tx,
};
let all_rx_states = CombinedReceivers {
sink: sink_rx,
model: model_rx,
source: source_rx,
rntimatcher: rntimatcher_rx,
ngcontrol: ngcontrol_rx,
Expand Down

0 comments on commit 997d5d9

Please sign in to comment.