From 3eef1301f7c474580fc1120a7f99190324306e1a Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 25 Sep 2024 15:46:50 +0800 Subject: [PATCH 1/3] support config minimul wait duration Signed-off-by: glorv --- src/limiter.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/limiter.rs b/src/limiter.rs index 8100885..96b8a1b 100644 --- a/src/limiter.rs +++ b/src/limiter.rs @@ -33,6 +33,10 @@ struct Bucket { /// The number of bytes the bucket is carrying at the time `last_updated`. /// This value can be negative. value: f64, + /// The minimum duration to wait if self.value is smaller than 0 after + /// call `self.consume` (unit: s). + /// By default, is the same as `refill`. + min_wait: f64, } impl Bucket { @@ -51,7 +55,7 @@ impl Bucket { if self.value > 0.0 { Duration::from_secs(0) } else { - let sleep_secs = self.refill - self.value / self.speed_limit; + let sleep_secs = self.min_wait - self.value / self.speed_limit; Duration::from_secs_f64(sleep_secs) } } @@ -111,6 +115,7 @@ impl> Bucket { pub struct Builder { clock: C, bucket: Bucket, + min_wait: Option, } impl Builder { @@ -125,6 +130,7 @@ impl Builder { speed_limit: 0.0, refill: 0.1, value: 0.0, + min_wait: 0.1, }, clock, }; @@ -163,6 +169,12 @@ impl Builder { self } + /// Set the minimum wait duration. + pub fn min_wait(&mut self, dur: Duration) -> &mut Self { + self.min_wait = Some(dur.as_secs_f64()); + self + } + /// Sets the clock instance used by the limiter. pub fn clock(&mut self, clock: C) -> &mut Self { self.clock = clock; @@ -174,6 +186,8 @@ impl Builder { self.bucket.value = self.bucket.capacity(); self.bucket.last_updated = self.clock.now(); let is_unlimited = self.bucket.speed_limit.is_infinite(); + let min_wait = self.min_wait.unwrap_or(self.bucket.refill); + self.bucket.min_wait = min_wait; Limiter { bucket: Arc::new(Mutex::new(self.bucket)), clock: mem::take(&mut self.clock), From 0727ed1a6afcf8a3168ce4ff7aa8dedb25a5200d Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 25 Sep 2024 17:26:53 +0800 Subject: [PATCH 2/3] fix Signed-off-by: glorv --- src/limiter.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/limiter.rs b/src/limiter.rs index 96b8a1b..e45ce27 100644 --- a/src/limiter.rs +++ b/src/limiter.rs @@ -133,6 +133,7 @@ impl Builder { min_wait: 0.1, }, clock, + min_wait: None, }; result.speed_limit(speed_limit); result @@ -169,7 +170,9 @@ impl Builder { self } - /// Set the minimum wait duration. + /// Sets the minimum wait duration when the speed limit was exceeded. + /// + /// The default value is same as the refill period. pub fn min_wait(&mut self, dur: Duration) -> &mut Self { self.min_wait = Some(dur.as_secs_f64()); self From e9a2deb2b5ac036931df33cbaa34e2b80aa88539 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 25 Sep 2024 18:06:22 +0800 Subject: [PATCH 3/3] add a unit test Signed-off-by: glorv --- src/limiter.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/limiter.rs b/src/limiter.rs index e45ce27..2d9e7ea 100644 --- a/src/limiter.rs +++ b/src/limiter.rs @@ -598,10 +598,15 @@ mod tests_with_manual_clock { impl Fixture { fn new() -> Self { + Self::with_min_wait(Duration::from_secs(1)) + } + + fn with_min_wait(min_wait: Duration) -> Self { Self { shared: SharedFixture { limiter: Limiter::builder(512.0) .refill(Duration::from_secs(1)) + .min_wait(min_wait) .build(), }, pool: LocalPool::new(), @@ -697,6 +702,43 @@ mod tests_with_manual_clock { assert_eq!(fx.total_bytes_consumed(), 1215); } + #[test] + fn over_limit_single_thread_with_min_wait() { + let mut fx = Fixture::with_min_wait(Duration::from_millis(100)); + + fx.spawn(|sfx| { + async move { + sfx.consume(200).await; + assert_eq!(sfx.now(), 0); + sfx.consume(201).await; + assert_eq!(sfx.now(), 0); + sfx.consume(202).await; + assert_eq!(sfx.now(), 277_734_375); + // 277_734_375 ns = 100_000_000 + (200+201+202-512)/512 seconds + + sfx.consume(203).await; + assert_eq!(sfx.now(), 674_218_750); + sfx.consume(204).await; + assert_eq!(sfx.now(), 1_072_656_250); + sfx.consume(205).await; + assert_eq!(sfx.now(), 1_473_046_875); + } + }); + + fx.set_time(0); + assert_eq!(fx.total_bytes_consumed(), 603); + fx.set_time(277_734_374); + assert_eq!(fx.total_bytes_consumed(), 603); + fx.set_time(277_734_375); + assert_eq!(fx.total_bytes_consumed(), 806); + fx.set_time(674_218_750); + assert_eq!(fx.total_bytes_consumed(), 1010); + fx.set_time(1_072_656_250); + assert_eq!(fx.total_bytes_consumed(), 1215); + fx.set_time(1_473_046_875); + assert_eq!(fx.total_bytes_consumed(), 1215); + } + #[test] fn over_limit_multi_thread() { let mut fx = Fixture::new();