Skip to content

Commit

Permalink
Add support for permit reservations to rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Feb 9, 2022
1 parent 68e6c1f commit 94d85aa
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 20 deletions.
71 changes: 71 additions & 0 deletions src/main/java/dev/failsafe/RateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
* <ul>
* <li>{@link #tryAcquirePermit()}</li>
* <li>{@link #tryAcquirePermits(int)}</li>
* <li>{@link #reservePermit()}</li>
* <li>{@link #reservePermits(int)}</li>
* <li>{@link #tryReservePermit(Duration)}</li>
* <li>{@link #tryReservePermits(int, Duration)}</li>
* </ul>
* </p>
* <p>
Expand All @@ -50,6 +54,10 @@
* permits cannot be acquired, and the {@code tryAcquire} methods return a boolean.
* </p>
* <p>
* The {@code reserve} methods attempt to reserve permits and return an expected wait time before the permit can be
* used. This helps integrate with scenarios where you need to wait externally
* </p>
* <p>
* This class is threadsafe.
* </p>
*
Expand Down Expand Up @@ -132,6 +140,7 @@ static <R> RateLimiterBuilder<R> builder(RateLimiterConfig<R> config) {
*
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
* @see #tryAcquirePermit()
* @see #reservePermit()
*/
default void acquirePermit() throws InterruptedException {
acquirePermits(1);
Expand All @@ -144,6 +153,7 @@ default void acquirePermit() throws InterruptedException {
* @throws IllegalArgumentException if {@code permits} is < 1
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
* @see #tryAcquirePermits(int)
* @see #reservePermits(int)
*/
void acquirePermits(int permits) throws InterruptedException;

Expand Down Expand Up @@ -196,10 +206,68 @@ default boolean isBursty() {
return getConfig().getPeriod() != null;
}

/**
* Reserves a permit to perform an execution against the rate limiter, and returns the time that the caller is
* expected to wait before acting on the permit. Returns {@code 0} if the permit is immediately available and no
* waiting is needed.
*
* @see #acquirePermit()
* @see #tryAcquirePermit()
*/
default Duration reservePermit() {
return reservePermits(1);
}

/**
* Reserves the {@code permits} to perform executions against the rate limiter, and returns the time that the caller
* is expected to wait before acting on the permits. Returns {@code 0} if the permits are immediately available and no
* waiting is needed.
*
* @throws IllegalArgumentException if {@code permits} is < 1
* @see #acquirePermits(int)
* @see #tryAcquirePermits(int)
*/
Duration reservePermits(int permits);

/**
* Tries to reserve a permit to perform an execution against the rate limiter, and returns the time that the caller is
* expected to wait before acting on the permit, as long as it's less than the {@code maxWaitTime}.
* <ul>
* <li>Returns the expected wait time for the permit if it was successfully reserved.</li>
* <li>Returns {@code 0} if the permit was successfully reserved and no waiting is needed.</li>
* <li>Returns {@code -1 nanoseconds} if the permit was not reserved because the wait time would be greater than the {@code maxWaitTime}.</li>
* </ul>
*
* @throws NullPointerException if {@code maxWaitTime} is null
* @see #acquirePermit(Duration)
* @see #tryAcquirePermit(Duration)
*/
default Duration tryReservePermit(Duration maxWaitTime) {
return tryReservePermits(1, maxWaitTime);
}

/**
* Tries to reserve the {@code permits} to perform executions against the rate limiter, and returns the time that the
* caller is expected to wait before acting on the permits, as long as it's less than the {@code maxWaitTime}.
* <ul>
* <li>Returns the expected wait time for the permits if they were successfully reserved.</li>
* <li>Returns {@code 0} if the permits were successfully reserved and no waiting is needed.</li>
* <li>Returns {@code -1 nanoseconds} if the permits were not reserved because the wait time would be greater than the {@code maxWaitTime}.</li>
* </ul>
*
* @throws IllegalArgumentException if {@code permits} is < 1
* @throws NullPointerException if {@code maxWaitTime} is null
* @see #acquirePermit(Duration)
* @see #tryAcquirePermit(Duration)
*/
Duration tryReservePermits(int permits, Duration maxWaitTime);

/**
* Tries to acquire a permit to perform an execution against the rate limiter, returning immediately without waiting.
*
* @return whether the requested {@code permits} are successfully acquired or not
* @see #acquirePermit()
* @see #reservePermits(int)
*/
default boolean tryAcquirePermit() {
return tryAcquirePermits(1);
Expand All @@ -211,6 +279,7 @@ default boolean tryAcquirePermit() {
*
* @return whether the requested {@code permits} are successfully acquired or not
* @throws IllegalArgumentException if {@code permits} is < 1
* @see #acquirePermits(int)
*/
boolean tryAcquirePermits(int permits);

Expand All @@ -221,6 +290,7 @@ default boolean tryAcquirePermit() {
* @return whether a permit is successfully acquired
* @throws NullPointerException if {@code maxWaitTime} is null
* @throws InterruptedException if the current thread is interrupted while waiting to acquire a permit
* @see #acquirePermit(Duration)
*/
default boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedException {
return tryAcquirePermits(1, maxWaitTime);
Expand All @@ -234,6 +304,7 @@ default boolean tryAcquirePermit(Duration maxWaitTime) throws InterruptedExcepti
* @throws IllegalArgumentException if {@code permits} is < 1
* @throws NullPointerException if {@code maxWaitTime} is null
* @throws InterruptedException if the current thread is interrupted while waiting to acquire the {@code permits}
* @see #acquirePermits(int, Duration)
*/
boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws InterruptedException;
}
5 changes: 4 additions & 1 deletion src/main/java/dev/failsafe/RateLimiterBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import dev.failsafe.internal.util.Assert;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

/**
* Builds {@link RateLimiter} instances.
Expand Down Expand Up @@ -57,6 +56,10 @@ public RateLimiter<R> build() {
/**
* Configures the {@code maxWaitTime} to wait for permits to be available. If permits cannot be acquired before the
* {@code maxWaitTime} is exceeded, then the rate limiter will throw {@link RateLimitExceededException}.
* <p>
* This setting only applies when the resulting RateLimiter is used with the {@link Failsafe} class. It does not apply
* when the RateLimiter is used in a standalone way.
* </p>
*
* @throws NullPointerException if {@code maxWaitTime} is null
*/
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/dev/failsafe/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public Duration getPeriod() {
/**
* Returns the max time to wait for permits to be available. If permits cannot be acquired before the max wait time is
* exceeded, then the rate limiter will throw {@link RateLimitExceededException}.
* <p>
* This setting only applies when the RateLimiter is used with the {@link Failsafe} class. It does not apply when the
* RateLimiter is used in a standalone way.
* </p>
*
* @see RateLimiterBuilder#withMaxWaitTime(Duration)
*/
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/dev/failsafe/internal/RateLimiterExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ protected ExecutionResult<R> preExecute() {
try {
return rateLimiter.tryAcquirePermit(maxWaitTime) ?
null :
ExecutionResult.failure(new RateLimitExceededException(rateLimiter));
ExecutionResult.exception(new RateLimitExceededException(rateLimiter));
} catch (InterruptedException e) {
// Set interrupt flag
Thread.currentThread().interrupt();
return ExecutionResult.failure(e);
return ExecutionResult.exception(e);
}
}

@Override
protected CompletableFuture<ExecutionResult<R>> preExecuteAsync(Scheduler scheduler, FailsafeFuture<R> future) {
CompletableFuture<ExecutionResult<R>> promise = new CompletableFuture<>();
long waitNanos = rateLimiter.acquirePermitWaitNanos(maxWaitTime);
long waitNanos = rateLimiter.reservePermits(1, maxWaitTime);
if (waitNanos == -1)
promise.complete(ExecutionResult.failure(new RateLimitExceededException(rateLimiter)));
promise.complete(ExecutionResult.exception(new RateLimitExceededException(rateLimiter)));
else {
try {
// Wait for the permit
Expand Down
32 changes: 17 additions & 15 deletions src/main/java/dev/failsafe/internal/RateLimiterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import dev.failsafe.internal.util.Assert;
import dev.failsafe.internal.util.Durations;
import dev.failsafe.spi.PolicyExecutor;
import dev.failsafe.spi.Scheduler;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -54,41 +52,45 @@ public RateLimiterConfig<R> getConfig() {

@Override
public void acquirePermits(int permits) throws InterruptedException {
Assert.isTrue(permits > 0, "permits must be > 0");
long waitNanos = stats.acquirePermits(permits, null);
long waitNanos = reservePermits(permits).toNanos();
if (waitNanos > 0)
TimeUnit.NANOSECONDS.sleep(waitNanos);
}

@Override
public boolean tryAcquirePermits(int permits) {
public Duration reservePermits(int permits) {
Assert.isTrue(permits > 0, "permits must be > 0");
long waitNanos = stats.acquirePermits(permits, Duration.ZERO);
return waitNanos == 0;
return Duration.ofNanos(stats.acquirePermits(permits, null));
}

@Override
public boolean tryAcquirePermits(int permits) {
return reservePermits(permits, Duration.ZERO) == 0;
}

@Override
public boolean tryAcquirePermits(int permits, Duration maxWaitTime) throws InterruptedException {
Assert.isTrue(permits > 0, "permits must be > 0");
Assert.notNull(maxWaitTime, "maxWaitTime");
long waitNanos = stats.acquirePermits(permits, Durations.ofSafeNanos(maxWaitTime));
long waitNanos = reservePermits(permits, maxWaitTime);
if (waitNanos == -1)
return false;
if (waitNanos > 0)
TimeUnit.NANOSECONDS.sleep(waitNanos);
return true;
}

@Override
public Duration tryReservePermits(int permits, Duration maxWaitTime) {
return Duration.ofNanos(reservePermits(permits, maxWaitTime));
}

@Override
public PolicyExecutor<R> toExecutor(int policyIndex) {
return new RateLimiterExecutor<>(this, policyIndex);
}

/**
* Returns the wait nanos for an acquired permit which can be used to externally wait.
*/
long acquirePermitWaitNanos(Duration maxWaitTime) {
long reservePermits(int permits, Duration maxWaitTime) {
Assert.isTrue(permits > 0, "permits must be > 0");
Assert.notNull(maxWaitTime, "maxWaitTime");
return stats.acquirePermits(1, Durations.ofSafeNanos(maxWaitTime));
return stats.acquirePermits(permits, Durations.ofSafeNanos(maxWaitTime));
}
}
24 changes: 24 additions & 0 deletions src/test/java/dev/failsafe/functional/RateLimiterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,36 @@
import java.time.Duration;

import static dev.failsafe.internal.InternalTesting.resetLimiter;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
* Tests various RateLimiter scenarios.
*/
@Test
public class RateLimiterTest extends Testing {
public void testReservePermit() {
// Given
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(100)).build();

// When / Then
assertEquals(limiter.reservePermit(), Duration.ZERO);
assertTrue(limiter.reservePermit().toMillis() > 0);
assertTrue(limiter.reservePermit().toMillis() > 100);
}

public void testTryReservePermit() {
// Given
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(100)).build();

// When / Then
assertEquals(limiter.tryReservePermit(Duration.ofMillis(1)), Duration.ZERO);
assertEquals(limiter.tryReservePermit(Duration.ofMillis(10)), Duration.ofNanos(-1));
assertTrue(limiter.tryReservePermit(Duration.ofMillis(100)).toMillis() > 0);
assertTrue(limiter.tryReservePermit(Duration.ofMillis(200)).toMillis() > 100);
assertEquals(limiter.tryReservePermit(Duration.ofMillis(100)), Duration.ofNanos(-1));
}

public void testPermitAcquiredAfterWait() {
// Given
RateLimiter<Object> limiter = RateLimiter.smoothBuilder(Duration.ofMillis(50))
Expand Down

0 comments on commit 94d85aa

Please sign in to comment.