Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
* Implement WorkerChannel and functions directly on Receiver<WorkerState>
* Add communication from dci_fetcher thread to NgScopeControl main thread
* Add main-based trigger from ngcontrol to rntimatcher
  • Loading branch information
bastian-src committed May 23, 2024
1 parent ec2df34 commit fededd9
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 112 deletions.
9 changes: 7 additions & 2 deletions src/cell_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use anyhow::{anyhow, Result};

use regex::Regex;
Expand All @@ -6,8 +8,8 @@ use serde_derive::Deserialize;

use crate::util::helper_json_pointer;

// TODO: CellInfo should be able to hold several cell
// Define the struct to hold cell information

pub const REQUEST_TIMEOUT_MS: u64 = 2000;

#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::upper_case_acronyms)]
Expand Down Expand Up @@ -207,6 +209,7 @@ async fn cgi_get_token(base_addr: &str, user: &str, auth: &str) -> Result<Header
);
let resp = reqwest::Client::new()
.get(&url)
.timeout(Duration::from_millis(REQUEST_TIMEOUT_MS))
.header("Accept", "application/json, text/javascript, */*; q=0.01")
.header("Content-Type", "application/json")
.body(payload)
Expand Down Expand Up @@ -240,6 +243,7 @@ async fn cgi_get_cell(base_addr: &str, header_map: &HeaderMap) -> Result<serde_j
.default_headers(header_map.clone())
.build()?
.get(&url)
.timeout(Duration::from_millis(REQUEST_TIMEOUT_MS))
.header("Accept", "application/json, text/javascript, */*; q=0.01")
.header("Content-Type", "application/json")
.body(payload)
Expand All @@ -254,6 +258,7 @@ async fn devpub_get_cell(base_addr: &str) -> Result<String> {
let url = format!("http://{}:7353/api/v1/celldata/connected/all", base_addr);
let resp = reqwest::Client::new()
.get(&url)
.timeout(Duration::from_millis(REQUEST_TIMEOUT_MS))
.header("Accept", "application/json, text/javascript, */*; q=0.01")
.send()
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/logic/cell_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ fn run(
},
Err(some_err) => {
// TODO: print error properly
println!("[source] err retriecing cell info: {:#?}", some_err);
println!("[source] err retrieving cell info: {:#?}", some_err);
},
}

Expand Down
119 changes: 50 additions & 69 deletions src/logic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,51 @@ pub const BUS_SIZE_DCI: usize = 10000;
pub const BUS_SIZE_CELL_INFO: usize = 100;
pub const BUS_SIZE_RNTI: usize = 100;

pub trait WorkerState: Sized + Clone + Sync {
fn recv_general_state(rx_state: &Receiver<Self>) -> Result<GeneralState>;
fn try_recv_general_state(rx_state: &Receiver<Self>) -> Result<Option<GeneralState>>;
pub trait WorkerState: Sized + Clone + Sync + Debug {
fn to_general_state(&self) -> GeneralState;
fn worker_name() -> String;
}

pub trait WorkerChannel<T: WorkerState> {
fn worker_try_recv(&self) -> Result<Option<T>, TryRecvError>;
fn worker_print_on_recv(&self) -> Result<Option<T>, TryRecvError>;
fn worker_try_recv_general_state(&self) -> Result<Option<GeneralState>>;
fn worker_recv_general_state(&self) -> Result<GeneralState>;
}

impl<T: WorkerState> WorkerChannel<T> for Receiver<T> {
fn worker_try_recv(&self) -> Result<Option<T>, TryRecvError> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected),
}
}

fn worker_print_on_recv(&self) -> Result<Option<T>, TryRecvError> {
match self.worker_try_recv() {
Ok(Some(msg)) => {
println!("[main] message from {}: {:#?}", T::worker_name(), msg);
Ok(Some(msg))
}
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}

fn worker_recv_general_state(&self) -> Result<GeneralState> {
Ok(self.recv()?.to_general_state())
}

fn worker_try_recv_general_state(&self) -> Result<Option<GeneralState>> {
match self.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum GeneralState {
Running,
Expand All @@ -43,30 +81,19 @@ pub enum MainState {
Running,
Stopped,
NotifyStop,
UeConnectionReset, /* NgScope has been restarted */
}

impl WorkerState for MainState {
fn worker_name() -> String {
"main".to_owned()
}

fn recv_general_state(rx_state: &Receiver<Self>) -> Result<GeneralState> {
Ok(rx_state.recv()?.to_general_state())
}

fn try_recv_general_state(rx_state: &Receiver<Self>) -> Result<Option<GeneralState>> {
match rx_state.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}

fn to_general_state(&self) -> GeneralState {
match self {
MainState::Running => GeneralState::Running,
MainState::Stopped => GeneralState::Stopped,
MainState::NotifyStop => GeneralState::Unknown,
_ => GeneralState::Unknown,
}
}
}
Expand All @@ -84,23 +111,11 @@ impl WorkerState for SinkState {
"sink".to_owned()
}

fn recv_general_state(rx_state: &Receiver<SinkState>) -> Result<GeneralState> {
Ok(rx_state.recv()?.to_general_state())
}

fn try_recv_general_state(rx_state: &Receiver<SinkState>) -> Result<Option<GeneralState>> {
match rx_state.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}

fn to_general_state(&self) -> GeneralState {
match self {
SinkState::Running => GeneralState::Running,
SinkState::Stopped => GeneralState::Stopped,
SinkState::SpecialState => GeneralState::Unknown,
_ => GeneralState::Unknown,
}
}
}
Expand All @@ -117,23 +132,11 @@ impl WorkerState for SourceState {
"source".to_owned()
}

fn recv_general_state(rx_state: &Receiver<SourceState>) -> Result<GeneralState> {
Ok(rx_state.recv()?.to_general_state())
}

fn try_recv_general_state(rx_state: &Receiver<SourceState>) -> Result<Option<GeneralState>> {
match rx_state.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}

fn to_general_state(&self) -> GeneralState {
match self {
SourceState::Running => GeneralState::Running,
SourceState::Stopped => GeneralState::Stopped,
SourceState::SpecialState => GeneralState::Unknown,
_ => GeneralState::Unknown,
}
}
}
Expand All @@ -156,18 +159,6 @@ impl WorkerState for RntiMatcherState {
"rntimatcher".to_owned()
}

fn recv_general_state(rx_state: &Receiver<Self>) -> Result<GeneralState> {
Ok(rx_state.recv()?.to_general_state())
}

fn try_recv_general_state(rx_state: &Receiver<Self>) -> Result<Option<GeneralState>> {
match rx_state.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}

fn to_general_state(&self) -> GeneralState {
match self {
RntiMatcherState::Running => GeneralState::Running,
Expand All @@ -185,6 +176,8 @@ pub enum NgControlState {
StartNgScope(Box<NgScopeConfig>),
StopNgScope,
TriggerListenDci,
WaitForTriggerResponse,
SuccessfulTriggerResponse,
SleepMs(u64, Box<NgControlState>),
StoppingDciFetcherThread,
RestartingNgScopeProcess,
Expand All @@ -196,18 +189,6 @@ impl WorkerState for NgControlState {
"ngcontrol".to_owned()
}

fn recv_general_state(rx_state: &Receiver<Self>) -> Result<GeneralState> {
Ok(rx_state.recv()?.to_general_state())
}

fn try_recv_general_state(rx_state: &Receiver<Self>) -> Result<Option<GeneralState>> {
match rx_state.try_recv() {
Ok(msg) => Ok(Some(msg.to_general_state())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()),
}
}

fn to_general_state(&self) -> GeneralState {
match self {
NgControlState::Running => GeneralState::Running,
Expand Down Expand Up @@ -250,15 +231,15 @@ pub enum AppError {

pub fn check_not_stopped<T: WorkerState>(
rx_state: &mut BusReader<T>,
) -> Result<()> {
) -> Result<Option<T>> {
match rx_state.try_recv() {
Ok(msg) => {
match msg.to_general_state() {
GeneralState::Stopped => Err(anyhow!("BusReader received GeneralState::Stopped!")),
_ => Ok(()),
_ => Ok(Some(msg)),
}
}
Err(TryRecvError::Empty) => Ok(()),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(anyhow!("BusReader disconnected!")),
}
}
Expand Down
57 changes: 44 additions & 13 deletions src/logic/ngscope_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use crate::parse::{Arguments, FlattenedNgScopeArgs};
use crate::util::determine_process_id;


const WAIT_FOR_TRIGGER_NGSCOPE_RESPONE_MS: u64 = 500;

#[derive(Clone, Copy, Debug, PartialEq)]
enum LocalDciState {
Stop,
SendInitial,
WaitForServerAuth(u8),
SuccessfulAuth,
ListenForDci,
}

Expand Down Expand Up @@ -103,9 +106,11 @@ fn run(
..Default::default()
};

let (tx_dci_thread, rx_dci_thread) = sync_channel::<LocalDciState>(CHANNEL_SYNC_SIZE);
let (tx_dci_thread, rx_main_thread) = sync_channel::<LocalDciState>(CHANNEL_SYNC_SIZE);
let (tx_main_thread, rx_dci_thread) = sync_channel::<LocalDciState>(CHANNEL_SYNC_SIZE);
let dci_thread = deploy_dci_fetcher_thread(
rx_dci_thread,
tx_main_thread,
rx_main_thread,
tx_dci,
ng_args.ng_local_addr.to_string(),
ng_args.ng_server_addr.to_string(),
Expand All @@ -128,7 +133,26 @@ fn run(
},
NgControlState::TriggerListenDci => {
tx_dci_thread.send(LocalDciState::SendInitial)?;
ngcontrol_state = NgControlState::CheckingCellInfo;
ngcontrol_state = NgControlState::WaitForTriggerResponse;
},
NgControlState::WaitForTriggerResponse => {
ngcontrol_state = match rx_dci_thread.try_recv() {
Ok(LocalDciState::SuccessfulAuth) => {
NgControlState::SuccessfulTriggerResponse
}
Ok(_) | Err(TryRecvError::Empty) => NgControlState::SleepMs(
WAIT_FOR_TRIGGER_NGSCOPE_RESPONE_MS,
Box::new(NgControlState::WaitForTriggerResponse)),
Err(TryRecvError::Disconnected) => {
// TODO: Print error properly
println!("[ngcontrol]: dci_fetcher thread disconnected unexpectedly while waiting for trigger response");
break;
},
}
},
NgControlState::SuccessfulTriggerResponse => {
tx_ngcontrol_state.send(NgControlState::SuccessfulTriggerResponse)?;
ngcontrol_state = NgControlState::CheckingCellInfo
},
NgControlState::SleepMs(time_ms, next_state) => {
thread::sleep(Duration::from_millis(time_ms));
Expand All @@ -146,12 +170,12 @@ fn run(
}
}

tx_ngcontrol_state.send(NgControlState::StoppingDciFetcherThread)?;
tx_dci_thread.send(LocalDciState::Stop)?;
let _ = tx_ngcontrol_state.send(NgControlState::StoppingDciFetcherThread);
let _ = tx_dci_thread.send(LocalDciState::Stop);
let _ = dci_thread.join();
if ng_process_option.is_some() {
tx_ngcontrol_state.send(NgControlState::StoppingNgScopeProcess)?;
stop_ngscope(&mut ng_process_option.unwrap())?;
if let Some(ref mut process) = ng_process_option {
let _ = tx_ngcontrol_state.send(NgControlState::StoppingNgScopeProcess);
let _ = stop_ngscope(process);
}
send_final_state(&tx_ngcontrol_state)?;
Ok(())
Expand Down Expand Up @@ -201,14 +225,16 @@ fn handle_cell_update(


fn deploy_dci_fetcher_thread(
rx_local_dci_state: Receiver<LocalDciState>,
tx_main_thread: SyncSender<LocalDciState>,
rx_main_thread: Receiver<LocalDciState>,
tx_dci: Bus<MessageDci>,
local_socket_addr: String,
ng_server_addr: String,
) -> Result<JoinHandle<()>> {
let thread = thread::spawn(move || {
let _ = run_dci_fetcher(
rx_local_dci_state,
tx_main_thread,
rx_main_thread,
tx_dci,
local_socket_addr,
ng_server_addr,
Expand All @@ -218,7 +244,8 @@ fn deploy_dci_fetcher_thread(
}

fn run_dci_fetcher(
rx_local_dci_state: Receiver<LocalDciState>,
tx_main_thread: SyncSender<LocalDciState>,
rx_main_thread: Receiver<LocalDciState>,
mut tx_dci: Bus<MessageDci>,
local_socket_addr: String,
ng_server_addr: String,
Expand All @@ -229,7 +256,7 @@ fn run_dci_fetcher(

loop {
thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS));
if let Some(new_state) = check_rx_state(&rx_local_dci_state)? {
if let Some(new_state) = check_rx_state(&rx_main_thread)? {
dci_state = new_state;
}

Expand All @@ -241,11 +268,15 @@ fn run_dci_fetcher(
Err(_) => LocalDciState::SendInitial,
};
},
LocalDciState::SuccessfulAuth => {
tx_main_thread.send(LocalDciState::SuccessfulAuth)?;
dci_state = LocalDciState::ListenForDci;
}
LocalDciState::WaitForServerAuth(successful_auths) => {
dci_state = match ngscope_validate_server_check(&socket)? {
Some(_) => {
if successful_auths >= 1 {
LocalDciState::ListenForDci
LocalDciState::SuccessfulAuth
} else {
LocalDciState::WaitForServerAuth(successful_auths + 1)
}
Expand Down
Loading

0 comments on commit fededd9

Please sign in to comment.