Skip to content

Commit

Permalink
More docs, remove Thread.sleep, Caffeine => Map, CompletableFuture =>…
Browse files Browse the repository at this point in the history
… Future
  • Loading branch information
andrew4699 committed Sep 13, 2024
1 parent 5e716ad commit 4e0259d
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 44 deletions.
1 change: 1 addition & 0 deletions polaris-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,5 +170,6 @@ logging:

rateLimiter:
constructionTimeoutMillis: 2000
allowRequestOnConstructionTimeout: true
factory:
type: no-op
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

/** An interface to specify how to retrieve the current wall clock time */
public interface Clock {
/** @return the current time in nanoseconds */
/**
* @return the current time in nanoseconds
*/
long nanoTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* Simple rate limiter factory that just constructs an OpenTelemetryRateLimiter and is
Expand All @@ -35,7 +36,7 @@ public class DefaultRateLimiterFactory implements RateLimiterFactory {
private double windowSeconds;

@Override
public CompletableFuture<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
return CompletableFuture.supplyAsync(
() ->
new OpenTelemetryRateLimiter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/** Rate limiter factory that constructs a no-op rate limiter */
@JsonTypeName("no-op")
public class NoOpRateLimiterFactory implements RateLimiterFactory {
@Override
public CompletableFuture<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
return CompletableFuture.supplyAsync(NoOpRateLimiter::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
/** Interface for rate limiting requests */
public interface RateLimiter {
/**
* This signifies that a request is being made. That is, the rate limiter should count the request at this point.
* This signifies that a request is being made. That is, the rate limiter should count the request
* at this point.
*
* @return Whether the request is allowed to proceed by the rate limiter
* */
*/
boolean tryAcquire();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ public class RateLimiterConfig {
*/
private long constructionTimeoutMillis;

/**
* Since rate limiter construction is asynchronous and has a timeout, construction may fail. If
* this option is enabled, the request will still be allowed when construction fails.
*/
private boolean allowRequestOnConstructionTimeout;

@JsonProperty("factory")
public void setRateLimiterFactory(RateLimiterFactory rateLimiterFactory) {
this.rateLimiterFactory = rateLimiterFactory;
Expand All @@ -49,4 +55,14 @@ public void setConstructionTimeoutMillis(long constructionTimeoutMillis) {
public long getConstructionTimeoutMillis() {
return constructionTimeoutMillis;
}

@JsonProperty("allowRequestOnConstructionTimeout")
public void setAllowRequestOnConstructionTimeout(boolean allowRequestOnConstructionTimeout) {
this.allowRequestOnConstructionTimeout = allowRequestOnConstructionTimeout;
}

@JsonProperty("allowRequestOnConstructionTimeout")
public boolean getAllowRequestOnConstructionTimeout() {
return allowRequestOnConstructionTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* Interface for constructing a rate limiter given the rate limiting key and clock. Notably, rate
Expand All @@ -30,5 +30,13 @@
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
public interface RateLimiterFactory extends Discoverable {
CompletableFuture<RateLimiter> createRateLimiter(String key, Clock clock);
/**
* Constructs a rate limiter asynchronously. Callers may choose to set a timeout on construction.
*
* @param key The rate limiting key. Rate limiters may optionally choose to discriminate their
* behavior by the key.
* @param clock The clock which tells you the current time
* @return a Future with the constructed RateLimiter
*/
Future<RateLimiter> createRateLimiter(String key, Clock clock);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.apache.polaris.service.ratelimiter;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import jakarta.annotation.Priority;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
Expand All @@ -30,6 +28,8 @@
import jakarta.ws.rs.Priorities;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -42,28 +42,18 @@
public class RateLimiterFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterFilter.class);
private static final RateLimiter NO_OP_LIMITER = new NoOpRateLimiter();
private static final RateLimiter ALWAYS_REJECT_LIMITER =
new OpenTelemetryRateLimiter(0, 0, new ClockImpl());
private static final Clock CLOCK = new ClockImpl();

private final RateLimiterConfig config;
private final AsyncLoadingCache<String, RateLimiter> perRealmLimiters;
private final Map<String, RateLimiter> perRealmLimiters = new ConcurrentHashMap<>();

public RateLimiterFilter(RateLimiterConfig config) {
this.config = config;

Clock clock = new ClockImpl();
perRealmLimiters =
Caffeine.newBuilder()
.buildAsync(
(key, executor) -> config.getRateLimiterFactory().createRateLimiter(key, clock));
}

/**
* Returns a 429 if the rate limiter says so. Otherwise, forwards the request along.
* @param request the <code>ServletRequest</code> object contains the client's request
* @param response the <code>ServletResponse</code> object contains the filter's response
* @param chain the <code>FilterChain</code> for invoking the next filter or the resource
* @throws IOException
* @throws ServletException
*/
/** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
Expand All @@ -76,14 +66,29 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
chain.doFilter(request, response);
}

RateLimiter maybeBlockToGetRateLimiter(String realm) {
private RateLimiter maybeBlockToGetRateLimiter(String realm) {
return perRealmLimiters.computeIfAbsent(realm, this::createRateLimiterBlocking);
}

/** Creates a rate limiter, enforcing a timeout on how long creation can take. */
private RateLimiter createRateLimiterBlocking(String key) {
try {
return perRealmLimiters
.get(realm)
return config
.getRateLimiterFactory()
.createRateLimiter(key, CLOCK)
.get(config.getConstructionTimeoutMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOGGER.error("Failed to fetch rate limiter", e);
return getDefaultRateLimiterOnConstructionFailed(e);
}
}

private RateLimiter getDefaultRateLimiterOnConstructionFailed(Exception e) {
if (config.getAllowRequestOnConstructionTimeout()) {
LOGGER.error("Failed to fetch rate limiter, allowing the request", e);
return NO_OP_LIMITER;
} else {
LOGGER.error("Failed to fetch rate limiter, rejecting the request", e);
return ALWAYS_REJECT_LIMITER;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class AsyncFallbackTest {
ConfigOverride.config("rateLimiter.factory.type", "mock"),
ConfigOverride.config("rateLimiter.factory.requestsPerSecond", "0"),
ConfigOverride.config("rateLimiter.factory.windowSeconds", "0"),
ConfigOverride.config("rateLimiter.factory.delaySeconds", "999"));
ConfigOverride.config("rateLimiter.factory.neverFinishConstruction", "true"));

private static String userToken;
private static String realm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,35 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

/**
* Factory that constructs a standard OpenTelemetryRateLimiter but with a mock Clock and an optional
* construction delay
*/
@JsonTypeName("mock")
public class MockRateLimiterFactory implements RateLimiterFactory {
public static MockClock clock = new MockClock();
public static MockClock CLOCK = new MockClock();

@JsonProperty("requestsPerSecond")
public double requestsPerSecond;

@JsonProperty("windowSeconds")
public double windowSeconds;

@JsonProperty("delaySeconds")
public long delaySeconds;
@JsonProperty("neverFinishConstruction")
public boolean neverFinishConstruction;

@Override
public CompletableFuture<RateLimiter> createRateLimiter(String key, Clock clock) {
public Future<RateLimiter> createRateLimiter(String key, Clock clock) {
if (neverFinishConstruction) {
// This future will never finish
return new CompletableFuture<>();
}
return CompletableFuture.supplyAsync(
() -> {
try {
Thread.sleep(Duration.ofSeconds(delaySeconds));
} catch (InterruptedException e) {
}

return new OpenTelemetryRateLimiter(
requestsPerSecond, requestsPerSecond * windowSeconds, MockRateLimiterFactory.clock);
});
() ->
new OpenTelemetryRateLimiter(
requestsPerSecond, requestsPerSecond * windowSeconds, CLOCK));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testRateLimiter() {
(EXT.getConfiguration().getRateLimiterConfig().getRateLimiterFactory());
long windowMillis = (long) (factory.windowSeconds * 1000);

MockClock clock = MockRateLimiterFactory.clock;
MockClock clock = MockRateLimiterFactory.CLOCK;
clock.setMillis(2 * windowMillis); // Clear any counters from before this test
for (int i = 0; i < factory.requestsPerSecond * factory.windowSeconds; i++) {
requestAsserter.accept(Response.Status.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ logging:

rateLimiter:
constructionTimeoutMillis: 2000
allowRequestOnConstructionTimeout: true
factory:
type: default
requestsPerSecond: 10
Expand Down

0 comments on commit 4e0259d

Please sign in to comment.