diff --git a/Cargo.toml b/Cargo.toml index 2f6b947..81e72a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,8 @@ edition = "2021" [dependencies] anyhow = "1.0.80" bus = "2.4.1" +casual_logger = "0.6.5" +chrono = "0.4.38" clap = { version = "4.5.4", features = ["derive", "color", "env", "help"] } clap-serde = "0.5.1" confy = { version = "0.6.1", default-features = false, features = ["yaml_conf"] } @@ -15,12 +17,16 @@ ctrlc = "3.4.4" derive = "1.0.0" features = "0.10.0" kanal = "0.1.0-pre8" +lazy_static = "1.4.0" +libc = "0.2.155" +nalgebra = "0.32.6" +nix = "0.28.0" regex = "1.10.4" reqwest = { version = "0.12.3", features = ["json"] } serde = { version = "1.0.197", features = ["derive"] } serde_derive = "1.0.198" serde_json = "1.0.116" -serde_libconfig = "0.0.1" +serde_libconfig = "0.0.2" serde_yaml = "0.9.34" strum_macros = "0.26.2" tokio = { version = "1.37.0", features = ["full", "sync"] } diff --git a/README.md b/README.md index 6f13883..5dec668 100644 --- a/README.md +++ b/README.md @@ -21,3 +21,44 @@ smartphone IP. It shows the dns routes per interface: sudo resolvectl dns ``` +In case your device gets only an IPv6 address, you can request an IPv6 by +configuring the network interface further. Therefore, add the following +udev rule to apply a static naming schema to your USB interface: + +``` /etc/udev/rules.d/99-usb-tethering.rules +SUBSYSTEM=="net", ACTION=="add", DRIVERS=="rndis_host", ATTR{dev_id}=="0x0", ATTR{type}=="1", NAME="usb_tether" +``` + +``` +sudo udevadm control --reload-rules +sudo udevadm trigger +``` + +Now, your device should show up as "usb_tether". So, you can configure +the interface to request an IPv4 DHCP: + +``` /etc/netplan/01-usb-tethering.yaml +etwork: + version: 2 + ethernets: + usb_tether: + dhcp4: true + dhcp6: true +``` + +``` +sudo netplan apply +``` + + +### Disable IPv6 + +According to [link](https://itsfoss.com/disable-ipv6-ubuntu-linux/). + +``` +sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1 +sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1 +sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=1 +``` + +The configuration can be made persistently by editing/adding `/etc/sysctl.d/`. diff --git a/scripts/api_test_device_publisher.py b/scripts/api_test_device_publisher.py index acad0f5..7777d1f 100755 --- a/scripts/api_test_device_publisher.py +++ b/scripts/api_test_device_publisher.py @@ -6,12 +6,24 @@ # Predefined response cell_data = [ - {"id": 1, "type": "LTE", "earfcn": "Neuron A"}, + { + "id": 62, + "type": "LTE", + "arfcn": 3350, + "band": "7", + "rssi": 110.7, + "rsrp": 90.7, + "rsrq": 11.3, + }, ] @app.route('/api/v1/celldata') -def get_cell_data(): +def get_cell_data_all(): + return jsonify(cell_data) + +@app.route('/api/v1/celldata/connected/all') +def get_cell_data_connected_all(): return jsonify(cell_data) if __name__ == '__main__': - app.run(debug=True) + app.run(debug=True, port=7353) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index ca54751..2dddffb 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1,2 +1,8 @@ flask marshmallow +matplotlib +jsonlines +seaborn +json +linecache +pandas diff --git a/scripts/visualize_rnti_matching.py b/scripts/visualize_rnti_matching.py new file mode 100755 index 0000000..88e16bf --- /dev/null +++ b/scripts/visualize_rnti_matching.py @@ -0,0 +1,787 @@ +#!/usr/bin/env python3 + +import os +import matplotlib.pyplot as plt +from matplotlib.widgets import Button +from matplotlib.axes import Axes +from datetime import datetime +import pandas as pd +import seaborn as sns +import argparse +import numpy as np +import linecache +import json + +# Default Arguments +DEFAULT_DATASET_PATH = './.logs/rnti_matching_pattern_A.jsonl' +DEFAULT_EXPORT_PATH = '.export' +DEFAULT_RNTI = None +DEFAULT_PLOT_FILTERED = True +DEFAULT_EXPORT_RECORDING_INDEX = 17 +DEFAULT_RESET_TIMESTAMPS = True +DEFAULT_FILTER_KEEP_REST = False +DEFAULT_EXPORT_PATH = './.export/' + +# RNTI Filterting +DCI_THRESHOLD = 0 +EMPTY_DCI_RATIO_THRESHOLD = 0.99 +SAMPLES_THRESHOLD = 5 + +MAX_TOTAL_UL_FACTOR = 100.0 +MIN_TOTAL_UL_FACTOR = 0.005 # x% of the expected UL traffic +MAX_UL_PER_DCI_THRESHOLD = 5_000_000 +MIN_OCCURENCES_FACTOR = 0.005 + +# Plotting +PLOT_SCATTER_MARKER_SIZE = 10 +MARKERS = ['o', 's', '^', 'd', 'p', 'h', '*', '>', 'v', '<', 'x'] + +sns.set(style="darkgrid") + +########## +# Helpers +########## + +def print_debug(msg: str): + print(msg) + +def print_info(msg: str): + print(msg) + + +def count_lines(file_path: str) -> int: + with open(file_path, 'r') as file: + return sum(1 for _ in file) + + +def read_single_dataset(file_path: str, line_number: int) -> dict: + try: + return json.loads(linecache.getline(file_path, line_number).strip()) + except Exception as e: + raise Exception(f"An error occured reading dataset at {file_path}:{line_number}\n{e}") + + +def merge_ue_traffic(dict_a, dict_b): + for key, value in dict_b.items(): + if key in dict_a: + dict_a[key]['ul_bytes'] += value['ul_bytes'] + dict_a[key]['dl_bytes'] += value['dl_bytes'] + else: + dict_a[key] = {'ul_bytes': value['ul_bytes'], 'dl_bytes': value['dl_bytes']} + + return dict_a + + +def move_column_to_front(df: pd.DataFrame, specific_column: str) -> pd.DataFrame: + """ + Move a specific column to the front of a Pandas DataFrame. + + Parameters: + - df (pd.DataFrame): The input DataFrame. + - specific_column (str): The column name to move to the front. + + Returns: + - pd.DataFrame: DataFrame with the specified column moved to the front. + """ + cols = df.columns.tolist() + cols.insert(0, cols.pop(cols.index(specific_column))) + cols_df = df[cols] + + if isinstance(cols_df, pd.DataFrame): + return cols_df + + raise Exception("Should not happen, move_column_to_front tried to return something else than a pd.DataFrame") + + +def save_plot(file_path): + plt.tight_layout() + plt.savefig(file_path) + print_debug(f"Saved file: {file_path}") + + +def recording_to_pandas(recording) -> pd.DataFrame: + """ + Converts a recording of uplink (UL) bytes data from multiple RNTIs into a Pandas DataFrame. + + Args: + recording (dict): A dictionary containing the recording data. The structure of the dictionary is: + Example: + { 'recordings': [ + ( { 1612345600000000: {'ul_bytes': 100, 'dl_bytes': 200}, }, 'RNTI_A'), + ... + ] + } + + Returns: + pd.DataFrame: A DataFrame with timestamps as the index and RNTIs as columns. Each cell contains + the 'ul_bytes' value for the corresponding timestamp and RNTI. If an RNTI did not + send anything at a specific timestamp, the cell will contain NaN. + + Note: + - The function assumes that the 'recordings' key is always present in the recording dictionary. + - Timestamps are assumed to be in epoch time format in microseconds. + - Actual timestamps are masqueraded + """ + flatten_data = {} + for ue_traffic, rnti in recording['recordings']: + for timestamp, values in ue_traffic.items(): + converted_timestamp = np.datetime64(int(timestamp), 'us') + if converted_timestamp not in flatten_data: + flatten_data[converted_timestamp] = {} + flatten_data[converted_timestamp][rnti] = values['ul_bytes'] + + df = pd.DataFrame.from_dict(flatten_data, orient='index') + df.sort_index(inplace=True) + + df.index = pd.to_datetime(df.index) + # Reset the index to start from 0:00 + df.index = (df.index - df.index[0]).astype('timedelta64[us]') + + return df + +########## +# Diashow +########## + +def diashow(settings): + data = [None for _ in range(count_lines(settings.path))] + print(f"Number of datasets in the file: {len(data)}") + + _, ax = plt.subplots() + tracker = IndexTracker(ax, data, settings) + + axprev = plt.axes([0.7, 0.01, 0.1, 0.075]) + axnext = plt.axes([0.81, 0.01, 0.1, 0.075]) + bnext = Button(axnext, 'Next') + bnext.on_clicked(tracker.next) + bprev = Button(axprev, 'Previous') + bprev.on_clicked(tracker.prev) + + plt.show() + + +class FilteredRecording: + def __init__(self): + self.filtered_df: pd.DataFrame = pd.DataFrame() + self.nof_empty_dci: int = 0 + self.nof_total_dci: int = 0 + self.expected_ul_bytes: int = 0 + self.expected_nof_packets: int = 0 + self.skipped_max_total_ul: int = 0 + self.skipped_max_per_dci_ul: int = 0 + self.skipped_min_exp_ul: int = 0 + self.skipped_min_count: int = 0 + self.skipped_median_zero: int = 0 + self.skipped_not_target_rnti: int = 0 + + +def filter_dataset(settings, raw_dataset) -> FilteredRecording: + if settings.filter_keep_rest: + return filter_dataset_slow(settings, raw_dataset) + else: + return filter_dataset_fast(settings, raw_dataset) + + +def filter_dataset_fast(settings, raw_dataset) -> FilteredRecording: + cell_traffic = raw_dataset['cell_traffic'] + if len(cell_traffic) > 1: + print(f"!!! More than one cell: {len(cell_traffic)}. Using only the first one.") + + _, cell_data = next(iter((cell_traffic.items()))) + + result: FilteredRecording = FilteredRecording() + result.nof_empty_dci = cell_data['nof_empty_dci'] + result.nof_total_dci = cell_data['nof_total_dci'] + result.expected_ul_bytes = raw_dataset['traffic_pattern_features']['total_ul_bytes'] + result.expected_nof_packets = raw_dataset['traffic_pattern_features']['nof_packets'] + + min_total_ul = result.expected_ul_bytes * MIN_TOTAL_UL_FACTOR + max_total_ul = result.expected_ul_bytes * MAX_TOTAL_UL_FACTOR + min_count = result.expected_nof_packets * MIN_OCCURENCES_FACTOR + + # Start checks + if result.nof_total_dci <= DCI_THRESHOLD: + raise Exception("nof_total_dci <= DCI_THRESHOLD") + if result.nof_empty_dci > EMPTY_DCI_RATIO_THRESHOLD * result.nof_total_dci: + raise Exception("nof_empty_dci > EMPTY_DCI_RATIO_THRESHOLD * nof_total_dci") + + for _, cell_data in cell_traffic.items(): + flattened: dict = {} + for rnti, ue_data in cell_data['traffic'].items(): + if hasattr(settings, 'rnti'): + if settings.rnti is not None and rnti != settings.target_rnti: + result.skipped_not_target_rnti += 1 + continue + + if ue_data['total_ul_bytes'] > max_total_ul: + result.skipped_max_total_ul += 1 + # print(f"RNTI_UL_THRESHOLD: {ue_data['total_ul_bytes']}") + continue + + if ue_data['total_ul_bytes'] < min_total_ul: + result.skipped_min_exp_ul += 1 + # print(f"MIN_EXPECTED_UL_THRESHOLD: {ue_data['total_ul_bytes']}") + continue + + if len(ue_data['traffic']) < min_count: + result.skipped_min_count += 1 + # print(f"MIN_OCCURENCES_THRESHOLD: {len(ue_data['traffic'])}") + continue + + skip_rnti = False + for _, tx_data in ue_data['traffic'].items(): + if tx_data['ul_bytes'] > MAX_UL_PER_DCI_THRESHOLD: + result.skipped_max_per_dci_ul += 1 + skip_rnti = True + break + if skip_rnti: + result.skipped_max_per_dci_ul += 1 + # print("MAX_UL_BYTES_PER_DCI") + continue + + if calculate_median(ue_data) <= 0: + result.skipped_median_zero += 1 + continue + + for timestamp, values in ue_data['traffic'].items(): + converted_timestamp = np.datetime64(int(timestamp), 'us') + if converted_timestamp not in flattened: + flattened[converted_timestamp] = {} + flattened[converted_timestamp][rnti] = values['ul_bytes'] + + all_df = pd.DataFrame.from_dict(flattened, orient='index') + all_df.sort_index(inplace=True) + + all_df.index = pd.to_datetime(all_df.index) + if settings.reset_timestamps: + # Reset the index to start from 0:00 + all_df.index = (all_df.index - all_df.index[0]).astype('timedelta64[us]') + + result.filtered_df = all_df + + + print("Skipped RNTIs during filtering:") + print(f" MAX TOTAL UL: \t {result.skipped_max_total_ul} \t({max_total_ul})") + print(f" MIN TOTAL UL: \t {result.skipped_min_exp_ul} \t({min_total_ul})") + print(f" MAX UL/DCI: \t {result.skipped_max_per_dci_ul}") + print(f" MIN OCCURE: \t {result.skipped_min_count} \t({min_count})") + print(f" MEDIAN UL: \t {result.skipped_median_zero}") + if hasattr(settings, 'rnti') and settings.rnti is not None: + print(f" TARGET RNTI: \t {result.skipped_not_target_rnti}") + + print(f"Nof valid RNTIs: {result.filtered_df.shape[1]}") + if result.filtered_df.shape[1] <= 0: + raise Exception("No RNTIs left after filtering!!") + + return result + + +def filter_dataset_slow(settings, raw_dataset) -> FilteredRecording: + cell_traffic = raw_dataset['cell_traffic'] + if len(cell_traffic) > 1: + print(f"!!! More than one cell: {len(cell_traffic)}. Using only the first one.") + + _, cell_data = next(iter((cell_traffic.items()))) + + result: FilteredRecording = FilteredRecording() + result.nof_empty_dci = cell_data['nof_empty_dci'] + result.nof_total_dci = cell_data['nof_total_dci'] + result.expected_ul_bytes = raw_dataset['expected_ul_bytes'] + result.expected_nof_packets = raw_dataset['expected_nof_packets'] + + for _, cell_data in cell_traffic.items(): + flattened: dict = {} + for rnti, ue_data in cell_data['traffic'].items(): + for timestamp, values in ue_data['traffic'].items(): + converted_timestamp = np.datetime64(int(timestamp), 'us') + if converted_timestamp not in flattened: + flattened[converted_timestamp] = {} + flattened[converted_timestamp][rnti] = values['ul_bytes'] + + all_df = pd.DataFrame.from_dict(flattened, orient='index') + all_df.sort_index(inplace=True) + + all_df.index = pd.to_datetime(all_df.index) + if settings.reset_timestamps: + # Reset the index to start from 0:00 + all_df.index = (all_df.index - all_df.index[0]).astype('timedelta64[us]') + + result.filtered_df = all_df + + print(f"Nof RNTIs: {result.filtered_df.shape[1]}") + if result.filtered_df.shape[1] <= 0: + raise Exception("No RNTIs left after filtering!!") + + return result + + +def convert_timestamp(timestamp): + dt = datetime.fromtimestamp(timestamp / 1_000_000.0) + return dt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + + +def calculate_median(ue_traffic): + numbers = [tx_data['ul_bytes'] for _, tx_data in ue_traffic['traffic'].items()] + sorted_list = sorted(numbers) + length = len(sorted_list) + + if length % 2 == 0: + # If the length is even, return the average of the two middle elements + mid1 = sorted_list[length // 2 - 1] + mid2 = sorted_list[length // 2] + median = (mid1 + mid2) / 2 + else: + # If the length is odd, return the middle element + median = sorted_list[length // 2] + + return median + + +def plot_data(ax, dataset, index, settings): + ax.clear() + rnti_count = {} + all_timestamps = [] + + if settings.plot_filtered and len(dataset[index]['removed_by_filter']) > 0: + ue_traffic = dataset[index]['removed_by_filter'] + + timestamps = np.array([np.datetime64(int(ts), 'us') for ts in sorted(list(ue_traffic.keys()))]) + ul_bytes = [ue_traffic[ts]['ul_bytes'] for ts in sorted(list(ue_traffic.keys()))] + + ax.scatter(timestamps, ul_bytes, color='grey', alpha=0.2, label='Filtered RNTIs') + + + for ue_traffic, rnti in dataset[index]['recordings']: + # Extract timestamps and UL bytes + timestamps = np.array([np.datetime64(int(ts), 'us') for ts in sorted(list(ue_traffic.keys()))]) + all_timestamps.extend(timestamps) + + ul_bytes = [ue_traffic[ts]['ul_bytes'] for ts in sorted(list(ue_traffic.keys()))] + + rnti_count[rnti] = len(timestamps) + # Plot UL bytes for the RNTI + ax.scatter(timestamps, ul_bytes, label=f'RNTI: {rnti}', s=PLOT_SCATTER_MARKER_SIZE) + + max_key = max(rnti_count, key=rnti_count.get) + max_value = rnti_count[max_key] + + # y-axis limit + # ax.set_ylim(0, 2000) + + nof_total_dci = dataset[index]['nof_total_dci'] + nof_empty_dci = dataset[index]['nof_empty_dci'] + + ax.set_xlabel('Timestamp') + ax.set_ylabel('UL Bytes') + ax.set_title(f'UL Traffic Pattern | Dataset: {index + 1}/{len(dataset)} | Empty DCI {nof_empty_dci}/{nof_total_dci} | Most occurring RNTI: [{max_key}]: {max_value}') + # ax.grid(True, which='both', axis='y', linestyle='--', linewidth=0.5, color='gray', alpha=0.7) + + # Rotate x-axis labels + for label in ax.get_xticklabels(): + label.set_rotation(45) + label.set_ha('right') + + ax.legend() + + plt.draw() + + +class IndexTracker: + def __init__(self, ax, data: list, settings): + self.ax = ax + self.data = data # : list of FilteredRecording + self.settings = settings + self.index = 0 + if self.check_data(self.index): + self.plot() + + def plot(self): + self.ax.clear() + df_kbps = self.data[self.index].filtered_df.resample('1s').median().mul(8 / 1_000) + plot_df(plot_pandas_line, df_kbps, axes=self.ax) + + def check_data(self, file_index) -> bool: + if isinstance(self.data[file_index], FilteredRecording): + return True + elif self.data[self.index] == None: + # Read dataset + try: + raw_data = read_single_dataset(self.settings.path, self.index + 1) + filtered_data = filter_dataset(self.settings, raw_data) + if filtered_data is not None: + self.data[file_index] = filtered_data + print(f"Successfully loaded dataset {self.settings.path}:{file_index}") + return True + except Exception as e: + print(f"Dataset not plottable: {e}") + + return False + + def next(self, _): + if count_lines(self.settings.path) != len(self.data): + self.data = [None for _ in range(count_lines(self.settings.path))] + self.index = (self.index + 1) % len(self.data) + if self.check_data(self.index): + self.plot() + + def prev(self, _): + if count_lines(self.settings.path) != len(self.data): + self.data = [None for _ in range(count_lines(self.settings.path))] + self.index = (self.index - 1) % len(self.data) + if self.check_data(self.index): + self.plot() + + +########## +# Standardize +########## + +def standardize(settings): + all_recordings = read_all_recordings(settings) + print(f"DEBUG len(all_data) {len(all_recordings)}") + ul_timeline_matrix = np.zeros((len(all_recordings), 3)) + dci_time_deltas_matrix = np.zeros((len(all_recordings), 3)) + count_vec = np.zeros((len(all_recordings), 1)) + total_ul_vec = np.zeros((len(all_recordings), 1)) + for (index, recordings) in enumerate(all_recordings): + (ul_timeline, dci_time_deltas, count, total_ul_bytes) = determine_highest_count_ul_timeline(recordings) + count_vec[index] = count + total_ul_vec[index] = total_ul_bytes + + ul_timeline_matrix[index, 0] = np.median(ul_timeline) + ul_timeline_matrix[index, 1] = np.mean(ul_timeline) + ul_timeline_matrix[index, 2] = np.var(ul_timeline) + + dci_time_deltas_matrix[index, 0] = np.median(dci_time_deltas) + dci_time_deltas_matrix[index, 1] = np.mean(dci_time_deltas) + dci_time_deltas_matrix[index, 2] = np.var(dci_time_deltas) + + std_count = (np.mean(count_vec), np.std(count_vec)) + std_total_ul = (np.mean(total_ul_vec), np.std(total_ul_vec)) + + std_ul_timeline_median = ( + np.mean(ul_timeline_matrix[:, 0]), + np.std(ul_timeline_matrix[:, 0]) + ) + + ul_timeline_mean = ( + np.mean(ul_timeline_matrix[:, 1]), + np.std(ul_timeline_matrix[:, 1]) + ) + + ul_timeline_variance = ( + np.mean(ul_timeline_matrix[:, 2]), + np.std(ul_timeline_matrix[:, 2]) + ) + + dci_time_deltas_median = ( + np.mean(dci_time_deltas_matrix[:, 0]), + np.std(dci_time_deltas_matrix[:, 0]) + ) + + dci_time_deltas_mean = ( + np.mean(dci_time_deltas_matrix[:, 1]), + np.std(dci_time_deltas_matrix[:, 1]) + ) + + dci_time_deltas_variance = ( + np.mean(dci_time_deltas_matrix[:, 2]), + np.std(dci_time_deltas_matrix[:, 2]) + ) + + # Print the values in Rust tuple format with three decimal places + print("vec![") + print(f" ({std_count[0]:.3f}, {std_count[1]:.3f}),") + print(f" ({std_total_ul[0]:.3f}, {std_total_ul[1]:.3f}),") + print(f" ({std_ul_timeline_median[0]:.3f}, {std_ul_timeline_median[1]:.3f}),") + print(f" ({ul_timeline_mean[0]:.3f}, {ul_timeline_mean[1]:.3f}),") + print(f" ({ul_timeline_variance[0]:.3f}, {ul_timeline_variance[1]:.3f}),") + print(f" ({dci_time_deltas_median[0]:.3f}, {dci_time_deltas_median[1]:.3f}),") + print(f" ({dci_time_deltas_mean[0]:.3f}, {dci_time_deltas_mean[1]:.3f}),") + print(f" ({dci_time_deltas_variance[0]:.3f}, {dci_time_deltas_variance[1]:.3f})") + print("],") + + +def read_all_recordings(settings): + all_runs = [] + for line_number in range(1, count_lines(settings.path)): + raw_data = read_single_dataset(settings.path, line_number) + try: + result = filter_dataset(settings, raw_data) + if result is not None: + all_runs.append(result.filtered_df) + except: + pass + + return all_runs + + +def determine_highest_count_ul_timeline(df): + rnti = "11223" + target_traffic: pd.DataFrame = pd.DataFrame() + + if not rnti in df.columns: + rnti = df.count().idxmax() + target_traffic = df[rnti].dropna() + + ul_timeline = target_traffic.values + count = target_traffic.count() + total_ul_bytes = target_traffic.sum() + index_numpy = target_traffic.index.to_numpy() + dci_time_deltas = (index_numpy[1:] - index_numpy[:-1]).astype(int) + + print_debug(f"DEBUG [determine_highest_count_ul_timeline] rnti: {rnti} | count: {count}") + + return (ul_timeline, dci_time_deltas, count, total_ul_bytes) + + +########## +# Exports +########## + +def export(settings): + dir_name, base_name = os.path.split(settings.path) + base_name_no_ext, _ = os.path.splitext(base_name) + export_base_path = os.path.join(settings.export_path, + dir_name.split('/', 1)[-1], + base_name_no_ext) + + raw_recording = read_single_dataset(settings.path, settings.recording_index) + filtered_data = filter_dataset(settings, raw_recording) + + df = move_column_to_front(filtered_data.filtered_df, '11985') + df_kbps = df.resample('1s').median().mul(8 / 1_000) + + # Replace the first directory and change the file extension + os.makedirs(os.path.dirname(export_base_path), exist_ok=True) + + print_info(f"Exporting files to: {export_base_path}*") + + ### + # Show filtered (resample) + ### + plot_df(plot_pandas_line, df_kbps) + xlim = plt.xlim() + ylim = plt.ylim() + save_plot(export_base_path + "_filtered.png") + save_plot(export_base_path + "_filtered.pdf") + save_plot(export_base_path + "_filtered.svg") + plt.close() + + ### + # Show Single one + ### + df_single_rnti = df_kbps[['11985']] + + plot_df(plot_pandas_line, df_single_rnti) + plt.xlim(xlim) + plt.ylim(ylim) + save_plot(export_base_path + "_single.png") + save_plot(export_base_path + "_single.pdf") + save_plot(export_base_path + "_single.svg") + plt.close() + + ### + # Show all data + ### + + print_info( " [ ] Reading the whole dataset without filtering..") + df_all: pd.DataFrame = filter_dataset_slow(settings, raw_recording).filtered_df + df_all = move_column_to_front(df_all, '11985') + df_all_kbps: pd.DataFrame = df_all.mul(8 / 1_000) + print_info( " [x] Reading the whole dataset without filtering..") + + print_info(f" [ ] Plotting unfiltered {df_all_kbps.shape[0]}x{df_all_kbps.shape[1]}..") + plot_df(plot_pandas_line, df_all_kbps, legend=False) + print_info(f" [x] Plotting unfiltered {df_all_kbps.shape[0]}x{df_all_kbps.shape[1]}..") + + save_plot(export_base_path + "_unfiltered.png") + save_plot(export_base_path + "_unfiltered.pdf") + save_plot(export_base_path + "_unfiltered.svg") + plt.show() + plt.close() + + + # plot_pandas_scatter(pandas_recording) + # plot_pandas_resample_scatter(pandas_recording) + # plot_pandas_hist(pandas_recording) + # plot_pandas_hist_log(pandas_recording, freedman_diaconis_bins) + # plot_pandas_hist_log(pandas_recording, sturges_bins) + # plot_pandas_hist_log(pandas_recording, scott_bins) + # plot_pandas_hist_log(pandas_recording, log_bins) + + # plot_raw(settings, recording) + # plot_basic_filtered(settings, filtered_recording) + + +def plot_df(func, df: pd.DataFrame, axes=None, legend=True): + ax: Axes = axes + + if ax is None: + _, ax = plt.subplots() + + func(ax, df) + + # ax.set_title('Scatter Plot of UL Bytes over Time') + # ax.tick_params(axis='x', rotation=45) + ax.set_xlabel('Timestamp (seconds)', fontsize=28) + ax.set_ylabel('UL Traffic (kbit/s)', fontsize=28) + ax.tick_params(axis='x', labelsize=24) + ax.tick_params(axis='y', labelsize=24) + + if legend: + ax.legend(fontsize=18) + + if ax is None: + plt.show() + else: + plt.draw() + + +def plot_pandas_scatter(ax, df: pd.DataFrame): + for i, column in enumerate(df.columns): + ax.scatter(df.index.total_seconds(), df[column], label=column, marker=MARKERS[i % len(MARKERS)], s=PLOT_SCATTER_MARKER_SIZE) + + + +def plot_pandas_line(ax, df): + for i, column in enumerate(df.columns): + x_values = df.index.total_seconds().values # Convert index to NumPy array + y_values = df[column].values + ax.plot(x_values, y_values, marker=MARKERS[i % len(MARKERS)], label=column) + + +def log_bins(data): + return np.logspace(np.log10(20), np.log10(data.max()), num=50) + + +def plot_pandas_hist_log(ax, df, bin_func=log_bins): + all_ul_bytes = df.stack().values + bins = bin_func(all_ul_bytes) + + # Create logarithmic bins + for column in df.columns: + ax.hist(df[column], bins=bins, edgecolor='k', alpha=0.7, label=column) + + ax.set_xscale('symlog') + # ax.set_title('Histogram of UL Bytes') + + +def plot_pandas_hist(ax, df): + + df.plot(kind='hist', bins=50, alpha=0.5, ax=ax) + ax.set_xlabel('UL Bytes') + ax.set_ylabel('Frequency') + # ax.set_title('Histogram of UL Bytes') + + +def plot_basic_filtered(settings, recording): + + rnti_count = {} + all_timestamps = [] + _, ax = plt.subplots(figsize=(12, 6)) + + if settings.plot_filtered and len(recording['removed_by_filter']) > 0: + ue_traffic = recording['removed_by_filter'] + + timestamps = np.array([np.datetime64(int(ts), 'us') for ts in sorted(list(ue_traffic.keys()))]) + ul_bytes = [ue_traffic[ts]['ul_bytes'] for ts in sorted(list(ue_traffic.keys()))] + + ax.scatter(timestamps, ul_bytes, color='grey', alpha=0.2, label='Filtered RNTIs') + + + for ue_traffic, rnti in recording['recordings']: + # Extract timestamps and UL bytes + timestamps = np.array([np.datetime64(int(ts), 'us') for ts in sorted(list(ue_traffic.keys()))]) + all_timestamps.extend(timestamps) + + ul_bytes = [ue_traffic[ts]['ul_bytes'] for ts in sorted(list(ue_traffic.keys()))] + + rnti_count[rnti] = len(timestamps) + # Plot UL bytes for the RNTI + ax.scatter(timestamps, ul_bytes, label=f'RNTI: {rnti}', s=PLOT_SCATTER_MARKER_SIZE) + + max_key = max(rnti_count, key=rnti_count.get) + max_value = rnti_count[max_key] + + ax.set_yscale('symlog', linthresh=5000, linscale=1) + + # formatter = ScalarFormatter(useMathText=True) + # formatter.set_scientific(True) + # ax.yaxis.set_major_formatter(formatter) + # ax.yaxis.set_major_locator(LogLocator(base=10.0, subs='auto', numticks=5)) + # ax.yaxis.set_major_locator(SymmetricalLogLocator(base=10, linthresh=20000, subs=[1.0, 2.0, 5.0])) # Log part above linthresh + + # y-axis limit + # ax.set_ylim(0, 2000) + + ax.set_xlabel('Timestamp') + ax.set_ylabel('UL Bytes') + ax.set_title(f"Basic filtered UL traffic | Number of RNTIs: {len(recording['recordings'])}") + ax.grid(True, which='both', axis='y', linestyle='--', linewidth=0.5, color='gray', alpha=0.7) + + # Rotate x-axis labels + for label in ax.get_xticklabels(): + label.set_rotation(45) + label.set_ha('right') + + plt.draw() + plt.tight_layout() + + plt.show() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Display UL traffic patterns from a dataset.') + parser.add_argument('--path', + type=str, + default=DEFAULT_DATASET_PATH, + help=f'Path to the dataset file (default: {DEFAULT_DATASET_PATH})') + parser.add_argument('--reset-timestamps', + type=bool, + default=DEFAULT_RESET_TIMESTAMPS, + help=f'Reset timestamps to 00:00 (default: {DEFAULT_RESET_TIMESTAMPS})') + parser.add_argument('--filter-keep-rest', + type=bool, + default=DEFAULT_FILTER_KEEP_REST, + help=f'If one wants to keep the rest, the filtering takes significantly longer (default: {DEFAULT_FILTER_KEEP_REST})') + + subparsers = parser.add_subparsers(dest='command', required=True) + + # diashow subcommand + parser_diashow = subparsers.add_parser('diashow', help='Run diashow mode') + parser_diashow.add_argument('--rnti', + type=str, + default=None, + help='Display only a single RNTI') + parser_diashow.add_argument('--plot-filtered', + type=bool, + default=DEFAULT_PLOT_FILTERED, + help='Display filtered RNTIs in the plot (default: {DEFAULT_PLOT_FILTERED})') + + # standardize subcommand + parser_standardize = subparsers.add_parser('standardize', help='Run standardize mode') + + # export subcommand + parser_export = subparsers.add_parser('export', help='Run export mode') + parser_export.add_argument('--export-path', + type=str, + default=DEFAULT_EXPORT_PATH, + help=f'Base path to export images to (default: {DEFAULT_EXPORT_PATH})') + parser_export.add_argument('--recording-index', + type=int, + default=DEFAULT_EXPORT_RECORDING_INDEX, + help='Index of the recording in the dataset (default: {DEFAULT_EXPORT_RECORDING_INDEX})') + parser_export.add_argument('--plot-filtered', + type=bool, + default=DEFAULT_PLOT_FILTERED, + help='Display filtered RNTIs in the plot (default: {DEFAULT_PLOT_FILTERED})') + + args = parser.parse_args() + + if args.command == 'diashow': + diashow(args) + elif args.command == 'standardize': + standardize(args) + elif args.command == 'export': + export(args) diff --git a/src/cell_info.rs b/src/cell_info.rs index d6ef413..72d1176 100644 --- a/src/cell_info.rs +++ b/src/cell_info.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use anyhow::{anyhow, Result}; use regex::Regex; @@ -6,8 +8,7 @@ use serde_derive::Deserialize; use crate::util::helper_json_pointer; -// TODO: CellInfo should be able to hold several cell -// Define the struct to hold cell information +pub const REQUEST_TIMEOUT_MS: u64 = 2000; #[derive(Debug, Clone, PartialEq)] #[allow(clippy::upper_case_acronyms)] @@ -207,6 +208,7 @@ async fn cgi_get_token(base_addr: &str, user: &str, auth: &str) -> Result
Result Result { let url = format!("http://{}:7353/api/v1/celldata/connected/all", base_addr); let resp = reqwest::Client::new() .get(&url) + .timeout(Duration::from_millis(REQUEST_TIMEOUT_MS)) .header("Accept", "application/json, text/javascript, */*; q=0.01") .send() .await?; @@ -608,7 +612,10 @@ mod tests { let cell_data = serde_json::from_str::>(DUMMY_DEVICEPUBLISHER_RESPONSE)?; let cell_info = CellInfo::from_devpub_celldata(cell_data)?; assert_eq!(cell_info.cells.first().unwrap().cell_id, 10); - assert_eq!(cell_info.cells.first().unwrap().cell_type, CellularType::LTE); + assert_eq!( + cell_info.cells.first().unwrap().cell_type, + CellularType::LTE + ); assert_eq!(cell_info.cells.first().unwrap().rssi, -89.0); assert_eq!(cell_info.cells.first().unwrap().rsrp, -120.0); assert_eq!(cell_info.cells.first().unwrap().rsrq, -13.0); diff --git a/src/logic/cell_sink.rs b/src/logic/cell_sink.rs index fa8eca4..a48a765 100644 --- a/src/logic/cell_sink.rs +++ b/src/logic/cell_sink.rs @@ -1,43 +1,45 @@ -use anyhow::{anyhow, Result}; -use bus::BusReader; -use std::sync::mpsc::SyncSender; +use crate::util::{print_debug, print_info}; +use std::sync::mpsc::{SyncSender, TryRecvError}; use std::thread::{self, JoinHandle}; use std::time::Duration; +use anyhow::{anyhow, Result}; +use bus::BusReader; + use crate::logic::{ - check_not_stopped, wait_until_running, - MessageCellInfo, MessageDci, MessageRnti, WorkerState, WorkerType, - DEFAULT_WORKER_SLEEP_MS, + check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, MessageRnti, + SinkState, DEFAULT_WORKER_SLEEP_US, }; +use crate::util::determine_process_id; pub struct CellSinkArgs { - pub rx_app_state: BusReader, - pub tx_sink_state: SyncSender, + pub rx_app_state: BusReader, + pub tx_sink_state: SyncSender, pub rx_cell_info: BusReader, pub rx_dci: BusReader, pub rx_rnti: BusReader, } -pub fn deploy_cell_sink(args: CellSinkArgs) -> Result> { +pub fn deploy_cell_sink(mut args: CellSinkArgs) -> Result> { let thread = thread::spawn(move || { let _ = run( args.rx_app_state, args.tx_sink_state, - args.rx_cell_info, - args.rx_dci, - args.rx_rnti, + &mut args.rx_cell_info, + &mut args.rx_dci, + &mut args.rx_rnti, ); }); Ok(thread) } -fn send_final_state(tx_sink_state: &SyncSender) -> Result<()> { - Ok(tx_sink_state.send(WorkerState::Stopped(WorkerType::CellSink))?) +fn send_final_state(tx_sink_state: &SyncSender) -> Result<()> { + Ok(tx_sink_state.send(SinkState::Stopped)?) } fn wait_for_running( - rx_app_state: &mut BusReader, - tx_sink_state: &SyncSender, + rx_app_state: &mut BusReader, + tx_sink_state: &SyncSender, ) -> Result<()> { match wait_until_running(rx_app_state) { Ok(_) => Ok(()), @@ -49,23 +51,51 @@ fn wait_for_running( } fn run( - mut rx_app_state: BusReader, - tx_sink_state: SyncSender, - _rx_cell_info: BusReader, - _rx_dci: BusReader, - _rx_rnti: BusReader, + mut rx_app_state: BusReader, + tx_sink_state: SyncSender, + rx_cell_info: &mut BusReader, + rx_dci: &mut BusReader, + rx_rnti: &mut BusReader, ) -> Result<()> { - tx_sink_state.send(WorkerState::Running(WorkerType::CellSink))?; + tx_sink_state.send(SinkState::Running)?; wait_for_running(&mut rx_app_state, &tx_sink_state)?; + print_info(&format!("[sink]: \t\tPID {:?}", determine_process_id())); + let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); loop { /* */ - thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + thread::sleep(sleep_duration); if check_not_stopped(&mut rx_app_state).is_err() { break; } /* */ + /* unpack dci, cell_info, rnti at every iteration to keep the queue "empty"! */ + let _new_dci = match rx_dci.try_recv() { + Ok(dci) => Some(dci), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => break, + }; + let _new_cell_info = match rx_cell_info.try_recv() { + Ok(cell_info) => Some(cell_info), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => break, + }; + let new_rnti = match rx_rnti.try_recv() { + Ok(rnti) => Some(rnti), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => break, + }; + + if let Some(rnti_msg) = new_rnti { + if !rnti_msg.cell_rnti.is_empty() { + print_debug(&format!( + "DEBUG [sink] new rnti {:#?}", + rnti_msg.cell_rnti.get(&0).unwrap() + )); + } + } + // TODO: Consume rx_dci, rx_cell_info, and rx_rnti // TODO: -> Send combined message to some remote } diff --git a/src/logic/cell_source.rs b/src/logic/cell_source.rs index a26720a..7a64a69 100644 --- a/src/logic/cell_source.rs +++ b/src/logic/cell_source.rs @@ -6,19 +6,20 @@ use std::time::Duration; use crate::cell_info::CellInfo; use crate::logic::{ - check_not_stopped, wait_until_running, - MessageCellInfo, WorkerState, WorkerType, + check_not_stopped, wait_until_running, MainState, MessageCellInfo, SourceState, DEFAULT_WORKER_SLEEP_MS, }; use crate::parse::{Arguments, FlattenedCellApiConfig}; +use crate::util::{determine_process_id, print_info}; - -const WAIT_TO_RETRIEVE_CELL_INFO_MS: u64 = 500; - +// This interval is what influences the UE Cell Tracker CPU load +// the most. TODO: Identify how much the cellinfo data changes +// when de/increasing the request inverval here +const WAIT_TO_RETRIEVE_CELL_INFO_MS: u64 = 5000; pub struct CellSourceArgs { - pub rx_app_state: BusReader, - pub tx_source_state: SyncSender, + pub rx_app_state: BusReader, + pub tx_source_state: SyncSender, pub app_args: Arguments, pub tx_cell_info: Bus, } @@ -35,13 +36,13 @@ pub fn deploy_cell_source(args: CellSourceArgs) -> Result> { Ok(thread) } -fn send_final_state(tx_sink_state: &SyncSender) -> Result<()> { - Ok(tx_sink_state.send(WorkerState::Stopped(WorkerType::CellSource))?) +fn send_final_state(tx_source_state: &SyncSender) -> Result<()> { + Ok(tx_source_state.send(SourceState::Stopped)?) } fn wait_for_running( - rx_app_state: &mut BusReader, - tx_source_state: &SyncSender, + rx_app_state: &mut BusReader, + tx_source_state: &SyncSender, ) -> Result<()> { match wait_until_running(rx_app_state) { Ok(_) => Ok(()), @@ -66,22 +67,21 @@ fn retrieve_cell_info(cell_api: &FlattenedCellApiConfig) -> Result { } fn run( - mut rx_app_state: BusReader, - tx_source_state: SyncSender, + mut rx_app_state: BusReader, + tx_source_state: SyncSender, app_args: Arguments, mut tx_cell_info: Bus, ) -> Result<()> { - tx_source_state.send(WorkerState::Running(WorkerType::CellSource))?; + tx_source_state.send(SourceState::Running)?; wait_for_running(&mut rx_app_state, &tx_source_state)?; + print_info(&format!("[source]: \t\tPID {:?}", determine_process_id())); let cell_api_args = FlattenedCellApiConfig::from_unflattened( app_args.cellapi.unwrap(), app_args.milesight.unwrap(), app_args.devicepublisher.unwrap(), )?; - let mut last_cell_info: CellInfo = CellInfo { - cells: vec![], - }; + let mut last_cell_info: CellInfo = CellInfo { cells: vec![] }; loop { /* */ @@ -94,17 +94,24 @@ fn run( match retrieve_cell_info(&cell_api_args) { Ok(cell_info) => { if !CellInfo::equal_content(&cell_info, &last_cell_info) { - tx_cell_info.broadcast(MessageCellInfo { cell_info: cell_info.clone() }); + tx_cell_info.broadcast(MessageCellInfo { + cell_info: cell_info.clone(), + }); last_cell_info = cell_info; } - }, + } Err(some_err) => { // TODO: print error properly - println!("[source] err retriecing cell info: {:#?}", some_err); - }, + print_info(&format!( + "[source] err retrieving cell info: {:#?}", + some_err + )); + } } - thread::sleep(Duration::from_millis(WAIT_TO_RETRIEVE_CELL_INFO_MS - DEFAULT_WORKER_SLEEP_MS)); + thread::sleep(Duration::from_millis( + WAIT_TO_RETRIEVE_CELL_INFO_MS - DEFAULT_WORKER_SLEEP_MS, + )); } send_final_state(&tx_source_state)?; diff --git a/src/logic/mod.rs b/src/logic/mod.rs index 487c2a1..fe1ab68 100644 --- a/src/logic/mod.rs +++ b/src/logic/mod.rs @@ -1,11 +1,15 @@ +#![allow(dead_code)] + +use std::collections::HashMap; use std::fmt::Debug; -use std::sync::mpsc::{TryRecvError, SyncSender}; -use std::fmt; +use std::sync::mpsc::{Receiver, TryRecvError}; -use anyhow::Result; +use crate::util::print_info; +use anyhow::{anyhow, Result}; use bus::BusReader; use crate::cell_info::CellInfo; +use crate::logic::rnti_matcher::TrafficCollection; use crate::ngscope::config::NgScopeConfig; use crate::ngscope::types::NgScopeCellDci; @@ -13,93 +17,217 @@ pub mod cell_sink; pub mod cell_source; pub mod ngscope_controller; pub mod rnti_matcher; +pub mod traffic_patterns; pub const NUM_OF_WORKERS: usize = 4; -pub const DEFAULT_WORKER_SLEEP_MS: u64 = 1; +pub const DEFAULT_WORKER_SLEEP_MS: u64 = 2; +pub const DEFAULT_WORKER_SLEEP_US: u64 = 50; +pub const WORKER_SLEEP_LONG_MS: u64 = 500; pub const CHANNEL_SYNC_SIZE: usize = 10; pub const BUS_SIZE_APP_STATE: usize = 50; -pub const BUS_SIZE_DCI: usize = 10000; +pub const BUS_SIZE_DCI: usize = 100000; pub const BUS_SIZE_CELL_INFO: usize = 100; pub const BUS_SIZE_RNTI: usize = 100; -#[derive(Clone, Copy, Debug, PartialEq, Hash, Eq)] -pub enum WorkerType { - Main, - CellSource, - CellSink, - NgScopeController, - RntiMatcher, -} - -impl fmt::Display for WorkerType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let display_str = match self { - WorkerType::Main => "main", - WorkerType::CellSource => "cell_source", - WorkerType::CellSink => "cell_sink", - WorkerType::NgScopeController => "ngscope_controller", - WorkerType::RntiMatcher => "rnti_matcher", - }; - write!(f, "{}", display_str) +pub trait WorkerState: Sized + Clone + Sync + Debug { + fn to_general_state(&self) -> GeneralState; + fn worker_name() -> String; +} + +pub trait WorkerChannel { + fn worker_try_recv(&self) -> Result, TryRecvError>; + fn worker_print_on_recv(&self) -> Result, TryRecvError>; + fn worker_try_recv_general_state(&self) -> Result>; + fn worker_recv_general_state(&self) -> Result; +} + +impl WorkerChannel for Receiver { + fn worker_try_recv(&self) -> Result, TryRecvError> { + match self.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected), + } + } + + fn worker_print_on_recv(&self) -> Result, TryRecvError> { + match self.worker_try_recv() { + Ok(Some(msg)) => { + print_info(&format!( + "[main] message from {}: {:#?}", + T::worker_name(), + msg + )); + Ok(Some(msg)) + } + Ok(None) => Ok(None), + Err(e) => Err(e), + } + } + + fn worker_recv_general_state(&self) -> Result { + Ok(self.recv()?.to_general_state()) + } + + fn worker_try_recv_general_state(&self) -> Result> { + match self.try_recv() { + Ok(msg) => Ok(Some(msg.to_general_state())), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(TryRecvError::Disconnected.into()), + } } } -#[allow(dead_code)] -#[derive(Clone, Debug, PartialEq)] -pub enum ExplicitWorkerState { - Main(MainState), - Sink(SinkState), - Source(SourceState), - RntiMatcher(RntiMatcherState), - NgControl(NgControlState), +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum GeneralState { + Running, + Stopped, + Unknown, } -#[allow(dead_code)] #[derive(Clone, Copy, Debug, PartialEq)] pub enum MainState { Running, - Stopping, + Stopped, NotifyStop, + UeConnectionReset, /* NgScope has been restarted */ +} + +impl WorkerState for MainState { + fn worker_name() -> String { + "main".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + MainState::Running => GeneralState::Running, + MainState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } } -#[allow(dead_code)] #[derive(Clone, Copy, Debug, PartialEq)] pub enum SinkState { + Running, + Stopped, SpecialState, } -#[allow(dead_code)] +impl WorkerState for SinkState { + fn worker_name() -> String { + "sink".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + SinkState::Running => GeneralState::Running, + SinkState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } +} + #[derive(Clone, Copy, Debug, PartialEq)] pub enum SourceState { + Running, + Stopped, SpecialState, } -#[allow(dead_code)] -#[derive(Clone, Copy, Debug, PartialEq)] +impl WorkerState for SourceState { + fn worker_name() -> String { + "source".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + SourceState::Running => GeneralState::Running, + SourceState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } +} + +#[derive(Clone, Debug, PartialEq)] pub enum RntiMatcherState { - SpecialState, + Running, + Stopped, + Idle, + StartMatching, + MatchingCollectDci(Box), + MatchingProcessDci(Box), + MatchingPublishRnti(MessageRnti), + MatchingError(RntiMatchingErrorType), + StoppingTrafficGeneratorThread, + SleepMs(u64, Box), +} + +impl RntiMatcherState { + fn name(&self) -> String { + let name_str = match self { + RntiMatcherState::Running => "Running", + RntiMatcherState::Stopped => "Stopped", + RntiMatcherState::Idle => "Idle", + RntiMatcherState::StartMatching => "StartMatching", + RntiMatcherState::MatchingCollectDci(_) => "MatchingCollectDci", + RntiMatcherState::MatchingProcessDci(_) => "MatchingProcessDci", + RntiMatcherState::MatchingPublishRnti(_) => "MatchingPublishRnti", + RntiMatcherState::MatchingError(_) => "MatchingError", + RntiMatcherState::StoppingTrafficGeneratorThread => "StoppingTrafficGeneratorThread", + RntiMatcherState::SleepMs(_, _) => "Sleep", + }; + name_str.to_owned() + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum RntiMatchingErrorType { + ExceededDciTimestampDelta, +} + +impl WorkerState for RntiMatcherState { + fn worker_name() -> String { + "rntimatcher".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + RntiMatcherState::Running => GeneralState::Running, + RntiMatcherState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } } -#[allow(dead_code)] #[derive(Clone, Debug, PartialEq)] pub enum NgControlState { + Running, + Stopped, CheckingCellInfo, StartNgScope(Box), StopNgScope, TriggerListenDci, + WaitForTriggerResponse, + SuccessfulTriggerResponse, SleepMs(u64, Box), StoppingDciFetcherThread, RestartingNgScopeProcess, StoppingNgScopeProcess, - Idle, } -#[allow(dead_code)] -#[derive(Clone, Debug, PartialEq)] -pub enum WorkerState { - Running(WorkerType), - Stopped(WorkerType), - Specific(WorkerType, ExplicitWorkerState), +impl WorkerState for NgControlState { + fn worker_name() -> String { + "ngcontrol".to_owned() + } + + fn to_general_state(&self) -> GeneralState { + match self { + NgControlState::Running => GeneralState::Running, + NgControlState::Stopped => GeneralState::Stopped, + _ => GeneralState::Unknown, + } + } } /* -------------- */ @@ -119,9 +247,10 @@ pub struct MessageCellInfo { } #[allow(dead_code)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Default)] pub struct MessageRnti { - rnti: u16, + /* cell_id -> ue_rnti */ + cell_rnti: HashMap, } /* -------------- */ @@ -133,43 +262,24 @@ pub enum AppError { Disconnected, } -pub fn check_not_stopped( - rx_app_state: &mut BusReader, -) -> Result<(), AppError> { - match rx_app_state.try_recv() { - Ok(msg) => { - match msg { - WorkerState::Stopped(_) => Err(AppError::Stopped), - _ => Ok(()), - } - } - Err(TryRecvError::Empty) => Ok(()), - Err(TryRecvError::Disconnected) => Err(AppError::Disconnected), +pub fn check_not_stopped(rx_state: &mut BusReader) -> Result> { + match rx_state.try_recv() { + Ok(msg) => match msg.to_general_state() { + GeneralState::Stopped => Err(anyhow!("BusReader received GeneralState::Stopped!")), + _ => Ok(Some(msg)), + }, + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(anyhow!("BusReader disconnected!")), } } -pub fn wait_until_running( - rx_app_state: &mut BusReader, -) -> Result<(), AppError> { - match rx_app_state.recv() { - Ok(msg) => match msg { - WorkerState::Running(_) => Ok(()), - WorkerState::Stopped(_) => Err(AppError::Stopped), - _ => Ok(()), +pub fn wait_until_running(rx_state: &mut BusReader) -> Result<()> { + match rx_state.recv() { + Ok(msg) => match msg.to_general_state() { + GeneralState::Running => Ok(()), + GeneralState::Stopped => Err(anyhow!("BusReader received GeneralState::Stopped!")), + GeneralState::Unknown => Ok(()), }, - Err(_) => Err(AppError::Stopped), + Err(_) => Err(anyhow!("BusReader disconnected!")), } } - -pub fn send_explicit_state(tx_state: &SyncSender, state: ExplicitWorkerState) -> Result<()> { - let worker_type = match state { - ExplicitWorkerState::Main(_) => WorkerType::Main, - ExplicitWorkerState::Sink(_) => WorkerType::CellSink, - ExplicitWorkerState::Source(_) => WorkerType::CellSource, - ExplicitWorkerState::RntiMatcher(_) => WorkerType::RntiMatcher, - ExplicitWorkerState::NgControl(_) => WorkerType::NgScopeController, - }; - - tx_state.send( WorkerState::Specific( worker_type, state))?; - Ok(()) -} diff --git a/src/logic/ngscope_controller.rs b/src/logic/ngscope_controller.rs index de2b697..2585c21 100644 --- a/src/logic/ngscope_controller.rs +++ b/src/logic/ngscope_controller.rs @@ -1,167 +1,207 @@ use anyhow::{anyhow, Result}; use bus::{Bus, BusReader}; -use std::fs::{File, self}; +use std::fs::{self, File}; use std::net::UdpSocket; use std::path::Path; use std::process::{Child, Stdio}; -use std::sync::mpsc::{SyncSender, TryRecvError, Receiver, sync_channel}; -use std::thread; -use std::thread::JoinHandle; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::thread::{self, JoinHandle}; use std::time::Duration; use crate::cell_info::CellInfo; use crate::logic::{ - check_not_stopped, wait_until_running, send_explicit_state, - MessageCellInfo, MessageDci, WorkerState, WorkerType, - ExplicitWorkerState, NgControlState, - DEFAULT_WORKER_SLEEP_MS, CHANNEL_SYNC_SIZE + check_not_stopped, wait_until_running, MainState, MessageCellInfo, MessageDci, NgControlState, + CHANNEL_SYNC_SIZE, DEFAULT_WORKER_SLEEP_MS, DEFAULT_WORKER_SLEEP_US, }; use crate::ngscope; use crate::ngscope::config::NgScopeConfig; use crate::ngscope::types::Message; use crate::ngscope::{ - start_ngscope, stop_ngscope, ngscope_validate_server_send_initial, ngscope_validate_server_check + ngscope_validate_server_check, ngscope_validate_server_send_initial, start_ngscope, + stop_ngscope, }; use crate::parse::{Arguments, FlattenedNgScopeArgs}; +use crate::util::{determine_process_id, is_debug, print_debug, print_info}; +const WAIT_FOR_TRIGGER_NGSCOPE_RESPONE_MS: u64 = 500; #[derive(Clone, Copy, Debug, PartialEq)] enum LocalDciState { Stop, SendInitial, WaitForServerAuth(u8), + SuccessfulAuth, ListenForDci, } pub struct NgControlArgs { - pub rx_app_state: BusReader, - pub tx_ngcontrol_state: SyncSender, + pub rx_app_state: BusReader, + pub tx_ngcontrol_state: SyncSender, pub app_args: Arguments, pub rx_cell_info: BusReader, pub tx_dci: Bus, } -pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result> { - let thread = thread::spawn(move || { - let _ = run( - args.rx_app_state, - args.tx_ngcontrol_state, - args.app_args, - args.rx_cell_info, - args.tx_dci, - ); - }); - Ok(thread) +struct RunArgs { + rx_app_state: BusReader, + tx_ngcontrol_state: SyncSender, + app_args: Arguments, + rx_cell_info: BusReader, + tx_dci_thread_handle: Option>, + dci_thread_handle: Option>, + ng_process_handle: Option, } -fn send_final_state(tx_sink_state: &SyncSender) -> Result<()> { - Ok(tx_sink_state.send(WorkerState::Stopped(WorkerType::CellSource))?) +struct RunArgsMovables { + tx_dci: Bus, } -fn wait_for_running( - rx_app_state: &mut BusReader, - tx_source_state: &SyncSender, -) -> Result<()> { - match wait_until_running(rx_app_state) { - Ok(_) => Ok(()), - _ => { - send_final_state(tx_source_state)?; - Err(anyhow!("[source] Main did not send 'Running' message")) - } - } +pub fn deploy_ngscope_controller(args: NgControlArgs) -> Result> { + let mut run_args: RunArgs = RunArgs { + rx_app_state: args.rx_app_state, + tx_ngcontrol_state: args.tx_ngcontrol_state, + app_args: args.app_args, + rx_cell_info: args.rx_cell_info, + tx_dci_thread_handle: None, + dci_thread_handle: None, + ng_process_handle: None, + }; + let run_args_mov: RunArgsMovables = RunArgsMovables { + tx_dci: args.tx_dci, + }; + let thread = thread::spawn(move || { + let _ = run(&mut run_args, run_args_mov); + finish(run_args); + }); + Ok(thread) } -fn check_cell_update( - rx_cell_info: &mut BusReader, -) -> Result> { - match rx_cell_info.try_recv() { - Ok(msg) => { - Ok(Some(msg.cell_info)) - }, - Err(TryRecvError::Disconnected) => { - // TODO: print error properly - Err(anyhow!("[ngcontrol] err: rx_cell_info disconnected!")) - } - Err(TryRecvError::Empty) => { Ok(None) }, - } -} +fn run(run_args: &mut RunArgs, run_args_mov: RunArgsMovables) -> Result<()> { + let rx_app_state = &mut run_args.rx_app_state; + let tx_ngcontrol_state = &mut run_args.tx_ngcontrol_state; + let app_args = &run_args.app_args; + let rx_cell_info = &mut run_args.rx_cell_info; + let tx_dci = run_args_mov.tx_dci; -fn run( - mut rx_app_state: BusReader, - tx_ngcontrol_state: SyncSender, - app_args: Arguments, - mut rx_cell_info: BusReader, - tx_dci: Bus, -) -> Result<()> { - tx_ngcontrol_state.send(WorkerState::Running(WorkerType::NgScopeController))?; - wait_for_running(&mut rx_app_state, &tx_ngcontrol_state)?; + tx_ngcontrol_state.send(NgControlState::Running)?; + wait_for_running(rx_app_state, tx_ngcontrol_state)?; + print_info(&format!( + "[ngcontrol]: \t\tPID {:?}", + determine_process_id() + )); - let ng_args = FlattenedNgScopeArgs::from_unflattened(app_args.ngscope.unwrap())?; + let ng_args = FlattenedNgScopeArgs::from_unflattened(app_args.clone().ngscope.unwrap())?; let mut ng_process_option: Option = None; let ngscope_config = NgScopeConfig { ..Default::default() }; - let (tx_dci_thread, rx_dci_thread) = sync_channel::(CHANNEL_SYNC_SIZE); - let dci_thread = deploy_dci_fetcher_thread( - rx_dci_thread, + let (tx_dci_thread, rx_main_thread) = sync_channel::(CHANNEL_SYNC_SIZE); + let (tx_main_thread, rx_dci_thread) = sync_channel::(CHANNEL_SYNC_SIZE); + run_args.dci_thread_handle = Some(deploy_dci_fetcher_thread( + tx_main_thread, + rx_main_thread, tx_dci, ng_args.ng_local_addr.to_string(), ng_args.ng_server_addr.to_string(), - )?; + )?); + run_args.tx_dci_thread_handle = Some(tx_dci_thread.clone()); let mut ngcontrol_state: NgControlState = NgControlState::CheckingCellInfo; loop { /* */ thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); - if check_not_stopped(&mut rx_app_state).is_err() { + if check_not_stopped(rx_app_state).is_err() { break; } /* */ match ngcontrol_state { NgControlState::CheckingCellInfo => { - ngcontrol_state = handle_cell_update(&mut rx_cell_info, - &ngscope_config)?; - }, + ngcontrol_state = handle_cell_update(rx_cell_info, &ngscope_config)?; + } NgControlState::TriggerListenDci => { tx_dci_thread.send(LocalDciState::SendInitial)?; - ngcontrol_state = NgControlState::CheckingCellInfo; - }, + ngcontrol_state = NgControlState::WaitForTriggerResponse; + } + NgControlState::WaitForTriggerResponse => { + ngcontrol_state = match rx_dci_thread.try_recv() { + Ok(LocalDciState::SuccessfulAuth) => NgControlState::SuccessfulTriggerResponse, + Ok(_) | Err(TryRecvError::Empty) => NgControlState::SleepMs( + WAIT_FOR_TRIGGER_NGSCOPE_RESPONE_MS, + Box::new(NgControlState::WaitForTriggerResponse), + ), + Err(TryRecvError::Disconnected) => { + print_info("[ngcontrol]: dci_fetcher thread disconnected unexpectedly while waiting for trigger response"); + break; + } + } + } + NgControlState::SuccessfulTriggerResponse => { + tx_ngcontrol_state.send(NgControlState::SuccessfulTriggerResponse)?; + ngcontrol_state = NgControlState::CheckingCellInfo + } NgControlState::SleepMs(time_ms, next_state) => { thread::sleep(Duration::from_millis(time_ms)); ngcontrol_state = *next_state; - }, + } NgControlState::StartNgScope(config) => { - if let Some(ref mut process) = ng_process_option { stop_ngscope(process)?; } - (ngcontrol_state, ng_process_option) = handle_start_ngscope(&config, &ng_args)?; - }, + if let Some(ref mut process) = ng_process_option { + stop_ngscope(process)?; + } + match handle_start_ngscope(&config, &ng_args) { + Ok((state, proc)) => { + ngcontrol_state = state; + ng_process_option = proc; + } + Err(err) => { + print_info(&format!( + "ERROR [ngcontrol] could not start NG-Scope process: {:?}", + err + )); + print_info("ERROR [ngcontrol] retrying in 2 seconds.."); + ngcontrol_state = NgControlState::SleepMs( + 2000, + Box::new(NgControlState::StartNgScope(Box::new(*config.clone()))), + ) + } + } + } NgControlState::StopNgScope => { - if let Some(ref mut process) = ng_process_option { stop_ngscope(process)?; } + if let Some(ref mut process) = ng_process_option { + stop_ngscope(process)?; + } ngcontrol_state = NgControlState::CheckingCellInfo; - }, + } _ => todo!(), } } - send_explicit_state(&tx_ngcontrol_state, - ExplicitWorkerState::NgControl( - NgControlState::StoppingDciFetcherThread) - )?; - tx_dci_thread.send(LocalDciState::Stop)?; - let _ = dci_thread.join(); - if ng_process_option.is_some() { - send_explicit_state(&tx_ngcontrol_state, - ExplicitWorkerState::NgControl( - NgControlState::StoppingNgScopeProcess) - )?; - stop_ngscope(&mut ng_process_option.unwrap())?; - } - tx_ngcontrol_state.send(WorkerState::Stopped(WorkerType::NgScopeController))?; Ok(()) } +fn finish(mut run_args: RunArgs) { + let _ = run_args + .tx_ngcontrol_state + .send(NgControlState::StoppingDciFetcherThread); + + if let Some(tx_dci_thread) = run_args.tx_dci_thread_handle { + let _ = tx_dci_thread.send(LocalDciState::Stop); + } + + if let Some(dci_thread) = run_args.dci_thread_handle { + let _ = dci_thread.join(); + } + if let Some(ref mut process) = run_args.ng_process_handle { + let _ = run_args + .tx_ngcontrol_state + .send(NgControlState::StoppingNgScopeProcess); + let _ = stop_ngscope(process); + } + let _ = send_final_state(&run_args.tx_ngcontrol_state); +} + fn handle_start_ngscope( ng_conf: &NgScopeConfig, ng_args: &FlattenedNgScopeArgs, @@ -174,43 +214,50 @@ fn handle_start_ngscope( let file_out = File::create(path)?; let file_err = file_out.try_clone()?; (Stdio::from(file_out), Stdio::from(file_err)) - }, + } None => (Stdio::null(), Stdio::null()), }; - let new_ng_process = Some(start_ngscope(&ng_args.ng_path, ng_conf, std_out, std_err)?); - Ok((NgControlState::SleepMs(5000, Box::new(NgControlState::TriggerListenDci)), new_ng_process)) + let new_ng_process = match ng_args.ng_start_process { + true => Some(start_ngscope(&ng_args.ng_path, ng_conf, std_out, std_err)?), + false => None, + }; + Ok(( + NgControlState::SleepMs(5000, Box::new(NgControlState::TriggerListenDci)), + new_ng_process, + )) } - fn handle_cell_update( rx_cell_info: &mut BusReader, ng_conf: &NgScopeConfig, ) -> Result { match check_cell_update(rx_cell_info)? { Some(cell_info) => { - println!("[ngcontrol] cell_info: {:#?}", cell_info); + print_info(&format!("[ngcontrol] cell_info: {:#?}", cell_info)); if cell_info.cells.is_empty() { return Ok(NgControlState::StopNgScope); } // TODO: Handle multi cell let mut new_conf = ng_conf.clone(); - new_conf.rf_config0.as_mut().unwrap().rf_freq = cell_info.cells.first().unwrap().frequency; + new_conf.rf_config0.as_mut().unwrap().rf_freq = + cell_info.cells.first().unwrap().frequency as i64; Ok(NgControlState::StartNgScope(Box::new(new_conf))) } - _ => Ok(NgControlState:: CheckingCellInfo), + _ => Ok(NgControlState::CheckingCellInfo), } } - fn deploy_dci_fetcher_thread( - rx_local_dci_state: Receiver, + tx_main_thread: SyncSender, + rx_main_thread: Receiver, tx_dci: Bus, local_socket_addr: String, ng_server_addr: String, ) -> Result> { let thread = thread::spawn(move || { let _ = run_dci_fetcher( - rx_local_dci_state, + tx_main_thread, + rx_main_thread, tx_dci, local_socket_addr, ng_server_addr, @@ -220,100 +267,166 @@ fn deploy_dci_fetcher_thread( } fn run_dci_fetcher( - rx_local_dci_state: Receiver, + tx_main_thread: SyncSender, + rx_main_thread: Receiver, mut tx_dci: Bus, local_socket_addr: String, ng_server_addr: String, ) -> Result<()> { let socket = init_dci_server(&local_socket_addr)?; let mut dci_state: LocalDciState = LocalDciState::ListenForDci; + print_info(&format!( + "[ngcontrol.dci]: \tPID {:?}", + determine_process_id() + )); + + let sleep_duration = Duration::from_micros(DEFAULT_WORKER_SLEEP_US); + let mut last_dci_timestamp_us: u64 = 0; loop { - thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); - if let Some(new_state) = check_rx_state(&rx_local_dci_state)? { + thread::sleep(sleep_duration); + if let Some(new_state) = check_rx_state(&rx_main_thread)? { dci_state = new_state; } match dci_state { - LocalDciState::Stop => { break; }, + LocalDciState::Stop => { + break; + } LocalDciState::SendInitial => { dci_state = match ngscope_validate_server_send_initial(&socket, &ng_server_addr) { Ok(_) => LocalDciState::WaitForServerAuth(0), Err(_) => LocalDciState::SendInitial, }; - }, - LocalDciState::WaitForServerAuth(successful_auths) => { + } + LocalDciState::SuccessfulAuth => { + tx_main_thread.send(LocalDciState::SuccessfulAuth)?; + dci_state = LocalDciState::ListenForDci; + } + LocalDciState::WaitForServerAuth(successful_auths) => { dci_state = match ngscope_validate_server_check(&socket)? { Some(_) => { if successful_auths >= 1 { - LocalDciState::ListenForDci + LocalDciState::SuccessfulAuth } else { LocalDciState::WaitForServerAuth(successful_auths + 1) } - }, + } None => LocalDciState::WaitForServerAuth(successful_auths), }; - }, - LocalDciState::ListenForDci => { check_ngscope_message(&socket, &mut tx_dci) }, + } + LocalDciState::ListenForDci => { + check_ngscope_message(&socket, &mut tx_dci, &mut last_dci_timestamp_us) + } } } Ok(()) } -fn check_ngscope_message(socket: &UdpSocket, tx_dci: &mut Bus) { +fn check_ngscope_message( + socket: &UdpSocket, + tx_dci: &mut Bus, + last_dci_timestamp_us: &mut u64, +) { match ngscope::ngscope_recv_single_message(socket) { Ok(msg) => { match msg { Message::CellDci(cell_dci) => { - println!( - " {:?} | {:03?} | {:03?} | {:08?} | {:08?} | {:03?} | {:03?}", - cell_dci.nof_rnti, - cell_dci.total_dl_prb, - cell_dci.total_ul_prb, - cell_dci.total_dl_tbs, - cell_dci.total_ul_tbs, - cell_dci.total_dl_reTx, - cell_dci.total_ul_reTx - ); - - // TODO: Take items from the bus. - tx_dci.broadcast(MessageDci { ngscope_dci: *cell_dci }); + if is_debug() { + if *last_dci_timestamp_us != 0 + && cell_dci.time_stamp > *last_dci_timestamp_us + { + let timestamp_delta: i64 = + cell_dci.time_stamp as i64 - *last_dci_timestamp_us as i64; + if timestamp_delta > 1000000 { + let now_delta = chrono::Utc::now().timestamp_micros() as u64 + - cell_dci.time_stamp; + print_debug(&format!( + "DEBUG [ngcontrol.fetcher] 1s DCI gap:\n\ + \tdiff to previous DCI: {:>10} us\n\ + \tdiff to now: {:>10} us", + timestamp_delta, now_delta + )); + } + } + *last_dci_timestamp_us = cell_dci.time_stamp; + } + /* check bus size */ + let message_dci = MessageDci { + ngscope_dci: *cell_dci, + }; + match tx_dci.try_broadcast(message_dci) { + Ok(_) => {} + Err(msg) => { + print_info("ERROR [ngcontrol] DCI bus is full!!"); + tx_dci.broadcast(msg) + } + } } Message::Dci(ue_dci) => { // TODO: Evaluate how to handle this - println!("{:?}", ue_dci) + print_info(&format!("[ngcontrol] {:?}", ue_dci)); } Message::Config(cell_config) => { // TODO: Evaluate how to handle this - println!("{:?}", cell_config) + print_info(&format!("[ngcontrol] {:?}", cell_config)); } // TODO: Evaluate how to handle Start andExit Message::Start => {} Message::Exit => {} } - }, + } _ => { // TODO: print error properly? it also goes here when there just hasn't been a message } } } -fn check_rx_state( - rx_local_dci_state: &Receiver, -) -> Result> { +/* -------------- */ +/* Helpers */ +/* -------------- */ + +fn check_rx_state(rx_local_dci_state: &Receiver) -> Result> { match rx_local_dci_state.try_recv() { Ok(msg) => Ok(Some(msg)), Err(TryRecvError::Empty) => Ok(None), - Err(TryRecvError::Disconnected) => Err(anyhow!("[ngcontrol.local_dci_fetcher] rx_local_dci_state disconnected")), + Err(TryRecvError::Disconnected) => Err(anyhow!( + "[ngcontrol.local_dci_fetcher] rx_local_dci_state disconnected" + )), } } -#[allow(dead_code)] fn init_dci_server(local_addr: &str) -> Result { let socket = UdpSocket::bind(local_addr)?; socket.set_nonblocking(true)?; - // ngscope::ngscope_validate_server(&socket, server_addr).expect("server validation error"); Ok(socket) } +fn send_final_state(tx_ngcontrol_state: &SyncSender) -> Result<()> { + Ok(tx_ngcontrol_state.send(NgControlState::Stopped)?) +} + +fn wait_for_running( + rx_app_state: &mut BusReader, + tx_ngcontrol_state: &SyncSender, +) -> Result<()> { + match wait_until_running(rx_app_state) { + Ok(_) => Ok(()), + _ => { + send_final_state(tx_ngcontrol_state)?; + Err(anyhow!("[source] Main did not send 'Running' message")) + } + } +} + +fn check_cell_update(rx_cell_info: &mut BusReader) -> Result> { + match rx_cell_info.try_recv() { + Ok(msg) => Ok(Some(msg.cell_info)), + Err(TryRecvError::Disconnected) => { + // TODO: print error properly + Err(anyhow!("[ngcontrol] err: rx_cell_info disconnected!")) + } + Err(TryRecvError::Empty) => Ok(None), + } +} diff --git a/src/logic/rnti_matcher.rs b/src/logic/rnti_matcher.rs index 0dfb8d2..403af71 100644 --- a/src/logic/rnti_matcher.rs +++ b/src/logic/rnti_matcher.rs @@ -1,41 +1,425 @@ -use anyhow::{anyhow, Result}; -use bus::{Bus, BusReader}; -use std::sync::mpsc::SyncSender; +#![allow(dead_code)] + +use std::collections::{HashMap, HashSet}; +use std::net::UdpSocket; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; use std::thread::{self, JoinHandle}; use std::time::Duration; +use anyhow::{anyhow, Result}; +use bus::{Bus, BusReader}; +use serde_derive::{Deserialize, Serialize}; +use nalgebra::{DMatrix, DVector}; + +use crate::logic::traffic_patterns::{TrafficPattern, TrafficPatternFeatures}; use crate::logic::{ - check_not_stopped, wait_until_running, - MessageDci, MessageRnti, WorkerState, WorkerType, - DEFAULT_WORKER_SLEEP_MS, + check_not_stopped, wait_until_running, MainState, MessageDci, MessageRnti, RntiMatcherState, + RntiMatchingErrorType, CHANNEL_SYNC_SIZE, DEFAULT_WORKER_SLEEP_MS, +}; +use crate::ngscope::types::NgScopeCellDci; +use crate::parse::{Arguments, FlattenedRntiMatchingArgs}; + +use crate::util::{ + calculate_mean_variance, calculate_median, calculate_weighted_euclidean_distance, + log_rnti_matching_traffic, print_debug, print_info, standardize_feature_vec, + determine_process_id, calculate_weighted_euclidean_distance_matrix, }; +pub const MATCHING_INTERVAL_MS: u64 = 1000; +pub const MATCHING_TRAFFIC_PATTERN_TIME_OVERLAP_FACTOR: f64 = 1.1; +pub const MATCHING_MAX_DCI_TIMESTAMP_DELTA_MS: u64 = 100; +pub const MATCHING_UL_BYTES_LOWER_BOUND_FACTOR: f64 = 0.5; +pub const MATCHING_UL_BYTES_UPPER_BOUND_FACTOR: f64 = 4.0; +pub const TIME_MS_TO_US_FACTOR: u64 = 1000; +pub const MATCHING_LOG_FILE_PREFIX: &str = "./.logs/rnti_matching_pattern_"; +pub const MATCHING_LOG_FILE_SUFFIX: &str = ".jsonl"; +pub const COLLECT_DCI_MAX_TIMESTAMP_DELTA_US: u64 = 50000; + +pub const BASIC_FILTER_MAX_TOTAL_UL_FACTOR: f64 = 100.0; +pub const BASIC_FILTER_MIN_TOTAL_UL_FACTOR: f64 = 0.005; +pub const BASIC_FILTER_MAX_UL_PER_DCI: u64 = 5_000_000; +pub const BASIC_FILTER_MIN_OCCURENCES_FACTOR: f64 = 0.005; + + /* + * Feature vector, order matters: + * + * DCI count (occurences) + * Total UL bytes + * UL bytes median + * UL bytes mean + * UL bytes variance + * DCI timestamp delta median + * DCI timestamp delta mean + * DCI timestamp delta variance + * */ +/* not as good as all-weighted one */ +// pub const MATCHING_WEIGHTINGS: [f64; 8] = [ +// 0.5, /* DCI count (occurences) */ +// 0.0, /* Total UL bytes */ +// 0.5, /* UL bytes median */ +// 0.0, /* UL bytes mean */ +// 0.0, /* UL bytes variance */ +// 0.0, /* DCI time delta median */ +// 0.0, /* DCI time delta mean */ +// 0.0, /* DCI time delta variance */ +// ]; + +pub const MATCHING_WEIGHTINGS: [f64; 8] = [ + 0.5, /* DCI count (occurences) */ + 0.1, /* Total UL bytes */ + 0.15, /* UL bytes median */ + 0.025, /* UL bytes mean */ + 0.025, /* UL bytes variance */ + 0.15, /* DCI time delta median */ + 0.025, /* DCI time delta mean */ + 0.025, /* DCI time delta variance */ +]; + +#[derive(Clone, Debug, PartialEq)] +enum LocalGeneratorState { + Stop, + SendPattern(String, Box), + PatternSent, + Idle, +} + pub struct RntiMatcherArgs { - pub rx_app_state: BusReader, - pub tx_rntimatcher_state: SyncSender, + pub rx_app_state: BusReader, + pub tx_rntimatcher_state: SyncSender, + pub app_args: Arguments, pub rx_dci: BusReader, pub tx_rnti: Bus, } +struct RunArgs { + rx_app_state: BusReader, + tx_rntimatcher_state: SyncSender, + app_args: Arguments, + rx_dci: BusReader, + tx_rnti: Bus, + tx_gen_thread_handle: Option>, + gen_thread_handle: Option>, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct TrafficCollection { + /* cell_id -> { traffic } */ + pub cell_traffic: HashMap, + pub start_timestamp_ms: u64, + pub finish_timestamp_ms: u64, + pub traffic_pattern_features: TrafficPatternFeatures, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct CellTrafficCollection { + /* rnti -> { {tx, tx+1, tx+2} }*/ + pub traffic: HashMap, + pub nof_total_dci: u64, + pub nof_empty_dci: u64, + pub first_dci_timestamp_us: u64, + pub last_dci_timestamp_us: u64, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct UeTraffic { + /* tx -> { dl, ul }*/ + pub traffic: HashMap, + pub total_dl_bytes: u64, + pub total_ul_bytes: u64, +} + +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct Traffic { + pub dl_bytes: u64, + pub ul_bytes: u64, +} + pub fn deploy_rnti_matcher(args: RntiMatcherArgs) -> Result> { + let mut run_args: RunArgs = RunArgs { + rx_app_state: args.rx_app_state, + tx_rntimatcher_state: args.tx_rntimatcher_state, + app_args: args.app_args, + rx_dci: args.rx_dci, + tx_rnti: args.tx_rnti, + tx_gen_thread_handle: None, + gen_thread_handle: None, + }; + + let thread = thread::spawn(move || { + let _ = run(&mut run_args); + finish(run_args); + }); + Ok(thread) +} + +fn run(run_args: &mut RunArgs) -> Result<()> { + let rx_app_state = &mut run_args.rx_app_state; + let tx_rntimatcher_state = &mut run_args.tx_rntimatcher_state; + let app_args = &run_args.app_args; + let rx_dci = &mut run_args.rx_dci; + let tx_rnti = &mut run_args.tx_rnti; + + tx_rntimatcher_state.send(RntiMatcherState::Running)?; + wait_for_running(rx_app_state, tx_rntimatcher_state)?; + print_info(&format!( + "[rntimatcher]: \t\tPID {:?}", + determine_process_id() + )); + + let matching_args = + FlattenedRntiMatchingArgs::from_unflattened(app_args.clone().rntimatching.unwrap())?; + + let (tx_gen_thread, rx_gen_thread) = sync_channel::(CHANNEL_SYNC_SIZE); + run_args.gen_thread_handle = Some(deploy_traffic_generator_thread( + rx_gen_thread, + matching_args.matching_local_addr, + )?); + run_args.tx_gen_thread_handle = Some(tx_gen_thread.clone()); + + let traffic_destination = matching_args.matching_traffic_destination; + let traffic_pattern = matching_args.matching_traffic_pattern.generate_pattern(); + let matching_log_file_path = &format!( + "{}{:?}{}", + MATCHING_LOG_FILE_PREFIX, matching_args.matching_traffic_pattern, MATCHING_LOG_FILE_SUFFIX + ); + let mut matcher_state: RntiMatcherState = RntiMatcherState::Idle; + + loop { + /* */ + thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + match check_not_stopped(rx_app_state) { + Ok(Some(MainState::UeConnectionReset)) => { + matcher_state = RntiMatcherState::StartMatching; + } + Err(_) => break, + _ => {} + } + /* unpack dci at every iteration to keep the queue "empty"! */ + let latest_dcis = collect_dcis(rx_dci); + /* */ + + matcher_state = match matcher_state { + RntiMatcherState::Idle => { + thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + matcher_state + } + RntiMatcherState::StartMatching => handle_start_matching( + &tx_gen_thread, + &traffic_destination, + traffic_pattern.clone(), + ), + RntiMatcherState::MatchingCollectDci(traffic_collection) => { + // TODO: Use all dcis here and let this thread sleep again! + handle_collect_dci(latest_dcis, *traffic_collection) + } + RntiMatcherState::MatchingProcessDci(traffic_collection) => { + handle_process_dci(*traffic_collection, matching_log_file_path) + } + RntiMatcherState::MatchingPublishRnti(rnti) => { + tx_rnti.broadcast(rnti); + RntiMatcherState::SleepMs( + MATCHING_INTERVAL_MS, + Box::new(RntiMatcherState::StartMatching), + ) + } + RntiMatcherState::MatchingError(error_type) => handle_matching_error(error_type), + RntiMatcherState::SleepMs(time_ms, next_state) => { + thread::sleep(Duration::from_millis(time_ms)); + *next_state + } + _ => matcher_state, + } + } + + Ok(()) +} + +fn collect_dcis(rx_dci: &mut BusReader) -> Vec { + let mut dci_list = Vec::new(); + loop { + match rx_dci.try_recv() { + Ok(dci) => dci_list.push(dci), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => break, + } + } + dci_list +} + +fn handle_start_matching( + tx_gen_thread: &SyncSender, + traffic_destination: &str, + traffic_pattern: TrafficPattern, +) -> RntiMatcherState { + let pattern_total_ms = traffic_pattern.total_time_ms(); + let start_timestamp_ms = chrono::Utc::now().timestamp_millis() as u64; + let finish_timestamp_ms = start_timestamp_ms + + (MATCHING_TRAFFIC_PATTERN_TIME_OVERLAP_FACTOR * pattern_total_ms as f64) as u64; + + let traffic_collection: TrafficCollection = TrafficCollection { + cell_traffic: Default::default(), + start_timestamp_ms, + finish_timestamp_ms, + traffic_pattern_features: TrafficPatternFeatures::from_traffic_pattern(&traffic_pattern) + }; + + let _ = tx_gen_thread.send(LocalGeneratorState::SendPattern( + traffic_destination.to_string(), + Box::new(traffic_pattern), + )); + RntiMatcherState::MatchingCollectDci(Box::new(traffic_collection)) +} + +fn handle_collect_dci( + dci_list: Vec, + mut traffic_collection: TrafficCollection, +) -> RntiMatcherState { + // TODO: Check time -> proceed to ProcessDci + let chrono_now = chrono::Utc::now(); + let now_ms = chrono_now.timestamp_millis() as u64; + if now_ms >= traffic_collection.finish_timestamp_ms { + return RntiMatcherState::MatchingProcessDci(Box::new(traffic_collection)); + } + + let start_timestamp_ms_bound = traffic_collection.start_timestamp_ms * TIME_MS_TO_US_FACTOR; + for dci in dci_list.iter() { + if dci.ngscope_dci.time_stamp >= start_timestamp_ms_bound { + traffic_collection.update_from_cell_dci(&dci.ngscope_dci); + } + } + RntiMatcherState::MatchingCollectDci(Box::new(traffic_collection)) +} + +fn handle_process_dci( + mut traffic_collection: TrafficCollection, + log_file_path: &str, +) -> RntiMatcherState { + // Check number of packets plausability: expected ms -> expected dcis + let mut message_rnti: MessageRnti = MessageRnti::default(); + + /* log files */ + let _ = log_rnti_matching_traffic(log_file_path, &traffic_collection); + + traffic_collection.apply_basic_filter(); + + message_rnti.cell_rnti = traffic_collection.find_best_matching_rnti(); + RntiMatcherState::MatchingPublishRnti(message_rnti) +} + +fn handle_matching_error(error_type: RntiMatchingErrorType) -> RntiMatcherState { + print_info(&format!( + "[rntimatcher] error during RNTI matching: {:?}\n -> stopping pattern", + error_type + )); + RntiMatcherState::SleepMs( + MATCHING_INTERVAL_MS, + Box::new(RntiMatcherState::StartMatching), + ) +} + +fn finish(run_args: RunArgs) { + let _ = run_args + .tx_rntimatcher_state + .send(RntiMatcherState::StoppingTrafficGeneratorThread); + if let Some(tx_gen_thread) = run_args.tx_gen_thread_handle { + let _ = tx_gen_thread.send(LocalGeneratorState::Stop); + } + if let Some(gen_thread) = run_args.gen_thread_handle { + let _ = gen_thread.join(); + } + let _ = send_final_state(&run_args.tx_rntimatcher_state); +} + +fn deploy_traffic_generator_thread( + rx_local_gen_state: Receiver, + local_socket_addr: String, +) -> Result> { let thread = thread::spawn(move || { - let _ = run( - args.rx_app_state, - args.tx_rntimatcher_state, - args.rx_dci, - args.tx_rnti, - ); + let _ = run_traffic_generator(rx_local_gen_state, local_socket_addr); }); Ok(thread) } -fn send_final_state(tx_rntimatcher_state: &SyncSender) -> Result<()> { - Ok(tx_rntimatcher_state.send(WorkerState::Stopped(WorkerType::RntiMatcher))?) +fn run_traffic_generator( + rx_local_gen_state: Receiver, + local_socket_addr: String, +) -> Result<()> { + let socket = init_udp_socket(&local_socket_addr)?; + let mut gen_state: LocalGeneratorState = LocalGeneratorState::Idle; + print_info(&format!( + "[rntimatcher.gen]: \tPID {:?}", + determine_process_id() + )); + + loop { + if let Some(new_state) = check_rx_state(&rx_local_gen_state)? { + gen_state = new_state; + } + + match gen_state { + LocalGeneratorState::Idle => { + /* Idle here, because it shall not interfere the sendpattern */ + thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); + } + LocalGeneratorState::Stop => { + break; + } + LocalGeneratorState::SendPattern(destination, pattern) => { + gen_state = gen_handle_send_pattern(&socket, &destination, *pattern.clone()); + } + LocalGeneratorState::PatternSent => { + print_info("[rntimatcher.gen] Finished sending pattern!"); + gen_state = LocalGeneratorState::Idle + } + } + } + Ok(()) +} + +/* -------------- */ +/* Helpers */ +/* -------------- */ + +fn init_udp_socket(local_addr: &str) -> Result { + let socket = UdpSocket::bind(local_addr)?; + + Ok(socket) +} + +fn check_rx_state( + rx_local_gen_state: &Receiver, +) -> Result> { + match rx_local_gen_state.try_recv() { + Ok(msg) => Ok(Some(msg)), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Err(anyhow!( + "[rntimatcher.local_generator] rx_local_gen_state disconnected" + )), + } +} + +fn gen_handle_send_pattern( + socket: &UdpSocket, + destination: &str, + mut pattern: TrafficPattern, +) -> LocalGeneratorState { + match pattern.messages.pop_front() { + Some(msg) => { + thread::sleep(Duration::from_millis(msg.time_ms as u64)); + let _ = socket.send_to(&msg.payload, destination); + LocalGeneratorState::SendPattern(destination.to_string(), Box::new(pattern)) + } + None => LocalGeneratorState::PatternSent, + } +} + +fn send_final_state(tx_rntimatcher_state: &SyncSender) -> Result<()> { + Ok(tx_rntimatcher_state.send(RntiMatcherState::Stopped)?) } fn wait_for_running( - rx_app_state: &mut BusReader, - tx_rntimtacher_state: &SyncSender, + rx_app_state: &mut BusReader, + tx_rntimtacher_state: &SyncSender, ) -> Result<()> { match wait_until_running(rx_app_state) { Ok(_) => Ok(()), @@ -46,27 +430,291 @@ fn wait_for_running( } } -fn run( - mut rx_app_state: BusReader, - tx_rntimatcher_state: SyncSender, - _rx_dci: BusReader, - _tx_rnti: Bus, -) -> Result<()> { - tx_rntimatcher_state.send(WorkerState::Running(WorkerType::RntiMatcher))?; - wait_for_running(&mut rx_app_state, &tx_rntimatcher_state)?; +impl TrafficCollection { + pub fn update_from_cell_dci(&mut self, cell_dci: &NgScopeCellDci) { + // Ensure the cell_traffic entry exists + let cell_id = cell_dci.cell_id as u64; + let cell_traffic_collection = self.cell_traffic.entry(cell_id).or_default(); - loop { - /* */ - thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); - if check_not_stopped(&mut rx_app_state).is_err() { - break; + // Iterate over each RNTI entry in the CellDCI + for i in 0..cell_dci.nof_rnti as usize { + let rnti_dci = &cell_dci.rnti_list[i]; + // Ensure the UE traffic entry exists + let ue_traffic = cell_traffic_collection + .traffic + .entry(rnti_dci.rnti) + .or_default(); + + // Update the traffic for the specific TTI + let traffic = ue_traffic.traffic.entry(cell_dci.time_stamp).or_default(); + traffic.dl_bytes += rnti_dci.dl_tbs as u64; + traffic.ul_bytes += rnti_dci.ul_tbs as u64; + ue_traffic.total_dl_bytes += rnti_dci.dl_tbs as u64; + ue_traffic.total_ul_bytes += rnti_dci.ul_tbs as u64; } - /* */ - // TODO: Match rntis - // TODO: Generate upstream pattern? -> maybe in another thread + // Increment the nof_dci + if cell_dci.nof_rnti == 0 { + cell_traffic_collection.nof_empty_dci += 1; + } + cell_traffic_collection.nof_total_dci += 1; + + // Set timestamps + if cell_traffic_collection.first_dci_timestamp_us == 0 { + cell_traffic_collection.first_dci_timestamp_us = cell_dci.time_stamp; + cell_traffic_collection.last_dci_timestamp_us = cell_dci.time_stamp; + } + if cell_traffic_collection.last_dci_timestamp_us < cell_dci.time_stamp { + cell_traffic_collection.last_dci_timestamp_us = cell_dci.time_stamp; + } } - tx_rntimatcher_state.send(WorkerState::Stopped(WorkerType::RntiMatcher))?; - Ok(()) + /* Removes RNTIs by applying basic filter based on: + * + * MAX TOTAL UL + * MIN TOTAL UL + * MAX UL/DCI + * MIN OCCURENCES + * MEDIAN UL + * + * */ + fn apply_basic_filter(&mut self) -> BasicFilterStatistics { + let mut stats: BasicFilterStatistics = Default::default(); + let max_total_ul = + (BASIC_FILTER_MAX_TOTAL_UL_FACTOR * self.traffic_pattern_features.total_ul_bytes as f64).round() as u64; + let min_total_ul = + (BASIC_FILTER_MIN_TOTAL_UL_FACTOR * self.traffic_pattern_features.total_ul_bytes as f64).round() as u64; + let min_occurences = + (BASIC_FILTER_MIN_OCCURENCES_FACTOR * self.traffic_pattern_features.nof_packets as f64).round() as u64; + + // Determine which RNTIs to remove + let to_keep: HashMap> = self + .cell_traffic + .iter() + .map(|(&cell_id, cell_traffic)| { + let rntis_to_keep: HashSet = cell_traffic + .traffic + .iter() + /* MAX TOTAL UL */ + .filter(|(_, ue_traffic)| { + if ue_traffic.total_ul_bytes > max_total_ul { + stats.max_total_ul += 1; + false + } else { + true + } + }) + /* MIN TOTAL UL */ + .filter(|(_, ue_traffic)| { + if ue_traffic.total_ul_bytes < min_total_ul { + stats.min_total_ul += 1; + false + } else { + true + } + }) + /* MIN OCCURENCES */ + .filter(|(_, ue_traffic)| { + if (ue_traffic.traffic.len() as u64) < min_occurences { + stats.min_occurences += 1; + false + } else { + true + } + }) + /* MAX UL/DCI */ + .filter(|(_, ue_traffic)| { + let mut filtered = true; + for (_, tx_data) in ue_traffic.traffic.iter() { + if tx_data.ul_bytes > BASIC_FILTER_MAX_UL_PER_DCI { + stats.max_ul_per_dci += 1; + filtered = false; + break; + } + } + filtered + }) + /* ZERO MEDIAN */ + .filter(|(_, ue_traffic)| { + if ue_traffic.feature_ul_bytes_median_mean_variance().0 < 0.0 { + stats.zero_ul_median += 1; + false + } else { + true + } + }) + .map(|(&rnti, _)| rnti) + .collect(); + (cell_id, rntis_to_keep) + }) + .filter(|(_, rntis_to_keep)| !rntis_to_keep.is_empty()) + .collect(); + + print_debug(&format!( + "DEBUG [rntimatcher] apply basic filter: {:#?}", + stats + )); + + for (cell_id, rntis_to_keep) in to_keep { + self.cell_traffic + .get_mut(&cell_id) + .unwrap() + .traffic + .retain(|key, _| rntis_to_keep.contains(key)); + } + + stats + } + + /* + * cell_id -> { (rnti, distance ) } + * + * */ + pub fn find_best_matching_rnti(&self) -> HashMap { + let pattern_std_vec = &self.traffic_pattern_features.std_vec; + let pattern_feature_vec = &self.traffic_pattern_features.std_feature_vec; + /* Change this to use the functional approach */ + // feature_distance_functional(&self.cell_traffic, pattern_std_vec, pattern_feature_vec); + feature_distance_matrices(&self.cell_traffic, pattern_std_vec, pattern_feature_vec) + } } + +impl UeTraffic { + /* + * Feature vector, order matters: + * + * DCI count (occurences) + * Total UL bytes + * UL bytes median + * UL bytes mean + * UL bytes variance + * DCI timestamp delta median + * DCI timestamp delta mean + * DCI timestamp delta variance + * */ + pub fn generate_standardized_feature_vec(&self, std_vec: &[(f64, f64)]) -> Vec { + let mut non_std_feature_vec = vec![]; + let (ul_median, ul_mean, ul_variance) = self.feature_ul_bytes_median_mean_variance(); + let (tx_median, tx_mean, tx_variance) = self.feature_dci_time_delta_median_mean_variance(); + + non_std_feature_vec.push(self.feature_dci_count()); + non_std_feature_vec.push(self.feature_total_ul_bytes()); + non_std_feature_vec.push(ul_median); + non_std_feature_vec.push(ul_mean); + non_std_feature_vec.push(ul_variance); + non_std_feature_vec.push(tx_median); + non_std_feature_vec.push(tx_mean); + non_std_feature_vec.push(tx_variance); + + standardize_feature_vec(&non_std_feature_vec, std_vec) + } + + pub fn feature_total_ul_bytes(&self) -> f64 { + self.total_ul_bytes as f64 + } + + pub fn feature_dci_count(&self) -> f64 { + self.traffic.len() as f64 + } + + pub fn feature_dci_time_delta_median_mean_variance(&self) -> (f64, f64, f64) { + let mut sorted_timestamps: Vec = self.traffic.keys().cloned().collect(); + sorted_timestamps.sort_by(|a, b| a.partial_cmp(b).unwrap()); + let timestamp_deltas: Vec = sorted_timestamps + .windows(2) + .map(|window| (window[1] - window[0]) as f64) + .collect(); + + let (mean, variance) = calculate_mean_variance(×tamp_deltas); + let median = calculate_median(×tamp_deltas); + + (median, mean, variance) + } + + pub fn feature_ul_bytes_median_mean_variance(&self) -> (f64, f64, f64) { + let ul_bytes: Vec = self + .traffic + .values() + .map(|ul_dl_traffic| ul_dl_traffic.ul_bytes as f64) + .collect(); + let (mean, variance) = calculate_mean_variance(&ul_bytes); + let median = calculate_median(&ul_bytes); + + (median, mean, variance) + } +} + +#[derive(Clone, Debug, Default)] +struct BasicFilterStatistics { + pub max_total_ul: u64, + pub min_total_ul: u64, + pub max_ul_per_dci: u64, + pub min_occurences: u64, + pub zero_ul_median: u64, +} + +fn feature_distance_functional( + traffic: &HashMap, + pattern_std_vec: &[(f64, f64)], + pattern_feature_vec: &[f64] +) -> HashMap { + traffic.iter() + .map(|(&cell_id, cell_traffic)| { + let mut rnti_and_distance: Vec<(u16, f64)> = cell_traffic.traffic + .iter() + .map(|(&rnti, ue_traffic)| { + ( + rnti, + calculate_weighted_euclidean_distance( + pattern_feature_vec, + &ue_traffic.generate_standardized_feature_vec(pattern_std_vec), + &MATCHING_WEIGHTINGS, + ), + ) + }) + .collect(); + rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); + (cell_id, *rnti_and_distance.first().unwrap()) + }) + .collect() +} + +fn feature_distance_matrices( + traffic: &HashMap, + pattern_std_vec: &[(f64, f64)], + pattern_feature_vec: &[f64] +) -> HashMap { + let num_features = pattern_std_vec.len(); + let weightings_vector = DVector::from_row_slice(&MATCHING_WEIGHTINGS); + traffic.iter() + .map(|(&cell_id, cell_traffic)| { + let standardized_feature_vecs: Vec> = cell_traffic.traffic + .values() + .map(|ue_traffic| { + ue_traffic.generate_standardized_feature_vec(pattern_std_vec) + }) + .collect(); + + let num_vectors = standardized_feature_vecs.len(); + + let data: Vec = standardized_feature_vecs.into_iter().flatten().collect(); + let feature_matrix: DMatrix = DMatrix::from_row_slice(num_vectors, num_features, &data); + let pattern_feature_matrix = DMatrix::from_fn(num_vectors, num_features, |_, r| pattern_feature_vec[r]); + let euclidean_distances = calculate_weighted_euclidean_distance_matrix( + &pattern_feature_matrix, + &feature_matrix, + &weightings_vector); + + let mut rnti_and_distance: Vec<(u16, f64)> = cell_traffic.traffic.keys() + .cloned() + .zip(euclidean_distances.iter().cloned()) + .collect(); + + rnti_and_distance.sort_by(|a, b| a.1.abs().partial_cmp(&b.1.abs()).unwrap()); + + (cell_id, *rnti_and_distance.first().unwrap()) + }) + .collect() +} + + diff --git a/src/logic/traffic_patterns.rs b/src/logic/traffic_patterns.rs new file mode 100644 index 0000000..9f0f347 --- /dev/null +++ b/src/logic/traffic_patterns.rs @@ -0,0 +1,987 @@ +use std::collections::VecDeque; + +use clap::ValueEnum; +use serde::{Deserialize, Serialize}; + +use crate::util::{calculate_mean_variance, calculate_median, standardize_feature_vec}; + +#[derive( + Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug, Serialize, Deserialize, Default, +)] +pub enum RntiMatchingTrafficPatternType { + #[default] + A, /* t: 24 sec, 1KB packets, 1ms interval => ? Mbit/s */ + B, /* t: 24 sec, 2KB packets, 5ms interval => ? Mbit/s */ + C, /* t: 24 sec, 4KB packets, 5ms interval => ? Mbit/s */ + D, /* t: 24 sec, 8KB packets, 5ms interval => ~ 5.8 Mbit/s */ + E, /* t: 24 sec, 16KB packets, 10ms interval => ? Mbit/s */ + F, /* t: 24 sec, 32KB packets, 10ms interval => ? Mbit/s */ + G, + H, + I, + J, + K, + L, + M, + N, + O, + P, /* 5s "small" + 5s "big" */ + Q, /* test "no" traffic */ + R, /* test partial traffic */ + S, /* 0.6 MB/s and 1.6 MB/s */ + T, /* 6.1 MB/s */ + U, /* ~300 KB/s for 20 seconds */ + V, /* 12s, increasing packet size from 1B to 10000B */ + W, /* same as V, but larger packet size */ + X, /* For some reason, results only in ~130KB/s */ + Y, /* Like U, increment but more t ~ 22sec */ + Z, /* t: 24 sec, 32KB packets, 3ms interval => ? Mbit/s */ +} + +#[derive(Clone, Debug, PartialEq, Default)] +pub struct TrafficPattern { + pub messages: VecDeque, + pub pattern_type: RntiMatchingTrafficPatternType, + /* Standardization Vector: (mean, std deviation) */ + pub std_vec: Vec<(f64, f64)>, +} + +/* TrafficPatternFeatures + * + * This struct can be attached to other structs when those shall not + * contain the whole TrafficPattern. + * */ +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] +pub struct TrafficPatternFeatures { + pub pattern_type: RntiMatchingTrafficPatternType, + /* Standardization Vector: (mean, std deviation) */ + pub std_vec: Vec<(f64, f64)>, + /* Standardized feature vector */ + pub std_feature_vec: Vec, + pub total_ul_bytes: u64, + pub nof_packets: u64, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct TrafficPatternMessage { + pub time_ms: u16, + pub payload: Vec, +} + +impl RntiMatchingTrafficPatternType { + pub fn generate_pattern(&self) -> TrafficPattern { + match self { + RntiMatchingTrafficPatternType::A => pattern_a(), + RntiMatchingTrafficPatternType::B => pattern_b(), + RntiMatchingTrafficPatternType::C => pattern_c(), + RntiMatchingTrafficPatternType::D => pattern_d(), + RntiMatchingTrafficPatternType::E => pattern_e(), + RntiMatchingTrafficPatternType::F => pattern_f(), + RntiMatchingTrafficPatternType::G => pattern_g(), + RntiMatchingTrafficPatternType::H => pattern_h(), + RntiMatchingTrafficPatternType::I => pattern_i(), + RntiMatchingTrafficPatternType::J => pattern_j(), + RntiMatchingTrafficPatternType::K => pattern_k(), + RntiMatchingTrafficPatternType::L => pattern_l(), + RntiMatchingTrafficPatternType::M => pattern_m(), + RntiMatchingTrafficPatternType::N => pattern_n(), + RntiMatchingTrafficPatternType::O => pattern_o(), + RntiMatchingTrafficPatternType::P => pattern_p(), + RntiMatchingTrafficPatternType::Q => pattern_q(), + RntiMatchingTrafficPatternType::R => pattern_r(), + RntiMatchingTrafficPatternType::S => pattern_s(), + RntiMatchingTrafficPatternType::T => pattern_t(), + RntiMatchingTrafficPatternType::U => pattern_u(), + RntiMatchingTrafficPatternType::V => pattern_v(), + RntiMatchingTrafficPatternType::W => pattern_w(), + RntiMatchingTrafficPatternType::X => pattern_x(), + RntiMatchingTrafficPatternType::Y => pattern_y(), + RntiMatchingTrafficPatternType::Z => pattern_z(), + } + } +} + + +impl TrafficPatternFeatures { + pub fn from_traffic_pattern(pattern: &TrafficPattern) -> TrafficPatternFeatures { + TrafficPatternFeatures { + pattern_type: pattern.pattern_type, + std_vec: pattern.std_vec.clone(), + std_feature_vec: pattern.generate_standardized_feature_vec(), + total_ul_bytes: pattern.total_ul_bytes(), + nof_packets: pattern.nof_packets(), + } + } +} + +impl TrafficPattern { + pub fn nof_packets(&self) -> u64 { + self.messages.len() as u64 + } + + pub fn total_ul_bytes(&self) -> u64 { + self.messages + .iter() + .map(|msg| msg.payload.len() as u64) + .sum() + } + + pub fn total_time_ms(&self) -> u64 { + self.messages.iter().map(|msg| msg.time_ms as u64).sum() + } + + /* + * Feature vector, order matters: + * + * DCI count (occurences) + * Total UL bytes + * UL bytes median + * UL bytes mean + * UL bytes variance + * DCI timestamp delta median + * DCI timestamp delta mean + * DCI timestamp delta variance + * */ + pub fn generate_standardized_feature_vec(&self) -> Vec { + let packet_sizes: Vec = self.messages + .iter() + .map(|t| t.payload.len() as f64) + .collect::>(); + let time_deltas: Vec = self.messages + .iter() + .map(|m| m.time_ms as f64) + .collect::>(); + + let (ul_mean, ul_variance) = calculate_mean_variance(&packet_sizes); + let ul_median = calculate_median(&packet_sizes); + let (tx_mean, tx_variance) = calculate_mean_variance(&time_deltas); + let tx_median = calculate_median(&time_deltas); + + let non_std_feature_vec: Vec = vec![ + packet_sizes.len() as f64, + self.total_ul_bytes() as f64, + ul_median, + ul_mean, + ul_variance, + tx_median, + tx_mean, + tx_variance, + ]; + + standardize_feature_vec(&non_std_feature_vec, &self.std_vec) + } +} + +fn generate_incremental_pattern( + interval_ms: u16, + max_pow: u32, + time_ms: u32, + pause_time_ms: u16, +) -> VecDeque { + let mut messages: VecDeque = VecDeque::::new(); + let max_increment: usize = usize::pow(2, max_pow); + for i in 0..(time_ms / interval_ms as u32) { + let increment: usize = if i < max_pow { + usize::pow(2, i) + } else { + max_increment + }; + messages.push_back(TrafficPatternMessage { + time_ms: interval_ms, + payload: vec![0xA0; 500 + increment], + }) + } + messages.push_back(TrafficPatternMessage { + time_ms: pause_time_ms, + payload: vec![0xA0; max_increment], + }); + messages +} + +/* WARNING: The total time of a traffic pattern must be > 0 + * + * After sending the pattern, the DCI messages are collected + * for the total time * MATCHING_TRAFFIC_PATTERN_TIME_OVERLAP_FACTOR + * */ + +fn pattern_a() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::A, + messages: generate_incremental_pattern(1, 10, 20000, 4000), + std_vec: vec![ + (4298.972, 655.833), + (8977880.222, 1309843.824), + (914.222, 100.852), + (2098.996, 192.174), + (14021412.605, 1704132.447), + (4694.583, 654.029), + (6267.872, 969.600), + (294670529.455, 312539196.447), + ], + } +} + +fn pattern_b() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::B, + messages: generate_incremental_pattern(5, 11, 20000, 4000), + std_vec: vec![ + (5112.829, 687.871), + (35530994.057, 5229654.427), + (1893.257, 1701.453), + (6958.369, 595.488), + (70584056.539, 15275088.919), + (4216.443, 592.529), + (5257.455, 742.506), + (29897761.426, 38041362.647), + ], + } +} + +fn pattern_c() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::C, + messages: generate_incremental_pattern(5, 12, 20000, 4000), + std_vec: vec![ + (5506.000, 370.136), + (46840983.314, 3698612.070), + (2457.829, 1445.095), + (8508.389, 366.796), + (88656831.248, 8468478.266), + (3925.357, 157.284), + (4794.976, 294.362), + (51687966.768, 102163561.246), + ], + } +} + +fn pattern_d() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::D, + messages: generate_incremental_pattern(5, 13, 20000, 4000), + std_vec: vec![ + (4033.685, 1241.680), + (40102402.815, 12125065.759), + (9740.000, 4245.484), + (10010.720, 892.519), + (95905843.575, 27841845.809), + (3720.194, 618.443), + (7359.077, 2938.561), + (3492377743.477, 13955565407.330), + ], + } +} + +fn pattern_e() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::E, + messages: generate_incremental_pattern(10, 14, 20000, 4000), + std_vec: vec![ + (7076.078, 1322.567), + (84959693.091, 16536550.367), + (13803.740, 1147.954), + (12002.835, 465.873), + (51426415.654, 10688602.674), + (2408.825, 434.685), + (3860.716, 721.621), + (81868020.141, 107397581.426), + ], + } +} + +fn pattern_f() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::F, + messages: generate_incremental_pattern(10, 15, 20000, 4000), + std_vec: vec![ + (4963.692, 1691.421), + (60475162.154, 18629696.408), + (13202.769, 1750.840), + (12332.931, 881.872), + (64867114.805, 20203503.942), + (2344.231, 301.511), + (5792.186, 2363.042), + (1210676863.405, 1269774046.111), + ], + } +} + +fn pattern_g() -> TrafficPattern { + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::G, + messages: VecDeque::from_iter([ + /* to be determined */ + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA1; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA2; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA3; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA4; 8192], + }, + TrafficPatternMessage { + time_ms: 500, + payload: vec![0xA4; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA1; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA2; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA3; 8192], + }, + TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA4; 8192], + }, + TrafficPatternMessage { + time_ms: 500, + payload: vec![0xA4; 512], + }, + ]), + ..Default::default() + } +} + +fn pattern_h() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 512], + }) + } + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::H, + messages, + ..Default::default() + } +} + +fn pattern_i() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + + let amplitude = 5000.0; + let vertical_shift = 10000.0; + let angular_frequency = std::f64::consts::PI; // omega = pi for T = 2s + let time_interval_ms = 50; // Constant inter-arrival time in milliseconds + + for i in 0..200 { + let t = i as f64 * time_interval_ms as f64 / 1000.0; + let packet_size = + (amplitude * (angular_frequency * t).sin() + vertical_shift).round() as usize; + messages.push_back(TrafficPatternMessage { + time_ms: time_interval_ms, + payload: vec![0xA0; packet_size], + }); + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::I, + messages, + ..Default::default() + } +} + +fn pattern_j() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::J, + messages, + ..Default::default() + } +} + +/* K and J do not show a pattern similar to J -> maybe, 250ms is too short */ +fn pattern_k() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::K, + messages, + ..Default::default() + } +} + +fn pattern_l() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 250, + payload: vec![0xA0; 32], + }); + + for _ in 0..250 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 250, + payload: vec![0xA0; 32], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::L, + messages, + ..Default::default() + } +} + +fn pattern_m() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 250, + payload: vec![0xA0; 16000], + }); + + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 250, + payload: vec![0xA0; 16000], + }); + + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::M, + messages, + ..Default::default() + } +} + +fn pattern_n() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + for _ in 0..50 { + messages.push_back(TrafficPatternMessage { + time_ms: 20, + payload: vec![0xA0; 512], + }) + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::N, + messages, + ..Default::default() + } +} + +fn pattern_o() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + for _ in 0..50 { + messages.push_back(TrafficPatternMessage { + time_ms: 20, + payload: vec![0xA0; 512], + }) + } + + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + for _ in 0..50 { + messages.push_back(TrafficPatternMessage { + time_ms: 20, + payload: vec![0xA0; 512], + }) + } + + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64], + }) + } + + for _ in 0..50 { + messages.push_back(TrafficPatternMessage { + time_ms: 20, + payload: vec![0xA0; 512], + }) + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::O, + messages, + ..Default::default() + } +} + +fn pattern_p() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 8], + }) + } + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 16], + }) + } + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + for _ in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 32], + }) + } + + for _ in 0..10 { + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }) + } + for _ in 0..10 { + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }) + } + for _ in 0..10 { + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }) + } + for _ in 0..10 { + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }) + } + for _ in 0..10 { + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }) + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::P, + messages, + ..Default::default() + } +} + +fn pattern_q() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + messages.push_back(TrafficPatternMessage { + time_ms: 100, + payload: vec![0xA0; 16000], + }); + + messages.push_back(TrafficPatternMessage { + time_ms: 10000, + payload: vec![0xA0; 16000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::Q, + messages, + ..Default::default() + } +} + +fn pattern_r() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..5000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 16], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 5000, + payload: vec![0xA0; 16000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::R, + messages, + ..Default::default() + } +} + +fn pattern_s() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 16000], + }) + } + + for _ in 0..500 { + messages.push_back(TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 32000], + }) + } + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::S, + messages, + ..Default::default() + } +} + +fn pattern_t() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..8000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 1024], + }) + } + messages.push_back(TrafficPatternMessage { + time_ms: 2000, + payload: vec![0xA0; 64000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::T, + messages, + ..Default::default() + } +} + +fn pattern_u() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..10000 { + messages.push_back(TrafficPatternMessage { + time_ms: 2, + payload: vec![0xA0; 16384], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 1000, + payload: vec![0xA0; 16384], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::U, + messages, + ..Default::default() + } +} + +fn pattern_v() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for i in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 200 + i], + }) + } + + for msg in pattern_i().messages.iter() { + messages.push_back(msg.clone()); + } + + messages.push_back(TrafficPatternMessage { + time_ms: 2000, + payload: vec![0xA0; 64000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::V, + messages, + ..Default::default() + } +} + +fn pattern_w() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for i in 0..1000 { + messages.push_back(TrafficPatternMessage { + time_ms: 10, + payload: vec![0xA0; 500 + 10 * i], + }) + } + + for msg in pattern_i().messages.iter() { + messages.push_back(msg.clone()); + } + + messages.push_back(TrafficPatternMessage { + time_ms: 2000, + payload: vec![0xA0; 64000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::W, + messages, + ..Default::default() + } +} + +fn pattern_x() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for _ in 0..20000 { + messages.push_back(TrafficPatternMessage { + time_ms: 1, + payload: vec![0xA0; 64000], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 1000, + payload: vec![0xA0; 64000], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::X, + messages, + ..Default::default() + } +} + +fn pattern_y() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + for i in 0..4000 { + let extra: usize = if i < 15 { + usize::pow(2, i) + } else { + usize::pow(2, 15) + }; + messages.push_back(TrafficPatternMessage { + time_ms: 5, + payload: vec![0xA0; 500 + extra], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 2000, + payload: vec![0xA0; 16384], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::Y, + messages, + ..Default::default() + } +} + +fn pattern_z() -> TrafficPattern { + let mut messages: VecDeque = VecDeque::::new(); + /* to be determined */ + let max_pow = 15; + let max_increment = usize::pow(2, max_pow); + for i in 0..6000 { + let increment: usize = if i < max_pow { + usize::pow(2, i) + } else { + max_increment + }; + messages.push_back(TrafficPatternMessage { + time_ms: 3, + payload: vec![0xA0; 500 + increment], + }) + } + + messages.push_back(TrafficPatternMessage { + time_ms: 4000, + payload: vec![0xA0; max_increment], + }); + + TrafficPattern { + pattern_type: RntiMatchingTrafficPatternType::Z, + messages, + ..Default::default() + } +} diff --git a/src/main.rs b/src/main.rs index f46a4a3..94bee84 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,11 @@ use anyhow::{anyhow, Result}; use bus::Bus; +use casual_logger::{Level, Log}; use std::collections::HashSet; use std::error::Error; +use std::fs; use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; use std::thread::{self, JoinHandle}; use std::time::Duration; @@ -18,20 +20,42 @@ use logic::cell_sink::{deploy_cell_sink, CellSinkArgs}; use logic::cell_source::{deploy_cell_source, CellSourceArgs}; use logic::ngscope_controller::{deploy_ngscope_controller, NgControlArgs}; use logic::rnti_matcher::{deploy_rnti_matcher, RntiMatcherArgs}; +use logic::WorkerChannel; use logic::{ - MessageCellInfo, MessageDci, MessageRnti, MainState, WorkerState, WorkerType, - NUM_OF_WORKERS, DEFAULT_WORKER_SLEEP_MS, BUS_SIZE_APP_STATE, BUS_SIZE_DCI, BUS_SIZE_CELL_INFO, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE + GeneralState, MainState, MessageCellInfo, MessageDci, MessageRnti, NgControlState, + RntiMatcherState, SinkState, SourceState, WorkerState, BUS_SIZE_APP_STATE, BUS_SIZE_CELL_INFO, + BUS_SIZE_DCI, BUS_SIZE_RNTI, CHANNEL_SYNC_SIZE, WORKER_SLEEP_LONG_MS, }; use parse::Arguments; -use util::{is_notifier, prepare_sigint_notifier}; +use util::{determine_process_id, is_notifier, prepare_sigint_notifier, print_info, set_debug}; + +struct CombinedReceivers { + pub sink: Receiver, + pub source: Receiver, + pub rntimatcher: Receiver, + pub ngcontrol: Receiver, +} + +struct CombinedSenders { + pub sink: SyncSender, + pub source: SyncSender, + pub rntimatcher: SyncSender, + pub ngcontrol: SyncSender, +} + +impl CombinedReceivers { + fn print_worker_messages(&self) { + let _ = &self.source.worker_print_on_recv(); + let _ = &self.sink.worker_print_on_recv(); + let _ = &self.ngcontrol.worker_print_on_recv(); + let _ = &self.rntimatcher.worker_print_on_recv(); + } +} fn deploy_app( - tx_app_state: &mut Bus, + tx_app_state: &mut Bus, app_args: &Arguments, - tx_sink_state: SyncSender, - tx_source_state: SyncSender, - tx_ngcontrol_state: SyncSender, - tx_rntimatcher_state: SyncSender, + all_tx_states: CombinedSenders, ) -> Result>> { let mut tx_dci: Bus = Bus::::new(BUS_SIZE_DCI); let mut tx_cell_info: Bus = Bus::::new(BUS_SIZE_CELL_INFO); @@ -39,27 +63,28 @@ fn deploy_app( let sink_args = CellSinkArgs { rx_app_state: tx_app_state.add_rx(), - tx_sink_state, + tx_sink_state: all_tx_states.sink, rx_cell_info: tx_cell_info.add_rx(), rx_dci: tx_dci.add_rx(), rx_rnti: tx_rnti.add_rx(), }; let rntimatcher_args = RntiMatcherArgs { rx_app_state: tx_app_state.add_rx(), - tx_rntimatcher_state, + tx_rntimatcher_state: all_tx_states.rntimatcher, + app_args: app_args.clone(), rx_dci: tx_dci.add_rx(), tx_rnti, }; let ngcontrol_args = NgControlArgs { rx_app_state: tx_app_state.add_rx(), - tx_ngcontrol_state, + tx_ngcontrol_state: all_tx_states.ngcontrol, app_args: app_args.clone(), rx_cell_info: tx_cell_info.add_rx(), tx_dci, }; let source_args = CellSourceArgs { rx_app_state: tx_app_state.add_rx(), - tx_source_state, + tx_source_state: all_tx_states.source, app_args: app_args.clone(), tx_cell_info, }; @@ -73,20 +98,41 @@ fn deploy_app( Ok(tasks) } +fn check_running(rx_state: &Receiver) -> Result> { + if let Ok(Some(msg)) = rx_state.worker_try_recv_general_state() { + match msg { + GeneralState::Running => { + print_info(&format!(" ✓ {:?} running", T::worker_name())); + return Ok(Some(())); + } + GeneralState::Stopped => { + print_info(&format!(" ✗ {:?} stopped", T::worker_name())); + return Err(anyhow!( + "Waiting for all workers to be running, but {:?} sent GeneralState::Stopped", + T::worker_name(), + )); + } + GeneralState::Unknown => { + return Err(anyhow!( + "Waiting for all workers to be running, but {:?} sent: {:?}", + T::worker_name(), + msg, + )) + } + } + } + Ok(None) +} + fn wait_all_running( sigint_notifier: &Arc, - rx_states: [&Receiver; NUM_OF_WORKERS], + all_rx_states: &CombinedReceivers, ) -> Result<()> { - println!("[ ] waiting for all threads to become ready"); + print_info("[ ] waiting for all threads to become ready"); - let mut waiting_for: HashSet = vec![ - WorkerType::CellSource, - WorkerType::CellSink, - WorkerType::NgScopeController, - WorkerType::RntiMatcher, - ] - .into_iter() - .collect(); + let mut waiting_for: HashSet<&str> = vec!["source", "sink", "rntimatcher", "ngcontrol"] + .into_iter() + .collect(); while !waiting_for.is_empty() { if is_notifier(sigint_notifier) { @@ -94,114 +140,109 @@ fn wait_all_running( "SIGINT while waiting for all workers to be running" )); } - for rx_state in rx_states.iter() { - match rx_state.try_recv() { - Ok(msg) => match msg { - WorkerState::Running(worker) => { - println!(" ✓ {:?} running", worker); - waiting_for.remove(&worker); - } - WorkerState::Stopped(worker) => { - println!(" ✗ {:?} stopped", worker); - waiting_for.remove(&worker); - } - WorkerState::Specific(worker, state) => { - return Err(anyhow!( - "Waiting for all workers to be running, but {:?} sent: {:?}", - worker, - state - )) - } - }, - Err(TryRecvError::Empty) => {} - Err(TryRecvError::Disconnected) => { - return Err(anyhow!( - "Waiting for all workers to be running, \ - but a channel disconnected. Left workers: {:?}", - waiting_for - )); - } + if waiting_for.contains("source") { + if let Ok(Some(_)) = check_running(&all_rx_states.source) { + waiting_for.remove("source"); + } + } + if waiting_for.contains("sink") { + if let Ok(Some(_)) = check_running(&all_rx_states.sink) { + waiting_for.remove("sink"); + } + } + if waiting_for.contains("rntimatcher") { + if let Ok(Some(_)) = check_running(&all_rx_states.rntimatcher) { + waiting_for.remove("rntimatcher"); + } + } + if waiting_for.contains("ngcontrol") { + if let Ok(Some(_)) = check_running(&all_rx_states.ngcontrol) { + waiting_for.remove("ngcontrol"); } } } - println!("[✓] waiting for all threads to become ready"); + print_info("[✓] waiting for all threads to become ready"); + Ok(()) +} + +fn init_logger() -> Result<()> { + fs::create_dir_all("./.logs")?; + Log::set_file_name("./.logs/log"); + Log::set_level(Level::Debug); Ok(()) } fn main() -> Result<(), Box> { - println!("Hello, world!"); + init_logger()?; + print_info("Hello, world!"); let args: Arguments = Arguments::build()?; + set_debug(args.verbose.unwrap()); let sigint_notifier = prepare_sigint_notifier()?; - let mut tx_app_state = Bus::::new(BUS_SIZE_APP_STATE); - let (tx_sink_state, rx_sink_state) = sync_channel::(CHANNEL_SYNC_SIZE); - let (tx_source_state, rx_source_state) = sync_channel::(CHANNEL_SYNC_SIZE); - let (tx_ngcontrol_state, rx_ngcontrol_state) = sync_channel::(CHANNEL_SYNC_SIZE); - let (tx_rntimatcher_state, rx_rntimatcher_state) = sync_channel::(CHANNEL_SYNC_SIZE); - let all_rx_states = [ - &rx_source_state, - &rx_sink_state, - &rx_ngcontrol_state, - &rx_rntimatcher_state, - ]; - let tasks = deploy_app( - &mut tx_app_state, - &args, - tx_sink_state, - tx_source_state, - tx_ngcontrol_state, - tx_rntimatcher_state, - )?; + let mut tx_app_state = Bus::::new(BUS_SIZE_APP_STATE); + let (sink_tx, sink_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (source_tx, source_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (rntimatcher_tx, rntimatcher_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let (ngcontrol_tx, ngcontrol_rx) = sync_channel::(CHANNEL_SYNC_SIZE); + let all_tx_states = CombinedSenders { + sink: sink_tx, + source: source_tx, + rntimatcher: rntimatcher_tx, + ngcontrol: ngcontrol_tx, + }; + let all_rx_states = CombinedReceivers { + sink: sink_rx, + source: source_rx, + rntimatcher: rntimatcher_rx, + ngcontrol: ngcontrol_rx, + }; - wait_all_running(&sigint_notifier, all_rx_states)?; + let tasks = deploy_app(&mut tx_app_state, &args, all_tx_states)?; - tx_app_state.broadcast(WorkerState::Running(WorkerType::Main)); + wait_all_running(&sigint_notifier, &all_rx_states)?; + print_info(&format!("[main]: \t\tPID {:?}", determine_process_id())); let mut app_state: MainState = MainState::Running; + tx_app_state.broadcast(app_state); loop { /* */ - thread::sleep(Duration::from_millis(DEFAULT_WORKER_SLEEP_MS)); - if is_notifier(&sigint_notifier) && app_state != MainState::Stopping { + thread::sleep(Duration::from_millis(WORKER_SLEEP_LONG_MS)); + if is_notifier(&sigint_notifier) && app_state != MainState::Stopped { app_state = MainState::NotifyStop; } /* */ match app_state { - MainState::Running => print_worker_messages(all_rx_states), - MainState::Stopping => { - print_worker_messages(all_rx_states); + MainState::Running => app_state = handle_running(&mut tx_app_state, &all_rx_states)?, + MainState::Stopped => { + all_rx_states.print_worker_messages(); if tasks.iter().all(|task| task.is_finished()) { break; } - }, + } MainState::NotifyStop => { - tx_app_state.broadcast(WorkerState::Stopped(WorkerType::Main)); - app_state = MainState::Stopping; - }, + app_state = MainState::Stopped; + tx_app_state.broadcast(app_state); + } + _ => {} } } Ok(()) } -fn print_worker_messages( - rx_states: [&Receiver; NUM_OF_WORKERS], -) { - for rx_state in rx_states.iter() { - match rx_state.try_recv() { - Ok(resp) => { - let worker_type = match resp { - WorkerState::Running(w_type) => w_type, - WorkerState::Stopped(w_type) => w_type, - WorkerState::Specific(w_type, _) => w_type, - }; - println!("[main] message from {}: {:#?}", worker_type, resp); - } - Err(TryRecvError::Empty) => { /* No message received, continue the loop */ } - Err(TryRecvError::Disconnected) => {} // Handle disconnection if necessary - } +fn handle_running( + tx_app_state: &mut Bus, + rx_states: &CombinedReceivers, +) -> Result { + if let Some(NgControlState::SuccessfulTriggerResponse) = + rx_states.ngcontrol.worker_try_recv()? + { + tx_app_state.broadcast(MainState::UeConnectionReset); } + + Ok(MainState::Running) } diff --git a/src/ngscope/config.rs b/src/ngscope/config.rs index 22d119f..1cb97e8 100644 --- a/src/ngscope/config.rs +++ b/src/ngscope/config.rs @@ -5,7 +5,7 @@ use std::option::Option; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] #[allow(non_snake_case)] pub struct NgScopeConfigRfDev { - pub rf_freq: u64, + pub rf_freq: i64, pub N_id_2: i16, pub rf_args: String, pub nof_thread: u8, @@ -63,9 +63,9 @@ impl Default for NgScopeConfigRfDev { rf_args: "serial=3295B62".to_string(), nof_thread: 4, disable_plot: Some(true), - log_dl: None, - log_ul: None, - log_phich: None, + log_dl: Some(false), + log_ul: Some(false), + log_phich: Some(false), } } } @@ -85,10 +85,12 @@ impl Default for NgScopeConfig { fn default() -> Self { NgScopeConfig { nof_rf_dev: 1, - rnti: 0xffff, - remote_enable: None, - decode_single_ue: None, - decode_sib: None, + // It seems like the rnti influences the outcome even if it should just + // decode all RNTIs + rnti: 0x0001, + remote_enable: Some(true), + decode_single_ue: Some(false), + decode_sib: Some(true), dci_logs_path: None, sib_logs_path: None, dci_log_config: Some(NgScopeConfigDciLog::default()), @@ -174,7 +176,10 @@ dci_log_config = { }"#; const DEFAULT_CONFIG_STR: &str = r#"nof_rf_dev = 1; -rnti = 65535; +rnti = 1; +remote_enable = true; +decode_single_ue = false; +decode_sib = true; dci_log_config = { nof_cell = 1; log_ul = false; @@ -182,11 +187,14 @@ dci_log_config = { log_interval = 5; }; rf_config0 = { - rf_freq = 796000000; + rf_freq = 796000000L; N_id_2 = -1; rf_args = "serial=3295B62"; nof_thread = 4; disable_plot = true; + log_dl = false; + log_ul = false; + log_phich = false; };"#; #[test] diff --git a/src/ngscope/mod.rs b/src/ngscope/mod.rs index b82a616..019bcf2 100644 --- a/src/ngscope/mod.rs +++ b/src/ngscope/mod.rs @@ -13,9 +13,16 @@ pub mod types; use config::NgScopeConfig; use types::{Message, MessageType}; +use crate::util::print_info; + const TMP_NGSCOPE_CONFIG_PATH: &str = "./.tmp_ngscope_conf.cfg"; -pub fn start_ngscope>(exec_path: &str, config: &NgScopeConfig, proc_stdout: T, proc_stderr: T) -> Result { +pub fn start_ngscope>( + exec_path: &str, + config: &NgScopeConfig, + proc_stdout: T, + proc_stderr: T, +) -> Result { serde_libconfig::to_file(config, TMP_NGSCOPE_CONFIG_PATH)?; let child = Command::new(exec_path) .stdout(proc_stdout) @@ -32,7 +39,13 @@ pub fn stop_ngscope(child: &mut Child) -> Result<()> { } #[allow(dead_code)] -pub fn restart_ngscope>(child: &mut Child, exec_path: &str, config: &NgScopeConfig, proc_stdout: T, proc_stderr: T) -> Result { +pub fn restart_ngscope>( + child: &mut Child, + exec_path: &str, + config: &NgScopeConfig, + proc_stdout: T, + proc_stderr: T, +) -> Result { stop_ngscope(child)?; thread::sleep(Duration::from_secs(3)); let new_child = start_ngscope(exec_path, config, proc_stdout, proc_stderr)?; @@ -43,12 +56,14 @@ pub fn ngscope_recv_single_message_type(socket: &UdpSocket) -> Result<(MessageTy let mut buf = [0u8; types::NGSCOPE_REMOTE_BUFFER_SIZE]; loop { if let Ok((nof_recv, _)) = socket.recv_from(&mut buf) { - return types::ngscope_extract_packet(&buf[..nof_recv]) + return types::ngscope_extract_packet(&buf[..nof_recv]); } } } -pub fn ngscope_recv_single_message_type_non_blocking(socket: &UdpSocket) -> Result<(MessageType, Vec)> { +pub fn ngscope_recv_single_message_type_non_blocking( + socket: &UdpSocket, +) -> Result<(MessageType, Vec)> { let mut buf = [0u8; types::NGSCOPE_REMOTE_BUFFER_SIZE]; match socket.recv_from(&mut buf) { Ok((nof_recv, _)) => types::ngscope_extract_packet(&buf[..nof_recv]), @@ -83,7 +98,7 @@ pub fn ngscope_validate_server(socket: &UdpSocket, server_addr: &str) -> Result< | MessageType::Config => nof_messages_to_validate -= 1, MessageType::Exit => break, }, - Err(err) => println!("failed evaluating message, retrying... `{}`", err), + Err(err) => print_info(&format!("failed evaluating message, retrying... `{}`", err)), } } @@ -103,11 +118,12 @@ pub fn ngscope_validate_server_check(socket: &UdpSocket) -> Result> { let msg_type = ngscope_recv_single_message_type_non_blocking(socket); match msg_type { Ok((msg_type, _)) => match msg_type { - MessageType::Start - | MessageType::Dci - | MessageType::CellDci - | MessageType::Config => Ok(Some(())), - MessageType::Exit => Err(anyhow!("Received Exit from ngscope server during validation")), + MessageType::Start | MessageType::Dci | MessageType::CellDci | MessageType::Config => { + Ok(Some(())) + } + MessageType::Exit => Err(anyhow!( + "Received Exit from ngscope server during validation" + )), }, Err(_) => Ok(None), } diff --git a/src/ngscope/types.rs b/src/ngscope/types.rs index f926f19..5a3dfdb 100644 --- a/src/ngscope/types.rs +++ b/src/ngscope/types.rs @@ -38,25 +38,41 @@ pub enum Message { Exit, } +fn check_size(size: usize, bound: usize) -> Result<()> { + if size < bound { + return Err(anyhow!( + "Error converting bytes to NgScope message: bytes must be at least {}", + bound + )); + } + Ok(()) +} + impl Message { pub fn from_bytes(bytes: &[u8]) -> Result { - if bytes.len() < NGSCOPE_MESSAGE_TYPE_SIZE { - return Err(anyhow!( - "bytes must be at least {}", - NGSCOPE_MESSAGE_TYPE_SIZE - )); - } + check_size(bytes.len(), NGSCOPE_MESSAGE_TYPE_SIZE)?; let msg_type_bytes: [u8; NGSCOPE_MESSAGE_TYPE_SIZE] = bytes[..NGSCOPE_MESSAGE_TYPE_SIZE].try_into().unwrap(); - let _version_byte: u8 = bytes[NGSCOPE_MESSAGE_VERSION_POSITION]; - let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_CONTENT_POSITION..]; let msg: Message = match MessageType::from_bytes(&msg_type_bytes).unwrap() { MessageType::Start => Message::Start, - MessageType::Dci => Message::Dci(NgScopeUeDci::from_bytes(content_bytes.try_into()?)?), - MessageType::CellDci => Message::CellDci(Box::new(NgScopeCellDci::from_bytes( - content_bytes.try_into()?, - )?)), + MessageType::Dci => { + check_size(bytes.len(), NGSCOPE_MESSAGE_VERSION_POSITION + 1)?; + let _version_byte: u8 = bytes[NGSCOPE_MESSAGE_VERSION_POSITION]; + let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_CONTENT_POSITION..]; + Message::Dci(NgScopeUeDci::from_bytes(content_bytes.try_into()?)?) + } + MessageType::CellDci => { + check_size(bytes.len(), NGSCOPE_MESSAGE_VERSION_POSITION + 1)?; + let _version_byte: u8 = bytes[NGSCOPE_MESSAGE_VERSION_POSITION]; + let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_CONTENT_POSITION..]; + Message::CellDci(Box::new(NgScopeCellDci::from_bytes( + content_bytes.try_into()?, + )?)) + } MessageType::Config => { + check_size(bytes.len(), NGSCOPE_MESSAGE_VERSION_POSITION + 1)?; + let _version_byte: u8 = bytes[NGSCOPE_MESSAGE_VERSION_POSITION]; + let content_bytes: &[u8] = &bytes[NGSCOPE_MESSAGE_CONTENT_POSITION..]; Message::Config(NgScopeCellConfig::from_bytes(content_bytes.try_into()?)?) } MessageType::Exit => Message::Exit, diff --git a/src/parse.rs b/src/parse.rs index de6ea17..3773915 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -4,6 +4,8 @@ use clap::{Args, Command, CommandFactory, Parser, ValueEnum}; use serde::{Deserialize, Serialize}; use std::{default, error::Error, path::PathBuf}; +use crate::{logic::traffic_patterns::RntiMatchingTrafficPatternType, util::print_debug}; + #[derive(Debug, Clone, PartialEq, Parser, Serialize, Deserialize)] #[command(author, version, about, long_about = None, next_line_help = true)] #[command(propagate_version = true)] @@ -23,6 +25,9 @@ pub struct Arguments { #[command(flatten)] pub ngscope: Option, + #[command(flatten)] + pub rntimatching: Option, + /// Print additional information in the terminal #[arg(short('v'), long, required = false)] pub verbose: Option, @@ -84,7 +89,7 @@ pub struct NgScopeArgs { #[arg(long, required = false)] pub ng_path: Option, - /// Address of the UE Cell Tracker port to communicate with NG-Scope (addr:port) + /// Local UE Cell Tracker address to communicate with NG-Scope (addr:port) #[arg(long, required = false)] pub ng_local_addr: Option, @@ -92,9 +97,13 @@ pub struct NgScopeArgs { #[arg(long, required = false)] pub ng_server_addr: Option, - /// Filepath for stdout + stderr logging of the NG-Scope process + /// Filepath for stdout + stderr logging of the NG-Scope process #[arg(long, required = false)] pub ng_log_file: Option, + + /// If true, UE Cell Tracker starts its own NG-Scope instance + #[arg(long, required = false)] + pub ng_start_process: Option, } #[derive(Clone, Debug)] @@ -103,6 +112,29 @@ pub struct FlattenedNgScopeArgs { pub ng_local_addr: String, pub ng_server_addr: String, pub ng_log_file: Option, + pub ng_start_process: bool, +} + +#[derive(Args, Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct RntiMatchingArgs { + /// Local UE Cell Tracker address to generate RNTI matching traffic (addr:port) + #[arg(long, required = false)] + pub matching_local_addr: Option, + + /// Define which traffic pattern to use to fetch cell data + #[arg(long, value_enum, required = false)] + pub matching_traffic_pattern: Option, + + /// The destination address which the traffic pattern is sent to + #[arg(long, required = false)] + pub matching_traffic_destination: Option, +} + +#[derive(Clone, Debug)] +pub struct FlattenedRntiMatchingArgs { + pub matching_local_addr: String, + pub matching_traffic_pattern: RntiMatchingTrafficPatternType, + pub matching_traffic_destination: String, } impl default::Default for Arguments { @@ -121,9 +153,15 @@ impl default::Default for Arguments { }), ngscope: Some(NgScopeArgs { ng_path: Some("/dev_ws/dependencies/ng-scope/build_x86/ngscope/src/".to_string()), - ng_local_addr: Some("0.0.0.0:8888".to_string()), + ng_local_addr: Some("0.0.0.0:9191".to_string()), ng_server_addr: Some("0.0.0.0:6767".to_string()), ng_log_file: Some("./.ng_scope_log.txt".to_string()), + ng_start_process: Some(true), + }), + rntimatching: Some(RntiMatchingArgs { + matching_local_addr: Some("0.0.0.0:9292".to_string()), + matching_traffic_pattern: Some(RntiMatchingTrafficPatternType::A), + matching_traffic_destination: Some("1.1.1.1:53".to_string()), }), } } @@ -138,8 +176,7 @@ impl Arguments { let parsed_args = Arguments::parse(); match parsed_args.clone().get_config_file(app_name) { Ok(parsed_config_args) => { - let printed_args = parsed_config_args - .print_config_file(app_name)?; + let printed_args = parsed_config_args.print_config_file(app_name)?; Ok(printed_args) } Err(_) => { @@ -160,6 +197,7 @@ impl Arguments { self.milesight = self.milesight.or(config_file.milesight); self.devicepublisher = self.devicepublisher.or(config_file.devicepublisher); self.ngscope = self.ngscope.or(config_file.ngscope); + self.rntimatching = self.rntimatching.or(config_file.rntimatching); self.verbose = self.verbose.or(config_file.verbose); Ok(self) @@ -176,10 +214,13 @@ impl Arguments { fn print_config_file(self, app_name: &str) -> Result> { if self.verbose.unwrap_or(true) { let file_path: PathBuf = confy::get_configuration_file_path(app_name, None)?; - println!("Configuration file: '{}'", file_path.display()); + print_debug(&format!( + "DEBUG [parse] Configuration file: '{}'", + file_path.display() + )); let yaml: String = serde_yaml::to_string(&self)?; - println!("\t{}", yaml.replace('\n', "\n\t")); + print_debug(&format!("\t{}", yaml.replace('\n', "\n\t"))); } Ok(self) @@ -216,7 +257,18 @@ impl FlattenedNgScopeArgs { ng_path: ng_args.ng_path.unwrap(), ng_local_addr: ng_args.ng_local_addr.unwrap(), ng_server_addr: ng_args.ng_server_addr.unwrap(), + ng_start_process: ng_args.ng_start_process.unwrap(), ng_log_file: ng_args.ng_log_file, }) } } + +impl FlattenedRntiMatchingArgs { + pub fn from_unflattened(rnti_args: RntiMatchingArgs) -> Result { + Ok(FlattenedRntiMatchingArgs { + matching_local_addr: rnti_args.matching_local_addr.unwrap(), + matching_traffic_pattern: rnti_args.matching_traffic_pattern.unwrap(), + matching_traffic_destination: rnti_args.matching_traffic_destination.unwrap(), + }) + } +} diff --git a/src/util.rs b/src/util.rs index 846ae46..05b28d2 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,7 +1,16 @@ +#![allow(dead_code)] + use anyhow::{anyhow, Result}; +use casual_logger::{Level, Log}; +use lazy_static::lazy_static; +use nalgebra::{DMatrix, DVector}; +use std::fs::{File, OpenOptions}; +use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use crate::logic::rnti_matcher::TrafficCollection; + pub fn prepare_sigint_notifier() -> Result> { let notifier = Arc::new(AtomicBool::new(false)); let r = notifier.clone(); @@ -29,3 +38,211 @@ pub fn helper_json_pointer( )), } } + +pub fn determine_process_id() -> u64 { + /* Taken from: https://github.com/alecmocatta/palaver/blob/cc955a9d56f187fe57a2d042e4ff4120d50dc3a7/src/thread.rs#L23 */ + match Into::::into(nix::unistd::gettid()).try_into() { + Ok(res) => res, + Err(_) => std::process::id() as u64, + } +} + +pub fn print_dci(dci: crate::ngscope::types::NgScopeCellDci) { + Log::print_info(&format!( + "DEBUG: {:?} | {:03?} | {:03?} | {:08?} | {:08?} | {:03?} | {:03?}", + dci.nof_rnti, + dci.total_dl_prb, + dci.total_ul_prb, + dci.total_dl_tbs, + dci.total_ul_tbs, + dci.total_dl_reTx, + dci.total_ul_reTx + )); +} + +pub fn print_info(s: &str) { + Log::print_info(s) +} + +pub fn print_debug(s: &str) { + if is_debug() { + Log::print_debug(s) + } +} + +pub trait LogExt { + fn print_info(s: &str); + fn print_debug(s: &str); +} +impl LogExt for Log { + /// Info level logging and add print to stdout. + fn print_info(s: &str) { + if Log::enabled(Level::Info) { + println!("{}", s); + } + Log::infoln(s); + } + + fn print_debug(s: &str) { + if Log::enabled(Level::Debug) { + println!("{}", s); + } + Log::debugln(s); + } +} + +pub fn log_rnti_matching_traffic( + file_path: &str, + traffic_collection: &TrafficCollection, +) -> Result<()> { + print_debug(&format!( + "DEBUG [rntimatcher] going to write traffic to file: {:?}", + file_path + )); + let json_string = serde_json::to_string(traffic_collection)?; + + let mut file: File = OpenOptions::new() + .create(true) + .append(true) + .open(file_path)?; + writeln!(file, "{}", json_string)?; + + Ok(()) +} + +lazy_static! { + static ref IS_DEBUG: AtomicBool = AtomicBool::new(false); +} + +pub fn set_debug(level: bool) { + IS_DEBUG.store(level, Ordering::SeqCst); +} + +pub fn is_debug() -> bool { + IS_DEBUG.load(Ordering::SeqCst) +} + +/* Feature Matching */ + +pub fn calculate_mean_variance(list: &[f64]) -> (f64, f64) { + let total_packets = list.len() as f64; + + if total_packets == 0.0 { + return (0.0, 0.0); + } + + let mean = list.iter().sum::() / total_packets; + + let variance = list + .iter() + .map(|&item| { + let diff = item - mean; + diff * diff + }) + .sum::() + / total_packets; + + (mean, variance) +} + +pub fn calculate_median(list: &[f64]) -> f64 { + let mut sorted_list: Vec = list.to_vec(); + sorted_list.sort_by(|a, b| a.partial_cmp(b).unwrap()); // Handle NaN values safely + let len = sorted_list.len(); + if len % 2 == 0 { + // If the length is even, return the average of the two middle elements + let mid1 = sorted_list[len / 2 - 1]; + let mid2 = sorted_list[len / 2]; + (mid1 + mid2) * 0.5 + } else { + // If the length is odd, return the middle element + sorted_list[len / 2] + } +} + +pub fn calculate_weighted_manhattan_distance( + vec_a: &[f64], + vec_b: &[f64], + weightings: &[f64], +) -> f64 { + assert_eq!( + vec_a.len(), + vec_b.len(), + "Calcuting Euclidean distance: Vectors must have the same length" + ); + assert_eq!( + vec_a.len(), + weightings.len(), + "Calcuting Euclidean distance: Vectors and weightings must have the same length" + ); + + vec_a + .iter() + .zip(vec_b.iter()) + .zip(weightings.iter()) + .fold(0.0, |acc, ((&a, &b), &w)| acc + w * (a - b).abs()) +} + +pub fn calculate_weighted_euclidean_distance( + vec_a: &[f64], + vec_b: &[f64], + weightings: &[f64], +) -> f64 { + assert_eq!( + vec_a.len(), + vec_b.len(), + "Calcuting Euclidean distance: Vectors must have the same length" + ); + assert_eq!( + vec_a.len(), + weightings.len(), + "Calcuting Euclidean distance: Vectors and weightings must have the same length" + ); + + let sum_of_squared_diff: f64 = vec_a + .iter() + .zip(vec_b.iter()) + .zip(weightings.iter()) + .map(|((&a, &b), &w)| w * (a - b).powi(2)) + .sum(); + + sum_of_squared_diff.sqrt() +} + +pub fn calculate_weighted_manhattan_distance_matrix( + matr_a: &DMatrix, + matr_b: &DMatrix, + weightings: &DVector, +) -> DVector { + assert_eq!( + (matr_a.nrows(), matr_a.ncols()), + (matr_b.nrows(), matr_b.ncols()), + "Calcuting Euclidean distance: Matrices must have the same dimensions" + ); + let diff_matrix = (matr_a - matr_b).abs(); + diff_matrix * weightings +} + +pub fn calculate_weighted_euclidean_distance_matrix( + matr_a: &DMatrix, + matr_b: &DMatrix, + weightings: &DVector, +) -> DVector { + assert_eq!( + (matr_a.nrows(), matr_a.ncols()), + (matr_b.nrows(), matr_b.ncols()), + "Calcuting Euclidean distance: Matrices must have the same dimensions" + ); + let diff_matrix = matr_a - matr_b; + let weighted_squared_diff_vector = diff_matrix.component_mul(&diff_matrix) * weightings; + + weighted_squared_diff_vector.map(|x| x.sqrt()) +} + +pub fn standardize_feature_vec(feature_vec: &[f64], std_vec: &[(f64, f64)]) -> Vec { + feature_vec + .iter() + .zip(std_vec.iter()) + .map(|(&feature, &(mean, std_deviation))| (feature - mean) / std_deviation) + .collect() +}