Skip to content

Commit

Permalink
Implement Traffic pattern sending
Browse files Browse the repository at this point in the history
* Improve NgScopeController error handling by introducing RunArgs
* TODO: Implement RunArgs in other threads too
  • Loading branch information
bastian-src committed May 23, 2024
1 parent 04e813c commit 57985eb
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 90 deletions.
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,31 @@ smartphone IP. It shows the dns routes per interface:
sudo resolvectl dns
```

In case your device gets only an IPv6 address, you can request an IPv6 by
configuring the network interface further. Therefore, add the following
udev rule to apply a static naming schema to your USB interface:

``` /etc/udev/rules.d/99-usb-tethering.rules
SUBSYSTEM=="net", ACTION=="add", DRIVERS=="rndis_host", ATTR{dev_id}=="0x0", ATTR{type}=="1", NAME="usb_tether"
```

```
sudo udevadm control --reload-rules
sudo udevadm trigger
```

Now, your device should show up as "usb_tether". So, you can configure
the interface to request an IPv4 DHCP:

``` /etc/netplan/01-usb-tethering.yaml
etwork:
version: 2
ethernets:
usb_tether:
dhcp4: true
dhcp6: true
```

```
sudo netplan apply
```
1 change: 1 addition & 0 deletions src/logic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod cell_sink;
pub mod cell_source;
pub mod ngscope_controller;
pub mod rnti_matcher;
pub mod traffic_patterns;

pub const NUM_OF_WORKERS: usize = 4;
pub const DEFAULT_WORKER_SLEEP_MS: u64 = 2;
Expand Down
82 changes: 56 additions & 26 deletions src/logic/ngscope_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use std::net::UdpSocket;
use std::path::Path;
use std::process::{Child, Stdio};
use std::sync::mpsc::{SyncSender, TryRecvError, Receiver, sync_channel};
use std::thread;
use std::thread::JoinHandle;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use crate::cell_info::CellInfo;
Expand Down Expand Up @@ -45,14 +44,21 @@ pub struct NgControlArgs {
}

pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result<JoinHandle<()>> {
let mut run_args: RunArgs = RunArgs {
rx_app_state: args.rx_app_state,
tx_ngcontrol_state: args.tx_ngcontrol_state,
app_args: args.app_args,
rx_cell_info: args.rx_cell_info,
tx_dci_thread_handle: None,
dci_thread_handle: None,
ng_process_handle: None,
};
let run_args_mov: RunArgsMovables = RunArgsMovables {
tx_dci: args.tx_dci,
};
let thread = thread::spawn(move || {
let _ = run(
args.rx_app_state,
args.tx_ngcontrol_state,
args.app_args,
args.rx_cell_info,
args.tx_dci,
);
let _ = run(&mut run_args, run_args_mov);
finish(run_args);
});
Ok(thread)
}
Expand Down Expand Up @@ -89,46 +95,61 @@ fn check_cell_update(
}
}

fn run(
mut rx_app_state: BusReader<MainState>,
struct RunArgs {
rx_app_state: BusReader<MainState>,
tx_ngcontrol_state: SyncSender<NgControlState>,
app_args: Arguments,
mut rx_cell_info: BusReader<MessageCellInfo>,
rx_cell_info: BusReader<MessageCellInfo>,
tx_dci_thread_handle: Option<SyncSender<LocalDciState>>,
dci_thread_handle: Option<JoinHandle<()>>,
ng_process_handle: Option<Child>,
}

struct RunArgsMovables {
tx_dci: Bus<MessageDci>,
) -> Result<()> {
}

fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> {
let tx_ngcontrol_state = &mut run_args.tx_ngcontrol_state;
let rx_app_state = &mut run_args.rx_app_state;
let app_args = &run_args.app_args;
let rx_cell_info = &mut run_args.rx_cell_info;
let tx_dci = run_args_mov.tx_dci;

tx_ngcontrol_state.send(NgControlState::Running)?;
wait_for_running(&mut rx_app_state, &tx_ngcontrol_state)?;
wait_for_running(rx_app_state, tx_ngcontrol_state)?;
println!("[ngcontrol]: \t\tPID {:?}", determine_process_id());

let ng_args = FlattenedNgScopeArgs::from_unflattened(app_args.ngscope.unwrap())?;
let ng_args = FlattenedNgScopeArgs::from_unflattened(app_args.clone().ngscope.unwrap())?;
let mut ng_process_option: Option<Child> = None;
let ngscope_config = NgScopeConfig {
..Default::default()
};

let (tx_dci_thread, rx_main_thread) = sync_channel::<LocalDciState>(CHANNEL_SYNC_SIZE);
let (tx_main_thread, rx_dci_thread) = sync_channel::<LocalDciState>(CHANNEL_SYNC_SIZE);
let dci_thread = deploy_dci_fetcher_thread(
run_args.dci_thread_handle = Some(deploy_dci_fetcher_thread(
tx_main_thread,
rx_main_thread,
tx_dci,
ng_args.ng_local_addr.to_string(),
ng_args.ng_server_addr.to_string(),
)?;
)?);
run_args.tx_dci_thread_handle = Some(tx_dci_thread.clone());

let mut ngcontrol_state: NgControlState = NgControlState::CheckingCellInfo;

loop {
/* <precheck> */
thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS));
if check_not_stopped(&mut rx_app_state).is_err() {
if check_not_stopped(rx_app_state).is_err() {
break;
}
/* </precheck> */

match ngcontrol_state {
NgControlState::CheckingCellInfo => {
ngcontrol_state = handle_cell_update(&mut rx_cell_info,
ngcontrol_state = handle_cell_update(rx_cell_info,
&ngscope_config)?;
},
NgControlState::TriggerListenDci => {
Expand Down Expand Up @@ -170,15 +191,24 @@ fn run(
}
}

let _ = tx_ngcontrol_state.send(NgControlState::StoppingDciFetcherThread);
let _ = tx_dci_thread.send(LocalDciState::Stop);
let _ = dci_thread.join();
if let Some(ref mut process) = ng_process_option {
let _ = tx_ngcontrol_state.send(NgControlState::StoppingNgScopeProcess);
Ok(())
}

fn finish(mut run_args: RunArgs) {
let _ = run_args.tx_ngcontrol_state.send(NgControlState::StoppingDciFetcherThread);

if let Some(tx_dci_thread) = run_args.tx_dci_thread_handle {
let _ = tx_dci_thread.send(LocalDciState::Stop);
}

if let Some(dci_thread) = run_args.dci_thread_handle {
let _ = dci_thread.join();
}
if let Some(ref mut process) = run_args.ng_process_handle {
let _ = run_args.tx_ngcontrol_state.send(NgControlState::StoppingNgScopeProcess);
let _ = stop_ngscope(process);
}
send_final_state(&tx_ngcontrol_state)?;
Ok(())
let _ = send_final_state(&run_args.tx_ngcontrol_state);
}

fn handle_start_ngscope(
Expand Down
120 changes: 58 additions & 62 deletions src/logic/rnti_matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use std::net::UdpSocket;
use std::sync::mpsc::{SyncSender, TryRecvError, Receiver, sync_channel};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use clap::ValueEnum;
use serde::{Serialize, Deserialize};
use bus::{Bus, BusReader};

use crate::logic::{
Expand All @@ -16,28 +14,16 @@ use crate::logic::{
};
use crate::parse::{Arguments, FlattenedRntiMatchingArgs};
use crate::util::determine_process_id;
use crate::logic::traffic_patterns::TrafficPattern;


#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug, Serialize, Deserialize)]
pub enum RntiMatchingTrafficPatternType {
A,
B,
C,
}

#[derive(Clone, Debug, PartialEq)]
enum LocalGeneratorState {
Stop,
SendPattern(String, Vec<TrafficPattern>),
SendPattern(String, Box<TrafficPattern>),
Idle,
}

#[derive(Clone, Debug, PartialEq)]
struct TrafficPattern {
pub time_ms: u16,
pub payload: Vec<u8>,
}

pub struct RntiMatcherArgs {
pub rx_app_state: BusReader<MainState>,
pub tx_rntimatcher_state: SyncSender<RntiMatcherState>,
Expand All @@ -46,36 +32,6 @@ pub struct RntiMatcherArgs {
pub tx_rnti: Bus<MessageRnti>,
}

pub fn deploy_rnti_matcher(args: RntiMatcherArgs) -> Result<JoinHandle<()>> {
let thread = thread::spawn(move || {
let _ = run(
args.rx_app_state,
args.tx_rntimatcher_state,
args.app_args,
args.rx_dci,
args.tx_rnti,
);
});
Ok(thread)
}

fn send_final_state(tx_rntimatcher_state: &SyncSender<RntiMatcherState>) -> Result<()> {
Ok(tx_rntimatcher_state.send(RntiMatcherState::Stopped)?)
}

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_rntimtacher_state: &SyncSender<RntiMatcherState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_rntimtacher_state)?;
Err(anyhow!("[sink] Main did not send 'Running' message"))
}
}
}

fn run(
mut rx_app_state: BusReader<MainState>,
tx_rntimatcher_state: SyncSender<RntiMatcherState>,
Expand All @@ -87,14 +43,16 @@ fn run(
wait_for_running(&mut rx_app_state, &tx_rntimatcher_state)?;
println!("[rntimatcher]: \t\tPID {:?}", determine_process_id());

let matcher_args = FlattenedRntiMatchingArgs::from_unflattened(app_args.rntimatching.unwrap())?;
let matching_args = FlattenedRntiMatchingArgs::from_unflattened(app_args.rntimatching.unwrap())?;

let (_tx_gen_thread, rx_gen_thread) = sync_channel::<LocalGeneratorState>(CHANNEL_SYNC_SIZE);
let _gen_thread = deploy_traffic_generator_thread(
let (tx_gen_thread, rx_gen_thread) = sync_channel::<LocalGeneratorState>(CHANNEL_SYNC_SIZE);
let gen_thread = deploy_traffic_generator_thread(
rx_gen_thread,
matcher_args.matching_local_addr,
);
matching_args.matching_local_addr,
)?;

let traffic_destination = matching_args.matching_traffic_destination;
let traffic_pattern = matching_args.matching_traffic_pattern.generate_pattern();
let mut matcher_state: RntiMatcherState = RntiMatcherState::Idle;

loop {
Expand All @@ -120,9 +78,12 @@ fn run(
match matcher_state {
RntiMatcherState::Idle => {},
RntiMatcherState::StartMatching => {
todo!()
println!("DEBUG [rntimatcher] StartMatching");
let _ = tx_gen_thread.send(LocalGeneratorState::SendPattern(
traffic_destination.clone(),
Box::new(traffic_pattern.clone())));
matcher_state = RntiMatcherState::MatchingCollectDci;
// TODO: Reset current matching -> this might get triggered by main state
// TODO: Generate upstream pattern? -> maybe in another thread
},
RntiMatcherState::MatchingCollectDci => {
todo!()
Expand All @@ -145,6 +106,9 @@ fn run(

}

let _ = tx_rntimatcher_state.send(RntiMatcherState::StoppingTrafficGeneratorThread);
let _ = tx_gen_thread.send(LocalGeneratorState::Stop);
let _ = gen_thread.join();
send_final_state(&tx_rntimatcher_state)?;
Ok(())
}
Expand All @@ -166,7 +130,7 @@ fn run_traffic_generator(
rx_local_gen_state: Receiver<LocalGeneratorState>,
local_socket_addr: String,
) -> Result<()> {
let _socket = init_udp_socket(&local_socket_addr)?;
let socket = init_udp_socket(&local_socket_addr)?;
let mut gen_state: LocalGeneratorState = LocalGeneratorState::Idle;
println!("[rntimatcher.gen]: \tPID {:?}", determine_process_id());

Expand All @@ -181,7 +145,9 @@ fn run_traffic_generator(
thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS));
},
LocalGeneratorState::Stop => { break; },
LocalGeneratorState::SendPattern(_, _) => todo!(),
LocalGeneratorState::SendPattern(destination, pattern) => {
gen_state = gen_handle_send_pattern(&socket, &destination, *pattern.clone());
},
}
}
Ok(())
Expand All @@ -203,13 +169,43 @@ fn check_rx_state(
}
}

// TODO Define patterns here
impl RntiMatchingTrafficPatternType {
pub fn derive_pattern(self) {
match self {
RntiMatchingTrafficPatternType::A => todo!(),
RntiMatchingTrafficPatternType::B => todo!(),
RntiMatchingTrafficPatternType::C => todo!(),
fn gen_handle_send_pattern(socket: &UdpSocket, destination: &str, mut pattern: TrafficPattern) -> LocalGeneratorState {
while let Some(msg) = pattern.messages.pop_front() {
thread::sleep(Duration::from_millis(msg.time_ms as u64));
let _ = socket.send_to(&msg.payload, destination);
}
LocalGeneratorState::Idle
}


pub fn deploy_rnti_matcher(args: RntiMatcherArgs) -> Result<JoinHandle<()>> {
let thread = thread::spawn(move || {
let _ = run(
args.rx_app_state,
args.tx_rntimatcher_state,
args.app_args,
args.rx_dci,
args.tx_rnti,
);
});
Ok(thread)
}

fn send_final_state(tx_rntimatcher_state: &SyncSender<RntiMatcherState>) -> Result<()> {
Ok(tx_rntimatcher_state.send(RntiMatcherState::Stopped)?)
}

fn wait_for_running(
rx_app_state: &mut BusReader<MainState>,
tx_rntimtacher_state: &SyncSender<RntiMatcherState>,
) -> Result<()> {
match wait_until_running(rx_app_state) {
Ok(_) => Ok(()),
_ => {
send_final_state(tx_rntimtacher_state)?;
Err(anyhow!("[sink] Main did not send 'Running' message"))
}
}
}


Loading

0 comments on commit 57985eb

Please sign in to comment.