Skip to content

Commit

Permalink
Increase log interest (#265)
Browse files Browse the repository at this point in the history
* Update message_handler.rs

* create ability to quiet extra stat logging

* reformat output to be nicer

* Update message_handler.rs

* Actually silence output of extra stats

* fix issues

* bump version

* sort by count
  • Loading branch information
fredclausen authored Nov 17, 2023
1 parent f7ee3cb commit 0d96bf7
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ resolver = "2"

[workspace.package]
edition = "2021"
version = "1.0.18"
version = "1.0.19"
authors = ["Fred Clausen", "Mike Nye", "Alex Austin"]
description = "ACARS Router: A Utility to ingest ACARS/VDLM2 from many sources, process, and feed out to many consumers."
documentation = "https://github.com/sdr-enthusiasts/acars_router"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ services:
- AR_SEND_UDP_VDLM2=acarshub:5555
- AR_RECV_ZMQ_VDLM2=dumpvdl2:45555
- AR_OVERRIDE_STATION_NAME=${FEEDER_NAME}
- AR_STATS_VERBOSE=false
tmpfs:
- /run:exec,size=64M
- /var/log
Expand Down
3 changes: 3 additions & 0 deletions rust/libraries/acars_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct Input {
/// Print statistics every N minutes
#[clap(long, env = "AR_STATS_EVERY", value_parser, default_value = "5")]
pub stats_every: u64,
/// Chatty logging of stats
#[clap(long, env = "AR_STATS_VERBOSE", value_parser)]
pub stats_verbose: bool,
/// Attempt message reassembly on incomplete messages within the specified number of seconds
#[clap(
long,
Expand Down
98 changes: 96 additions & 2 deletions rust/libraries/acars_connection_manager/src/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};

pub struct FrequencyCount {
freq: String,
count: u32,
}

#[derive(Clone, Debug, Default)]
pub struct MessageHandlerConfig {
pub add_proxy_id: bool,
Expand All @@ -27,6 +32,7 @@ pub struct MessageHandlerConfig {
pub should_override_station_name: bool,
pub station_name: String,
pub stats_every: u64,
pub stats_verbose: bool,
}

impl MessageHandlerConfig {
Expand All @@ -41,6 +47,7 @@ impl MessageHandlerConfig {
should_override_station_name: args.override_station_name.is_some(),
station_name: station_name.to_string(),
stats_every: args.stats_every,
stats_verbose: args.stats_verbose,
}
} else {
Self {
Expand All @@ -52,6 +59,7 @@ impl MessageHandlerConfig {
should_override_station_name: false,
station_name: Default::default(),
stats_every: args.stats_every,
stats_verbose: args.stats_verbose,
}
}
}
Expand All @@ -65,6 +73,8 @@ impl MessageHandlerConfig {
Arc::new(Mutex::new(VecDeque::with_capacity(100)));
let total_messages_processed: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let total_messages_since_last: Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
let all_frequencies_logged: Arc<Mutex<Vec<FrequencyCount>>> =
Arc::new(Mutex::new(Vec::new()));
let queue_type_stats: String = self.queue_type.clone();
let queue_type_dedupe: String = self.queue_type.clone();
let stats_every: u64 = self.stats_every * 60; // Value has to be in seconds. Input is in minutes.
Expand All @@ -77,11 +87,18 @@ impl MessageHandlerConfig {
let stats_total_messages_context: Arc<Mutex<i32>> = Arc::clone(&total_messages_processed);
let stats_total_messages_since_last_context: Arc<Mutex<i32>> =
Arc::clone(&total_messages_since_last);
let stats_frequency_context: Option<Arc<Mutex<Vec<FrequencyCount>>>> = if self.stats_verbose
{
Some(Arc::clone(&all_frequencies_logged))
} else {
None
};

tokio::spawn(async move {
print_stats(
stats_total_messages_context,
stats_total_messages_since_last_context,
stats_frequency_context,
stats_every,
queue_type_stats.as_str(),
)
Expand Down Expand Up @@ -132,6 +149,56 @@ impl MessageHandlerConfig {
Err(_) => f64::default(),
};

// See if the frequency is in the list of frequencies we've seen
// If not, add it to the list and log it
// match the message type

match &message {
AcarsVdlm2Message::Vdlm2Message(m) => {
// get the freq from Vdlm2Message::Vdlm2Body
let frequency: String = m.vdl2.freq.to_string();
// check and see if we have the frequency in all_frequencies_logged. If so, increment the count.
// if not, add it
let mut found: bool = false;
for freq in all_frequencies_logged.lock().await.iter_mut() {
if freq.freq == frequency {
freq.count += 1;
found = true;
break;
}
}

if !found {
let new_frequency: FrequencyCount = FrequencyCount {
freq: frequency,
count: 1,
};
all_frequencies_logged.lock().await.push(new_frequency);
}
}
AcarsVdlm2Message::AcarsMessage(m) => {
// get the freq from AcarsMessage::AcarsBody
let frequency: String = m.freq.to_string();

let mut found: bool = false;
for freq in all_frequencies_logged.lock().await.iter_mut() {
if freq.freq == frequency {
freq.count += 1;
found = true;
break;
}
}

if !found {
let new_frequency: FrequencyCount = FrequencyCount {
freq: frequency,
count: 1,
};
all_frequencies_logged.lock().await.push(new_frequency);
}
}
}

let get_message_time: Option<f64> = message.get_time();

match get_message_time {
Expand Down Expand Up @@ -251,15 +318,42 @@ impl MessageHandlerConfig {
pub async fn print_stats(
total_all_time: Arc<Mutex<i32>>,
total_since_last: Arc<Mutex<i32>>,
frequencies: Option<Arc<Mutex<Vec<FrequencyCount>>>>,
stats_every: u64,
queue_type: &str,
) {
let stats_minutes = stats_every / 60;
loop {
sleep(Duration::from_secs(stats_every)).await;
info!("{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}",
queue_type, stats_minutes, total_all_time.lock().await, total_since_last.lock().await);
let total_all_time_locked = *total_all_time.lock().await;
let mut output: String = String::new();
output.push_str(&format!(
"{} in the last {} minute(s):\nTotal messages processed: {}\nTotal messages processed since last update: {}\n",
queue_type, stats_minutes, total_all_time_locked, total_since_last.lock().await
));
*total_since_last.lock().await = 0;

// now print the frequencies, and show each as a percentage of the total_all_time

if let Some(f) = &frequencies {
// sort the frequencies by count
if let Some(f) = &frequencies {
f.lock().await.sort_by(|a, b| b.count.cmp(&a.count));
}

for freq in f.lock().await.iter() {
let percentage: f64 = (freq.count as f64 / total_all_time_locked as f64) * 100.0;
output.push_str(
format!(
"{} {}: {}/{} ({:.2}%)\n",
queue_type, freq.freq, freq.count, total_all_time_locked, percentage
)
.as_str(),
);
}
}

println!("{}", output);
}
}

Expand Down

0 comments on commit 0d96bf7

Please sign in to comment.