From 2ece83d4dfad0bb391e15022f1224d0cd4e72e86 Mon Sep 17 00:00:00 2001 From: Arnaud Esteve Date: Mon, 16 May 2022 07:57:50 +0200 Subject: [PATCH 1/2] WIP: simulating traffic using Poisson arrival rate --- rust-producer-sync/src/gen.rs | 56 ++++++++++++++++++++++++++++++++++ rust-producer-sync/src/main.rs | 1 + 2 files changed, 57 insertions(+) create mode 100644 rust-producer-sync/src/gen.rs diff --git a/rust-producer-sync/src/gen.rs b/rust-producer-sync/src/gen.rs new file mode 100644 index 0000000..eb74f27 --- /dev/null +++ b/rust-producer-sync/src/gen.rs @@ -0,0 +1,56 @@ +use std::time::Duration; +use rand::distributions::Uniform; +use rand::Rng; + +pub(crate) struct PoissonGen { + rng: T, + rate_per_sec: f64 +} + +impl PoissonGen { + pub(crate) fn new(rng: T, rate: f64) -> Self { + Uniform::new_inclusive(0.0, 1.0); + PoissonGen { rng, rate_per_sec: rate } + } + + pub(crate) fn next_wait(&mut self) -> Duration { + // https://stackoverflow.com/questions/9832919/generate-poisson-arrival-in-java + let rate_per_ms = self.rate_per_sec / 1_000.0; + let uniform_incl_0_excl_1 = self.rng.gen::(); // by default: uniformly distributed in [0, 1) + let uniform_incl_1_excl_0 = 1.0 - uniform_incl_0_excl_1; // 1 - rand() => uniformly distributed in (0, 1] + let wait_ms = -1.0 * f64::ln(uniform_incl_1_excl_0) / rate_per_ms; // -ln(U) / rate + Duration::from_millis(wait_ms as u64) + } +} + + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::thread::sleep; + use std::time::Instant; + use itertools::Itertools; + use rand::thread_rng; + use crate::gen::PoissonGen; + + #[test] + fn test_distrib() { + let mut i = 0; + let rate_per_sec = 200.0; // 200 events per sec + let mut gen = PoissonGen::new(thread_rng(), rate_per_sec); + let mut event_secs_buckets = HashMap::new(); + let origin = Instant::now(); + while i < 10_000 { + sleep(gen.next_wait()); + let time = Instant::now(); + let bucket_seconds = time.duration_since(origin).as_secs(); + let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); + *count += 1; + i += 1; + } + for (secs, count) in event_secs_buckets.iter().sorted() { + println!("{},{}", secs, count); + } + } +} + diff --git a/rust-producer-sync/src/main.rs b/rust-producer-sync/src/main.rs index e83ffff..a2b802a 100644 --- a/rust-producer-sync/src/main.rs +++ b/rust-producer-sync/src/main.rs @@ -13,6 +13,7 @@ use crate::stats::{LoggingContext, StatsCheckpoint}; pub(crate) mod config; pub(crate) mod stats; pub(crate) mod utils; +pub(crate) mod gen; const CHARSET: &str = "abcdefghijklmnopqrstuvwxyz0123456789"; From 3b2d3f9d77912f6525cdeba5589d96ebe3e22c3c Mon Sep 17 00:00:00 2001 From: Arnaud Esteve Date: Tue, 17 May 2022 08:31:03 +0200 Subject: [PATCH 2/2] * refactor traffic generation into trait + impls * creates a Throttler impl (~= kafka-perf-test) --- rust-producer-sync/src/gen.rs | 56 --------- rust-producer-sync/src/main.rs | 2 +- rust-producer-sync/src/traffic/mod.rs | 21 ++++ rust-producer-sync/src/traffic/poisson.rs | 93 +++++++++++++++ rust-producer-sync/src/traffic/throttler.rs | 122 ++++++++++++++++++++ 5 files changed, 237 insertions(+), 57 deletions(-) delete mode 100644 rust-producer-sync/src/gen.rs create mode 100644 rust-producer-sync/src/traffic/mod.rs create mode 100644 rust-producer-sync/src/traffic/poisson.rs create mode 100644 rust-producer-sync/src/traffic/throttler.rs diff --git a/rust-producer-sync/src/gen.rs b/rust-producer-sync/src/gen.rs deleted file mode 100644 index eb74f27..0000000 --- a/rust-producer-sync/src/gen.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::time::Duration; -use rand::distributions::Uniform; -use rand::Rng; - -pub(crate) struct PoissonGen { - rng: T, - rate_per_sec: f64 -} - -impl PoissonGen { - pub(crate) fn new(rng: T, rate: f64) -> Self { - Uniform::new_inclusive(0.0, 1.0); - PoissonGen { rng, rate_per_sec: rate } - } - - pub(crate) fn next_wait(&mut self) -> Duration { - // https://stackoverflow.com/questions/9832919/generate-poisson-arrival-in-java - let rate_per_ms = self.rate_per_sec / 1_000.0; - let uniform_incl_0_excl_1 = self.rng.gen::(); // by default: uniformly distributed in [0, 1) - let uniform_incl_1_excl_0 = 1.0 - uniform_incl_0_excl_1; // 1 - rand() => uniformly distributed in (0, 1] - let wait_ms = -1.0 * f64::ln(uniform_incl_1_excl_0) / rate_per_ms; // -ln(U) / rate - Duration::from_millis(wait_ms as u64) - } -} - - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - use std::thread::sleep; - use std::time::Instant; - use itertools::Itertools; - use rand::thread_rng; - use crate::gen::PoissonGen; - - #[test] - fn test_distrib() { - let mut i = 0; - let rate_per_sec = 200.0; // 200 events per sec - let mut gen = PoissonGen::new(thread_rng(), rate_per_sec); - let mut event_secs_buckets = HashMap::new(); - let origin = Instant::now(); - while i < 10_000 { - sleep(gen.next_wait()); - let time = Instant::now(); - let bucket_seconds = time.duration_since(origin).as_secs(); - let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); - *count += 1; - i += 1; - } - for (secs, count) in event_secs_buckets.iter().sorted() { - println!("{},{}", secs, count); - } - } -} - diff --git a/rust-producer-sync/src/main.rs b/rust-producer-sync/src/main.rs index a2b802a..6fdff1f 100644 --- a/rust-producer-sync/src/main.rs +++ b/rust-producer-sync/src/main.rs @@ -13,7 +13,7 @@ use crate::stats::{LoggingContext, StatsCheckpoint}; pub(crate) mod config; pub(crate) mod stats; pub(crate) mod utils; -pub(crate) mod gen; +pub(crate) mod traffic; const CHARSET: &str = "abcdefghijklmnopqrstuvwxyz0123456789"; diff --git a/rust-producer-sync/src/traffic/mod.rs b/rust-producer-sync/src/traffic/mod.rs new file mode 100644 index 0000000..f6ee03b --- /dev/null +++ b/rust-producer-sync/src/traffic/mod.rs @@ -0,0 +1,21 @@ +use std::time::{Duration, Instant}; + +pub(crate) mod poisson; +pub(crate) mod throttler; + +#[derive(Debug)] +/// A struct to model a Rate limit: +/// amount: maximum number of events over the specified +/// duration +pub(crate) struct Rate { + pub(crate) amount: u32, + pub(crate) duration: Duration +} + +/// A "TrafficGen" is generating to generate events in respect to a RateLimit +/// * Either by generating an array of "arrival instants" (when should the events occur, theoretically +/// * Or by keeping a local state and telling the caller how long it should wait before sending the next event +pub(crate) trait TrafficGen { + fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)>; + fn next_wait(&mut self) -> Duration; +} \ No newline at end of file diff --git a/rust-producer-sync/src/traffic/poisson.rs b/rust-producer-sync/src/traffic/poisson.rs new file mode 100644 index 0000000..301c159 --- /dev/null +++ b/rust-producer-sync/src/traffic/poisson.rs @@ -0,0 +1,93 @@ +use std::time::{Duration, Instant}; +use rand::Rng; +use crate::traffic::{Rate, TrafficGen}; + +pub struct PoissonGen { + rng: T, + rate: Rate +} + +impl PoissonGen { + pub(crate) fn new(rng: T, rate: Rate) -> Self { + PoissonGen { rng, rate } + } +} + + +impl TrafficGen for PoissonGen { + + fn next_wait(&mut self) -> Duration { + // https://stackoverflow.com/questions/9832919/generate-poisson-arrival-in-java + let rate_per_ms = self.rate.amount as f64 / self.rate.duration.as_millis() as f64; + let uniform_incl_0_excl_1 = self.rng.gen::(); // by default: uniformly distributed in [0, 1) + let uniform_incl_1_excl_0 = 1.0 - uniform_incl_0_excl_1; // 1 - rand() => uniformly distributed in (0, 1] + let wait_ms = -1.0 * f64::ln(uniform_incl_1_excl_0) / rate_per_ms; // -ln(U) / rate + Duration::from_millis(wait_ms as u64) + } + + fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> { + let mut generated = 0; + let mut res = Vec::with_capacity(nb_to_gen); + let mut arrival = Instant::now(); + while generated < nb_to_gen { + let should_wait_for = self.next_wait(); + arrival += should_wait_for; + res.push((should_wait_for, arrival)); + generated+= 1; + } + res + } +} + + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::thread::sleep; + use std::time::{Duration, Instant}; + use itertools::Itertools; + use rand::thread_rng; + use crate::traffic::poisson::PoissonGen; + use crate::traffic::{Rate, TrafficGen}; + + #[test] + fn print_wait_distribution() { + // not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram) + let mut i = 0; + let rate = Rate { amount: 200, duration: Duration::from_secs(1) }; + let mut gen = PoissonGen::new(thread_rng(), rate); + let mut event_secs_buckets = HashMap::new(); + let origin = Instant::now(); + while i < 10_000 { + sleep(gen.next_wait()); + let time = Instant::now(); + let bucket_seconds = time.duration_since(origin).as_secs(); + let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); + *count += 1; + i += 1; + } + for (secs, count) in event_secs_buckets.iter().sorted() { + println!("{},{}", secs, count); + } + } + + #[test] + fn print_instant_distribution() { + // not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram) + let nb_to_gen = 1_000_000; + let rate = Rate { amount: 200, duration: Duration::from_secs(1) }; + let mut gen = PoissonGen::new(thread_rng(), rate); + let mut event_secs_buckets = HashMap::new(); + let origin = Instant::now(); + let arrivals = gen.gen_arrivals(nb_to_gen); + for (_, arrival) in arrivals { + let bucket_seconds = arrival.duration_since(origin).as_secs(); + let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); + *count += 1; + } + for (secs, count) in event_secs_buckets.iter().sorted() { + println!("{};{}", secs, count); + } + } +} + diff --git a/rust-producer-sync/src/traffic/throttler.rs b/rust-producer-sync/src/traffic/throttler.rs new file mode 100644 index 0000000..135afd4 --- /dev/null +++ b/rust-producer-sync/src/traffic/throttler.rs @@ -0,0 +1,122 @@ +use std::collections::vec_deque::VecDeque; +use std::time::{Duration, Instant}; +use crate::traffic::{Rate, TrafficGen}; + +pub(crate) struct Throttler { + rate: Rate, + queue: VecDeque +} + +impl Throttler { + + pub(crate) fn new(rate: Rate) -> Self { + let amount = rate.amount as usize; + Throttler { + rate, + queue: VecDeque::with_capacity(amount) + } + } + + fn free_up(&mut self) { + let now = Instant::now(); + let threshold = now - self.rate.duration; + while let Some(frt) = self.queue.front() { + if *frt < threshold { // no longer relevant to be counted, can be removed + self.queue.pop_front(); + } else { // oldest element is still relevant to count in bucket, can't free up more space + break; + } + } + } + +} + +impl TrafficGen for Throttler { + + fn next_wait(&mut self) -> Duration { + let now = Instant::now(); + self.free_up(); + self.queue.push_back(now); + if (self.queue.len() as u32) < self.rate.amount { + return Duration::from_secs(0); + } + let fst = self.queue.front().unwrap(); + (*fst + self.rate.duration) - now + } + + fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> { + let step = self.rate.duration / self.rate.amount as u32; + let mut res = Vec::with_capacity(nb_to_gen); + let mut origin = Instant::now(); + for _ in 0..nb_to_gen { + origin += step; + res.push((step, origin)); + } + res + } + +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::thread::sleep; + use std::time::{Duration, Instant}; + use itertools::Itertools; + use crate::traffic::{Rate, TrafficGen}; + use crate::traffic::throttler::Throttler; + + #[test] + fn print_wait_distribution() { + // not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram) + let mut i = 0; + let amount_per_sec = 750; + let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) }; + let mut gen = Throttler::new(rate); + let mut event_secs_buckets = HashMap::new(); + let origin = Instant::now(); + while i < 30_000 { // 30k msg at 750msg/s should be ~>30s to run the test + let wait = gen.next_wait(); + if wait > Duration::from_secs(0) { + sleep(wait); + } + let time = Instant::now(); + let bucket_seconds = time.duration_since(origin).as_secs(); + let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); + *count += 1; + i += 1; + } + let counts_per_sec = event_secs_buckets.iter() + .sorted() + .dropping(1) // first and last may be outliers + .dropping_back(1); + for (_, count) in counts_per_sec { + assert_eq!(amount_per_sec, *count); + } + } + + #[test] + fn print_instant_distribution() { + // not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram) + let nb_to_gen = 100_000_000; + let amount_per_sec = 750; + let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) }; + let mut gen = Throttler::new(rate); + let mut event_secs_buckets = HashMap::new(); + let origin = Instant::now(); + let arrivals = gen.gen_arrivals(nb_to_gen); + for (_, arrival) in arrivals { + let bucket_seconds = arrival.duration_since(origin).as_secs(); + let count = event_secs_buckets.entry(bucket_seconds).or_insert(0); + *count += 1; + } + let counts_per_sec = event_secs_buckets.iter() + .sorted() + .dropping(1) // first and last may be outliers + .dropping_back(1); + for (_, count) in counts_per_sec { + assert_eq!(amount_per_sec, *count); + } + } + +} \ No newline at end of file