Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Traffic throttling / generation #3

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust-producer-sync/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::stats::{LoggingContext, StatsCheckpoint};
pub(crate) mod config;
pub(crate) mod stats;
pub(crate) mod utils;
pub(crate) mod traffic;

const CHARSET: &str = "abcdefghijklmnopqrstuvwxyz0123456789";

Expand Down
21 changes: 21 additions & 0 deletions rust-producer-sync/src/traffic/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::time::{Duration, Instant};

pub(crate) mod poisson;
pub(crate) mod throttler;

#[derive(Debug)]
/// A struct to model a Rate limit:
/// amount: maximum number of events over the specified
/// duration
pub(crate) struct Rate {
pub(crate) amount: u32,
pub(crate) duration: Duration
}

/// A "TrafficGen" is generating to generate events in respect to a RateLimit
/// * Either by generating an array of "arrival instants" (when should the events occur, theoretically
/// * Or by keeping a local state and telling the caller how long it should wait before sending the next event
pub(crate) trait TrafficGen {
fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)>;
fn next_wait(&mut self) -> Duration;
}
93 changes: 93 additions & 0 deletions rust-producer-sync/src/traffic/poisson.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::time::{Duration, Instant};
use rand::Rng;
use crate::traffic::{Rate, TrafficGen};

pub struct PoissonGen<T: Rng> {
rng: T,
rate: Rate
}

impl<T: Rng> PoissonGen<T> {
pub(crate) fn new(rng: T, rate: Rate) -> Self {
PoissonGen { rng, rate }
}
}


impl<T: Rng> TrafficGen for PoissonGen<T> {

fn next_wait(&mut self) -> Duration {
// https://stackoverflow.com/questions/9832919/generate-poisson-arrival-in-java
let rate_per_ms = self.rate.amount as f64 / self.rate.duration.as_millis() as f64;
let uniform_incl_0_excl_1 = self.rng.gen::<f64>(); // by default: uniformly distributed in [0, 1)
let uniform_incl_1_excl_0 = 1.0 - uniform_incl_0_excl_1; // 1 - rand() => uniformly distributed in (0, 1]
let wait_ms = -1.0 * f64::ln(uniform_incl_1_excl_0) / rate_per_ms; // -ln(U) / rate
Duration::from_millis(wait_ms as u64)
}

fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> {
let mut generated = 0;
let mut res = Vec::with_capacity(nb_to_gen);
let mut arrival = Instant::now();
while generated < nb_to_gen {
let should_wait_for = self.next_wait();
arrival += should_wait_for;
res.push((should_wait_for, arrival));
generated+= 1;
}
res
}
}


#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::thread::sleep;
use std::time::{Duration, Instant};
use itertools::Itertools;
use rand::thread_rng;
use crate::traffic::poisson::PoissonGen;
use crate::traffic::{Rate, TrafficGen};

#[test]
fn print_wait_distribution() {
// not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram)
let mut i = 0;
let rate = Rate { amount: 200, duration: Duration::from_secs(1) };
let mut gen = PoissonGen::new(thread_rng(), rate);
let mut event_secs_buckets = HashMap::new();
let origin = Instant::now();
while i < 10_000 {
sleep(gen.next_wait());
let time = Instant::now();
let bucket_seconds = time.duration_since(origin).as_secs();
let count = event_secs_buckets.entry(bucket_seconds).or_insert(0);
*count += 1;
i += 1;
}
for (secs, count) in event_secs_buckets.iter().sorted() {
println!("{},{}", secs, count);
}
}

#[test]
fn print_instant_distribution() {
// not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram)
let nb_to_gen = 1_000_000;
let rate = Rate { amount: 200, duration: Duration::from_secs(1) };
let mut gen = PoissonGen::new(thread_rng(), rate);
let mut event_secs_buckets = HashMap::new();
let origin = Instant::now();
let arrivals = gen.gen_arrivals(nb_to_gen);
for (_, arrival) in arrivals {
let bucket_seconds = arrival.duration_since(origin).as_secs();
let count = event_secs_buckets.entry(bucket_seconds).or_insert(0);
*count += 1;
}
for (secs, count) in event_secs_buckets.iter().sorted() {
println!("{};{}", secs, count);
}
}
}

122 changes: 122 additions & 0 deletions rust-producer-sync/src/traffic/throttler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use std::collections::vec_deque::VecDeque;
use std::time::{Duration, Instant};
use crate::traffic::{Rate, TrafficGen};

pub(crate) struct Throttler {
rate: Rate,
queue: VecDeque<Instant>
}

impl Throttler {

pub(crate) fn new(rate: Rate) -> Self {
let amount = rate.amount as usize;
Throttler {
rate,
queue: VecDeque::with_capacity(amount)
}
}

fn free_up(&mut self) {
let now = Instant::now();
let threshold = now - self.rate.duration;
while let Some(frt) = self.queue.front() {
if *frt < threshold { // no longer relevant to be counted, can be removed
self.queue.pop_front();
} else { // oldest element is still relevant to count in bucket, can't free up more space
break;
}
}
}

}

impl TrafficGen for Throttler {

fn next_wait(&mut self) -> Duration {
let now = Instant::now();
self.free_up();
self.queue.push_back(now);
if (self.queue.len() as u32) < self.rate.amount {
return Duration::from_secs(0);
}
let fst = self.queue.front().unwrap();
(*fst + self.rate.duration) - now
}

fn gen_arrivals(&mut self, nb_to_gen: usize) -> Vec<(Duration, Instant)> {
let step = self.rate.duration / self.rate.amount as u32;
let mut res = Vec::with_capacity(nb_to_gen);
let mut origin = Instant::now();
for _ in 0..nb_to_gen {
origin += step;
res.push((step, origin));
}
res
}

}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::thread::sleep;
use std::time::{Duration, Instant};
use itertools::Itertools;
use crate::traffic::{Rate, TrafficGen};
use crate::traffic::throttler::Throttler;

#[test]
fn print_wait_distribution() {
// not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram)
let mut i = 0;
let amount_per_sec = 750;
let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) };
let mut gen = Throttler::new(rate);
let mut event_secs_buckets = HashMap::new();
let origin = Instant::now();
while i < 30_000 { // 30k msg at 750msg/s should be ~>30s to run the test
let wait = gen.next_wait();
if wait > Duration::from_secs(0) {
sleep(wait);
}
let time = Instant::now();
let bucket_seconds = time.duration_since(origin).as_secs();
let count = event_secs_buckets.entry(bucket_seconds).or_insert(0);
*count += 1;
i += 1;
}
let counts_per_sec = event_secs_buckets.iter()
.sorted()
.dropping(1) // first and last may be outliers
.dropping_back(1);
for (_, count) in counts_per_sec {
assert_eq!(amount_per_sec, *count);
}
}

#[test]
fn print_instant_distribution() {
// not a test per-se (and hard to turn into one, but prints the distribution in CSV to build an histogram)
let nb_to_gen = 100_000_000;
let amount_per_sec = 750;
let rate = Rate { amount: amount_per_sec, duration: Duration::from_secs(1) };
let mut gen = Throttler::new(rate);
let mut event_secs_buckets = HashMap::new();
let origin = Instant::now();
let arrivals = gen.gen_arrivals(nb_to_gen);
for (_, arrival) in arrivals {
let bucket_seconds = arrival.duration_since(origin).as_secs();
let count = event_secs_buckets.entry(bucket_seconds).or_insert(0);
*count += 1;
}
let counts_per_sec = event_secs_buckets.iter()
.sorted()
.dropping(1) // first and last may be outliers
.dropping_back(1);
for (_, count) in counts_per_sec {
assert_eq!(amount_per_sec, *count);
}
}

}