Skip to content

Commit

Permalink
WiP: Implement rnti matching and fining best match
Browse files Browse the repository at this point in the history
* Refactor rntimatcher and add RunArgs for better error handling
* Implement RntiMatching datastructures
  • Loading branch information
bastian-src committed May 24, 2024
1 parent 57985eb commit 8ee4d52
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 120 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.80"
bus = "2.4.1"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive", "color", "env", "help"] }
clap-serde = "0.5.1"
confy = { version = "0.6.1", default-features = false, features = ["yaml_conf"] }
Expand Down
19 changes: 14 additions & 5 deletions src/logic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(dead_code)]

use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::mpsc::{TryRecvError, Receiver};

Expand All @@ -9,6 +10,7 @@ use bus::BusReader;
use crate::cell_info::CellInfo;
use crate::ngscope::config::NgScopeConfig;
use crate::ngscope::types::NgScopeCellDci;
use crate::logic::rnti_matcher::TrafficCollection;

pub mod cell_sink;
pub mod cell_source;
Expand Down Expand Up @@ -148,13 +150,19 @@ pub enum RntiMatcherState {
Stopped,
Idle,
StartMatching,
MatchingCollectDci,
MatchingProcessDci,
MatchingError,
MatchingCollectDci(Box<TrafficCollection>),
MatchingProcessDci(Box<TrafficCollection>),
MatchingPublishRnti(MessageRnti),
MatchingError(RntiMatchingErrorType),
StoppingTrafficGeneratorThread,
SleepMs(u64, Box<RntiMatcherState>),
}

#[derive(Clone, Debug, PartialEq)]
pub enum RntiMatchingErrorType {
ExceededDciTimestampDelta
}

impl WorkerState for RntiMatcherState {
fn worker_name() -> String {
"rntimatcher".to_owned()
Expand Down Expand Up @@ -216,9 +224,10 @@ pub struct MessageCellInfo {
}

#[allow(dead_code)]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Default)]
pub struct MessageRnti {
rnti: u16,
/* cell_id -> ue_rnti */
cell_rnti: HashMap<u64, u16>,
}

/* -------------- */
Expand Down
99 changes: 52 additions & 47 deletions src/logic/ngscope_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ pub struct NgControlArgs {
pub tx_dci: Bus<MessageDci>,
}

struct RunArgs {
rx_app_state: BusReader<MainState>,
tx_ngcontrol_state: SyncSender<NgControlState>,
app_args: Arguments,
rx_cell_info: BusReader<MessageCellInfo>,
tx_dci_thread_handle: Option<SyncSender<LocalDciState>>,
dci_thread_handle: Option<JoinHandle<()>>,
ng_process_handle: Option<Child>,
}

struct RunArgsMovables {
tx_dci: Bus<MessageDci>,
}

pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result<JoinHandle<()>> {
let mut run_args: RunArgs = RunArgs {
rx_app_state: args.rx_app_state,
Expand All @@ -63,55 +77,9 @@ pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result<JoinHandle<()>>
Ok(thread)
}

fn send_final_state(tx_ngcontrol_state: &SyncSender<NgControlState>) -> Result<()> {
Ok(tx_ngcontrol_state.send(NgControlState::Stopped)?)
}

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_ngcontrol_state: &SyncSender<NgControlState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_ngcontrol_state)?;
Err(anyhow!("[source] Main did not send 'Running' message"))
}
}
}

fn check_cell_update(
rx_cell_info: &mut BusReader<MessageCellInfo>,
) -> Result<Option<CellInfo>> {
match rx_cell_info.try_recv() {
Ok(msg) => {
Ok(Some(msg.cell_info))
},
Err(TryRecvError::Disconnected) => {
// TODO: print error properly
Err(anyhow!("[ngcontrol] err: rx_cell_info disconnected!"))
}
Err(TryRecvError::Empty) => { Ok(None) },
}
}

struct RunArgs {
rx_app_state: BusReader<MainState>,
tx_ngcontrol_state: SyncSender<NgControlState>,
app_args: Arguments,
rx_cell_info: BusReader<MessageCellInfo>,
tx_dci_thread_handle: Option<SyncSender<LocalDciState>>,
dci_thread_handle: Option<JoinHandle<()>>,
ng_process_handle: Option<Child>,
}

struct RunArgsMovables {
tx_dci: Bus<MessageDci>,
}

fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> {
let tx_ngcontrol_state = &mut run_args.tx_ngcontrol_state;
let rx_app_state = &mut run_args.rx_app_state;
let tx_ngcontrol_state = &mut run_args.tx_ngcontrol_state;
let app_args = &run_args.app_args;
let rx_cell_info = &mut run_args.rx_cell_info;
let tx_dci = run_args_mov.tx_dci;
Expand Down Expand Up @@ -346,6 +314,10 @@ fn check_ngscope_message(socket: &UdpSocket, tx_dci: &mut Bus<MessageDci>) {
}
}

/* -------------- */
/* Helpers */
/* -------------- */

fn check_rx_state(
rx_local_dci_state: &Receiver<LocalDciState>,
) -> Result<Option<LocalDciState>> {
Expand All @@ -363,3 +335,36 @@ fn init_dci_server(local_addr: &str) -> Result<UdpSocket> {
Ok(socket)
}

fn send_final_state(tx_ngcontrol_state: &SyncSender<NgControlState>) -> Result<()> {
Ok(tx_ngcontrol_state.send(NgControlState::Stopped)?)
}

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_ngcontrol_state: &SyncSender<NgControlState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_ngcontrol_state)?;
Err(anyhow!("[source] Main did not send 'Running' message"))
}
}
}

fn check_cell_update(
rx_cell_info: &mut BusReader<MessageCellInfo>,
) -> Result<Option<CellInfo>> {
match rx_cell_info.try_recv() {
Ok(msg) => {
Ok(Some(msg.cell_info))
},
Err(TryRecvError::Disconnected) => {
// TODO: print error properly
Err(anyhow!("[ngcontrol] err: rx_cell_info disconnected!"))
}
Err(TryRecvError::Empty) => { Ok(None) },
}
}


Loading

0 comments on commit 8ee4d52

Please sign in to comment.