diff --git a/rust-producer-sync/src/main.rs b/rust-producer-sync/src/main.rs
index e83ffff..6fdff1f 100644
--- a/rust-producer-sync/src/main.rs
+++ b/rust-producer-sync/src/main.rs
@@ -13,6 +13,7 @@ use crate::stats::{LoggingContext, StatsCheckpoint};
 pub(crate) mod config;
 pub(crate) mod stats;
 pub(crate) mod utils;
+pub(crate) mod traffic;
 
 const CHARSET: &str = "abcdefghijklmnopqrstuvwxyz0123456789";
 
diff --git a/rust-producer-sync/src/traffic/mod.rs b/rust-producer-sync/src/traffic/mod.rs
new file mode 100644
index 0000000..f6ee03b
--- /dev/null
+++ b/rust-producer-sync/src/traffic/mod.rs
@@ -0,0 +1,26 @@
+use std::time::{Duration, Instant};
+
+pub(crate) mod poisson;
+pub(crate) mod throttler;
+
+/// A rate limit: at most `amount` events over the specified `duration`.
+#[derive(Debug)]
+pub(crate) struct Rate {
+    /// Maximum number of events over `duration`.
+    pub(crate) amount: u32,
+    /// Length of the period over which `amount` events are allowed.
+    pub(crate) duration: Duration,
+}
+
+/// A `TrafficGen` generates events with respect to a [`Rate`]:
+/// * either by generating a list of "arrival instants" (when the events should theoretically occur),
+/// * or by keeping a local state and telling the caller how long it should wait before sending
+///   the next event.
+pub(crate) trait TrafficGen {
+    /// Generates `nb_to_gen` arrivals, each made of the wait since the previous arrival and of
+    /// the instant at which the event should occur.
+    fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)>;
+
+    /// Returns how long the caller should wait before sending the next event.
+    fn next_wait(&mut self) -> Duration;
+}
\ No newline at end of file
diff --git a/rust-producer-sync/src/traffic/poisson.rs b/rust-producer-sync/src/traffic/poisson.rs
new file mode 100644
index 0000000..301c159
--- /dev/null
+++ b/rust-producer-sync/src/traffic/poisson.rs
@@ -0,0 +1,96 @@
+use std::time::{Duration, Instant};
+use rand::Rng;
+use crate::traffic::{Rate, TrafficGen};
+
+/// Generates events following a Poisson process whose average throughput is `rate`.
+pub struct PoissonGen<T> {
+    rng: T,
+    rate: Rate,
+}
+
+impl<T: Rng> PoissonGen<T> {
+    /// Creates a generator that draws its randomness from `rng` and targets `rate` on average.
+    pub(crate) fn new(rng: T, rate: Rate) -> Self {
+        PoissonGen { rng, rate }
+    }
+}
+
+impl<T: Rng> TrafficGen for PoissonGen<T> {
+    /// Samples the wait before the next event from an exponential distribution of mean `1 / rate`.
+    ///
+    /// A rate of 0 events can never produce an event: the longest wait expressible in whole
+    /// seconds is returned in that case.
+    fn next_wait(&mut self) -> Duration {
+        // https://stackoverflow.com/questions/9832919/generate-poisson-arrival-in-java
+        let rate_per_sec = f64::from(self.rate.amount) / self.rate.duration.as_secs_f64();
+        let uniform_incl_0_excl_1 = self.rng.gen::<f64>(); // by default: uniformly distributed in [0, 1)
+        let uniform_excl_0_incl_1 = 1.0 - uniform_incl_0_excl_1; // 1 - rand() => uniformly distributed in (0, 1]
+        // -ln(U) / rate; `abs` negates ln(U) <= 0 without ever producing -0.0
+        let wait_secs = uniform_excl_0_incl_1.ln().abs() / rate_per_sec;
+        if wait_secs.is_finite() && wait_secs < u64::MAX as f64 {
+            // keep the sub-millisecond part: truncating to whole milliseconds would shorten the
+            // average wait and make the generator exceed the requested rate
+            Duration::from_secs_f64(wait_secs)
+        } else {
+            Duration::from_secs(u64::MAX)
+        }
+    }
+
+    /// Generates `nb_to_gen` arrivals starting from now; each entry holds the wait since the
+    /// previous arrival and the instant at which the event should occur.
+    fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> {
+        let mut arrival = Instant::now();
+        (0..nb_to_gen)
+            .map(|_| {
+                let should_wait_for = self.next_wait();
+                arrival += should_wait_for;
+                (should_wait_for, arrival)
+            })
+            .collect()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::thread::sleep;
+    use std::time::{Duration, Instant};
+    use itertools::Itertools;
+    use rand::thread_rng;
+    use crate::traffic::poisson::PoissonGen;
+    use crate::traffic::{Rate, TrafficGen};
+
+    #[test]
+    fn print_wait_distribution() {
+        // not a test per-se (and hard to turn into one), but prints the distribution in CSV to build a histogram
+        let rate = Rate { amount: 200, duration: Duration::from_secs(1) };
+        let mut gen = PoissonGen::new(thread_rng(), rate);
+        let mut event_secs_buckets = HashMap::new();
+        let origin = Instant::now();
+        for _ in 0..10_000 {
+            sleep(gen.next_wait());
+            let bucket_seconds = origin.elapsed().as_secs();
+            *event_secs_buckets.entry(bucket_seconds).or_insert(0) += 1;
+        }
+        for (secs, count) in event_secs_buckets.iter().sorted() {
+            println!("{},{}", secs, count);
+        }
+    }
+
+    #[test]
+    fn print_instant_distribution() {
+        // not a test per-se (and hard to turn into one), but prints the distribution in CSV to build a histogram
+        let nb_to_gen = 1_000_000;
+        let rate = Rate { amount: 200, duration: Duration::from_secs(1) };
+        let mut gen = PoissonGen::new(thread_rng(), rate);
+        let mut event_secs_buckets = HashMap::new();
+        let origin = Instant::now();
+        for (_, arrival) in gen.gen_arrivals(nb_to_gen) {
+            let bucket_seconds = arrival.duration_since(origin).as_secs();
+            *event_secs_buckets.entry(bucket_seconds).or_insert(0) += 1;
+        }
+        for (secs, count) in event_secs_buckets.iter().sorted() {
+            println!("{},{}", secs, count);
+        }
+    }
+}
diff --git a/rust-producer-sync/src/traffic/throttler.rs b/rust-producer-sync/src/traffic/throttler.rs
new file mode 100644
index 0000000..135afd4
--- /dev/null
+++ b/rust-producer-sync/src/traffic/throttler.rs
@@ -0,0 +1,140 @@
+use std::collections::vec_deque::VecDeque;
+use std::time::{Duration, Instant};
+use crate::traffic::{Rate, TrafficGen};
+
+const NANOS_PER_SEC: u128 = 1_000_000_000;
+
+/// Sliding-window throttler: keeps the instants of the most recent events in order to limit
+/// the number of events to `rate.amount` per `rate.duration`.
+pub(crate) struct Throttler {
+    rate: Rate,
+    queue: VecDeque<Instant>,
+}
+
+impl Throttler {
+    /// Creates a throttler enforcing `rate`.
+    pub(crate) fn new(rate: Rate) -> Self {
+        let amount = rate.amount as usize;
+        Throttler {
+            rate,
+            queue: VecDeque::with_capacity(amount),
+        }
+    }
+
+    /// Forgets the recorded events that are older than `rate.duration`, as they no longer count.
+    fn free_up(&mut self) {
+        let now = Instant::now();
+        let window = self.rate.duration;
+        // compare elapsed durations rather than computing `now - window`: subtracting a
+        // `Duration` from an `Instant` panics when the result is not representable
+        while let Some(oldest) = self.queue.front() {
+            if now.saturating_duration_since(*oldest) > window {
+                // no longer relevant to be counted, can be removed
+                self.queue.pop_front();
+            } else {
+                // oldest element is still relevant to count in bucket, can't free up more space
+                break;
+            }
+        }
+    }
+}
+
+impl TrafficGen for Throttler {
+    /// Records an event at the current instant and returns how long the caller should wait:
+    /// zero while fewer than `rate.amount` events (this one included) are recorded, otherwise
+    /// the time left until the oldest recorded event is `rate.duration` old.
+    fn next_wait(&mut self) -> Duration {
+        let now = Instant::now();
+        self.free_up();
+        self.queue.push_back(now);
+        if self.queue.len() < self.rate.amount as usize {
+            return Duration::from_secs(0);
+        }
+        let oldest = *self.queue.front().expect("the queue holds at least the instant pushed above");
+        (oldest + self.rate.duration).saturating_duration_since(now)
+    }
+
+    /// Generates `nb_to_gen` evenly spaced arrivals starting from now; each entry holds the wait
+    /// since the previous arrival and the instant at which the event should occur.
+    ///
+    /// # Panics
+    /// Panics if the rate allows 0 events, as no spacing can be computed.
+    fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> {
+        assert!(self.rate.amount > 0, "cannot generate arrivals for a rate of 0 events");
+        let window_nanos = self.rate.duration.as_nanos();
+        let amount = u128::from(self.rate.amount);
+        let origin = Instant::now();
+        let mut previous_offset = Duration::from_secs(0);
+        let mut res = Vec::with_capacity(nb_to_gen);
+        for rank in 1..=nb_to_gen {
+            // each offset is derived from the origin instead of adding up a truncated step, so that
+            // rounding errors don't accumulate (1s / 750 would otherwise drift by 250ns every second)
+            let offset_nanos = window_nanos * rank as u128 / amount;
+            let offset = Duration::new(
+                (offset_nanos / NANOS_PER_SEC) as u64,
+                (offset_nanos % NANOS_PER_SEC) as u32,
+            );
+            res.push((offset - previous_offset, origin + offset));
+            previous_offset = offset;
+        }
+        res
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::thread::sleep;
+    use std::time::{Duration, Instant};
+    use itertools::Itertools;
+    use crate::traffic::{Rate, TrafficGen};
+    use crate::traffic::throttler::Throttler;
+
+    #[test]
+    fn print_wait_distribution() {
+        // apart from the first and last seconds, exactly `amount_per_sec` events should occur every second
+        let amount_per_sec = 750;
+        let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) };
+        let mut gen = Throttler::new(rate);
+        let mut event_secs_buckets = HashMap::new();
+        let origin = Instant::now();
+        for _ in 0..30_000 { // 30k msg at 750msg/s should take ~40s to run the test
+            let wait = gen.next_wait();
+            if wait > Duration::from_secs(0) {
+                sleep(wait);
+            }
+            let bucket_seconds = origin.elapsed().as_secs();
+            *event_secs_buckets.entry(bucket_seconds).or_insert(0) += 1;
+        }
+        let counts_per_sec = event_secs_buckets.iter()
+            .sorted()
+            .dropping(1) // first and last may be outliers
+            .dropping_back(1);
+        for (_, count) in counts_per_sec {
+            assert_eq!(amount_per_sec, *count);
+        }
+    }
+
+    #[test]
+    fn print_instant_distribution() {
+        // apart from the first and last seconds, exactly `amount_per_sec` arrivals should fall in every second
+        // 1_000 seconds worth of arrivals: many buckets to check while keeping the generated vector small
+        let nb_to_gen = 750_000;
+        let amount_per_sec = 750;
+        let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) };
+        let mut gen = Throttler::new(rate);
+        let mut event_secs_buckets = HashMap::new();
+        let origin = Instant::now();
+        for (_, arrival) in gen.gen_arrivals(nb_to_gen) {
+            let bucket_seconds = arrival.duration_since(origin).as_secs();
+            *event_secs_buckets.entry(bucket_seconds).or_insert(0) += 1;
+        }
+        let counts_per_sec = event_secs_buckets.iter()
+            .sorted()
+            .dropping(1) // first and last may be outliers
+            .dropping_back(1);
+        for (_, count) in counts_per_sec {
+            assert_eq!(amount_per_sec, *count);
+        }
+    }
+}
\ No newline at end of file