From fbc440129642e89a1bb0f0388da2756e2f66fb04 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 15 Jan 2025 16:15:37 +0100 Subject: [PATCH 1/4] fix: Ensure stats information is end periodically --- breakwater/src/main.rs | 2 +- breakwater/src/statistics.rs | 118 ++++++++++++++++++++--------------- 2 files changed, 69 insertions(+), 51 deletions(-) diff --git a/breakwater/src/main.rs b/breakwater/src/main.rs index 7cc88c0..7f3bbe9 100644 --- a/breakwater/src/main.rs +++ b/breakwater/src/main.rs @@ -127,7 +127,7 @@ async fn main() -> Result<(), Error> { .context(StartPrometheusExporterSnafu)?; let server_listener_thread = tokio::spawn(async move { server.start().await }); - let statistics_thread = tokio::spawn(async move { statistics.start().await }); + let statistics_thread = tokio::spawn(async move { statistics.run().await }); let prometheus_exporter_thread = tokio::spawn(async move { prometheus_exporter.run().await }); let mut display_sinks = Vec:: + Send>>::new(); diff --git a/breakwater/src/statistics.rs b/breakwater/src/statistics.rs index 7c58a17..40fa237 100644 --- a/breakwater/src/statistics.rs +++ b/breakwater/src/statistics.rs @@ -6,9 +6,12 @@ use std::{ collections::{hash_map::Entry, HashMap}, fs::File, net::IpAddr, - time::{Duration, Instant}, + time::Duration, +}; +use tokio::{ + sync::{broadcast, mpsc}, + time::interval, }; -use tokio::sync::{broadcast, mpsc}; pub const STATS_REPORT_INTERVAL: Duration = Duration::from_millis(1000); pub const STATS_SLIDING_WINDOW_SIZE: usize = 5; @@ -137,62 +140,77 @@ impl Statistics { statistics } - pub async fn start(&mut self) -> Result<(), Error> { - let mut last_stat_report = Instant::now(); - let mut last_save_file_written = Instant::now(); + pub async fn run(&mut self) -> Result<(), Error> { let mut statistics_information_event = StatisticsInformationEvent::default(); - while let Some(statistics_update) = self.statistics_rx.recv().await { - self.statistic_events += 1; - match statistics_update { - StatisticsEvent::ConnectionCreated { ip } => { - *self.connections_for_ip.entry(ip).or_insert(0) += 1; - } - StatisticsEvent::ConnectionClosed { ip } => { - if let Entry::Occupied(mut o) = self.connections_for_ip.entry(ip) { - let connections = o.get_mut(); - *connections -= 1; - if *connections == 0 { - o.remove_entry(); - } - } - } - StatisticsEvent::ConnectionDenied { ip } => { - *self.denied_connections_for_ip.entry(ip).or_insert(0) += 1; - } - StatisticsEvent::BytesRead { ip, bytes } => { - *self.bytes_for_ip.entry(ip).or_insert(0) += bytes; - } - StatisticsEvent::VncFrameRendered => self.frame += 1, - } + let mut stat_report = interval(STATS_REPORT_INTERVAL); + let (mut stats_save, save_file) = match &self.statistics_save_mode { + StatisticsSaveMode::Disabled => (interval(Duration::MAX), None), + StatisticsSaveMode::Enabled { + save_file, + interval_s, + } => ( + interval(Duration::from_secs(*interval_s)), + Some(save_file.clone()), + ), + }; - // As there is an event for every frame we are guaranteed to land here every second - let last_stat_report_elapsed = last_stat_report.elapsed(); - if last_stat_report_elapsed > STATS_REPORT_INTERVAL { - last_stat_report = Instant::now(); - statistics_information_event = self.calculate_statistics_information_event( - &statistics_information_event, - last_stat_report_elapsed, - ); - self.statistics_information_tx - .send(statistics_information_event.clone()) - .map_err(Box::new) - .context(WriteToStatisticsInformationChannelSnafu)?; - - if let StatisticsSaveMode::Enabled { - save_file, - interval_s, - } = &self.statistics_save_mode - { - if last_save_file_written.elapsed() > Duration::from_secs(*interval_s) { - last_save_file_written = Instant::now(); + loop { + tokio::select! { + // Cancellation safety: mpsc::Receiver::recv is cancellation safe + maybe_event = self.statistics_rx.recv() => { + let Some(event) = maybe_event else { + // `self.statistics_rx` is closed, program is terminating + return Ok(()); + }; + self.process_statistics_event(event); + }, + // Cancellation safety: This method is cancellation safe. If tick is used as the branch in a tokio::select! + // and another branch completes first, then no tick has been consumed. + _ = stat_report.tick() => { + statistics_information_event = self.calculate_statistics_information_event( + &statistics_information_event, + STATS_REPORT_INTERVAL, + ); + self.statistics_information_tx + .send(statistics_information_event.clone()) + .map_err(Box::new) + .context(WriteToStatisticsInformationChannelSnafu)?; + }, + // Cancellation safety: This method is cancellation safe. If tick is used as the branch in a tokio::select! + // and another branch completes first, then no tick has been consumed. + _ = stats_save.tick() => { + if let Some(save_file) = &save_file { statistics_information_event.save_to_file(save_file)?; } + }, + }; + } + } + + fn process_statistics_event(&mut self, event: StatisticsEvent) { + self.statistic_events += 1; + match event { + StatisticsEvent::ConnectionCreated { ip } => { + *self.connections_for_ip.entry(ip).or_insert(0) += 1; + } + StatisticsEvent::ConnectionClosed { ip } => { + if let Entry::Occupied(mut o) = self.connections_for_ip.entry(ip) { + let connections = o.get_mut(); + *connections -= 1; + if *connections == 0 { + o.remove_entry(); + } } } + StatisticsEvent::ConnectionDenied { ip } => { + *self.denied_connections_for_ip.entry(ip).or_insert(0) += 1; + } + StatisticsEvent::BytesRead { ip, bytes } => { + *self.bytes_for_ip.entry(ip).or_insert(0) += bytes; + } + StatisticsEvent::VncFrameRendered => self.frame += 1, } - - Ok(()) } fn calculate_statistics_information_event( From 57022424ee1129302a3339623cf2cfc670436368 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 15 Jan 2025 16:19:02 +0100 Subject: [PATCH 2/4] changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d94909..8b44ce0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +### Fixed + +- Ensures statistic information updates are send periodically ([#49]) + +[#49]: https://github.com/sbernauer/breakwater/pull/49 + ## [0.16.3] - 2025-01-11 ### Fixed From 6ec17c49c6c2b0c62e7f333922d65322108f74ab Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Wed, 15 Jan 2025 16:19:48 +0100 Subject: [PATCH 3/4] typo --- breakwater/src/statistics.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/breakwater/src/statistics.rs b/breakwater/src/statistics.rs index 40fa237..bf96a1b 100644 --- a/breakwater/src/statistics.rs +++ b/breakwater/src/statistics.rs @@ -143,7 +143,7 @@ impl Statistics { pub async fn run(&mut self) -> Result<(), Error> { let mut statistics_information_event = StatisticsInformationEvent::default(); - let mut stat_report = interval(STATS_REPORT_INTERVAL); + let mut stats_report = interval(STATS_REPORT_INTERVAL); let (mut stats_save, save_file) = match &self.statistics_save_mode { StatisticsSaveMode::Disabled => (interval(Duration::MAX), None), StatisticsSaveMode::Enabled { @@ -167,7 +167,7 @@ impl Statistics { }, // Cancellation safety: This method is cancellation safe. If tick is used as the branch in a tokio::select! // and another branch completes first, then no tick has been consumed. - _ = stat_report.tick() => { + _ = stats_report.tick() => { statistics_information_event = self.calculate_statistics_information_event( &statistics_information_event, STATS_REPORT_INTERVAL, From 0f1036931b02f2ab68998a41c8d68b10c3396647 Mon Sep 17 00:00:00 2001 From: Sebastian Bernauer Date: Fri, 17 Jan 2025 08:42:48 +0100 Subject: [PATCH 4/4] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b44ce0..00077a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ All notable changes to this project will be documented in this file. ### Fixed -- Ensures statistic information updates are send periodically ([#49]) +- Ensure statistic information updates are send periodically ([#49]) [#49]: https://github.com/sbernauer/breakwater/pull/49