Skip to content

Commit

Permalink
Merge pull request #10 from glorv/min_wait
Browse files Browse the repository at this point in the history
support config minimal wait duration
  • Loading branch information
kennytm authored Sep 26, 2024
2 parents 0e74acf + e9a2deb commit a113aef
Showing 1 changed file with 60 additions and 1 deletion.
61 changes: 60 additions & 1 deletion src/limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ struct Bucket<I> {
/// The number of bytes the bucket is carrying at the time `last_updated`.
/// This value can be negative.
value: f64,
/// The minimum duration to wait if self.value is smaller than 0 after
/// call `self.consume` (unit: s).
/// By default, is the same as `refill`.
min_wait: f64,
}

impl<I> Bucket<I> {
Expand All @@ -51,7 +55,7 @@ impl<I> Bucket<I> {
if self.value > 0.0 {
Duration::from_secs(0)
} else {
let sleep_secs = self.refill - self.value / self.speed_limit;
let sleep_secs = self.min_wait - self.value / self.speed_limit;
Duration::from_secs_f64(sleep_secs)
}
}
Expand Down Expand Up @@ -111,6 +115,7 @@ impl<I: Copy + Sub<Output = Duration>> Bucket<I> {
pub struct Builder<C: Clock> {
clock: C,
bucket: Bucket<C::Instant>,
min_wait: Option<f64>,
}

impl<C: Clock> Builder<C> {
Expand All @@ -125,8 +130,10 @@ impl<C: Clock> Builder<C> {
speed_limit: 0.0,
refill: 0.1,
value: 0.0,
min_wait: 0.1,
},
clock,
min_wait: None,
};
result.speed_limit(speed_limit);
result
Expand Down Expand Up @@ -163,6 +170,14 @@ impl<C: Clock> Builder<C> {
self
}

/// Sets the minimum wait duration when the speed limit was exceeded.
///
/// The default value is same as the refill period.
pub fn min_wait(&mut self, dur: Duration) -> &mut Self {
self.min_wait = Some(dur.as_secs_f64());
self
}

/// Sets the clock instance used by the limiter.
pub fn clock(&mut self, clock: C) -> &mut Self {
self.clock = clock;
Expand All @@ -174,6 +189,8 @@ impl<C: Clock> Builder<C> {
self.bucket.value = self.bucket.capacity();
self.bucket.last_updated = self.clock.now();
let is_unlimited = self.bucket.speed_limit.is_infinite();
let min_wait = self.min_wait.unwrap_or(self.bucket.refill);
self.bucket.min_wait = min_wait;
Limiter {
bucket: Arc::new(Mutex::new(self.bucket)),
clock: mem::take(&mut self.clock),
Expand Down Expand Up @@ -581,10 +598,15 @@ mod tests_with_manual_clock {

impl Fixture {
fn new() -> Self {
Self::with_min_wait(Duration::from_secs(1))
}

fn with_min_wait(min_wait: Duration) -> Self {
Self {
shared: SharedFixture {
limiter: Limiter::builder(512.0)
.refill(Duration::from_secs(1))
.min_wait(min_wait)
.build(),
},
pool: LocalPool::new(),
Expand Down Expand Up @@ -680,6 +702,43 @@ mod tests_with_manual_clock {
assert_eq!(fx.total_bytes_consumed(), 1215);
}

#[test]
fn over_limit_single_thread_with_min_wait() {
let mut fx = Fixture::with_min_wait(Duration::from_millis(100));

fx.spawn(|sfx| {
async move {
sfx.consume(200).await;
assert_eq!(sfx.now(), 0);
sfx.consume(201).await;
assert_eq!(sfx.now(), 0);
sfx.consume(202).await;
assert_eq!(sfx.now(), 277_734_375);
// 277_734_375 ns = 100_000_000 + (200+201+202-512)/512 seconds

sfx.consume(203).await;
assert_eq!(sfx.now(), 674_218_750);
sfx.consume(204).await;
assert_eq!(sfx.now(), 1_072_656_250);
sfx.consume(205).await;
assert_eq!(sfx.now(), 1_473_046_875);
}
});

fx.set_time(0);
assert_eq!(fx.total_bytes_consumed(), 603);
fx.set_time(277_734_374);
assert_eq!(fx.total_bytes_consumed(), 603);
fx.set_time(277_734_375);
assert_eq!(fx.total_bytes_consumed(), 806);
fx.set_time(674_218_750);
assert_eq!(fx.total_bytes_consumed(), 1010);
fx.set_time(1_072_656_250);
assert_eq!(fx.total_bytes_consumed(), 1215);
fx.set_time(1_473_046_875);
assert_eq!(fx.total_bytes_consumed(), 1215);
}

#[test]
fn over_limit_multi_thread() {
let mut fx = Fixture::new();
Expand Down

0 comments on commit a113aef

Please sign in to comment.