Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rate limiter #278

Merged
merged 22 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d34ac04
Add a rate limiter
andrew4699 Sep 10, 2024
48d0f49
More javadoc
andrew4699 Sep 10, 2024
1734c78
ratelimiting => ratelimiter, general clean up
andrew4699 Sep 10, 2024
00d08c1
Add new line EOFs, add RateLimiterConfig javadoc
andrew4699 Sep 10, 2024
194d754
Update javadoc, tryAcquire
andrew4699 Sep 12, 2024
a73ff08
More docs, remove Thread.sleep, Caffeine => Map, CompletableFuture =>…
andrew4699 Sep 13, 2024
6a22321
Change Map value to be a Future itself, use System.nanoTime
andrew4699 Sep 16, 2024
617e891
Implement our own rate limiter
andrew4699 Sep 16, 2024
303c63c
Delete unused OpenTelemetryClock
andrew4699 Sep 16, 2024
b43d68b
Add javadoc for TokenBucketRateLimiter
andrew4699 Sep 16, 2024
514a2ab
Change NoOpRateLimiterFactory to use completedFuture
andrew4699 Sep 18, 2024
7ce9662
Update to use PolarisRealm annotation after rebase
andrew4699 Sep 18, 2024
57576ab
Update comment
andrew4699 Sep 18, 2024
d25f5d7
Update comment and change some doubles to longs
andrew4699 Sep 19, 2024
bc455bb
Apply spotless, make RateLimitResultAsserter re-usable
andrew4699 Sep 19, 2024
1652fe2
Use InstantSource, RealmContext key, general cleanup
andrew4699 Sep 21, 2024
4be8f12
Various cleanup such as overflow guards, RateLimiterKey
andrew4699 Sep 23, 2024
f00b9fc
lastAcquireMillis => lastTokenGenerationMillis
andrew4699 Sep 23, 2024
a10a18e
Remove debug line
andrew4699 Sep 23, 2024
c6a3283
Greatly simplify by removing RateLimiterFactory/async stuff
andrew4699 Sep 23, 2024
9ff2a68
Clean up
andrew4699 Sep 23, 2024
948683b
Remove sleep from tests
andrew4699 Sep 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions polaris-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ logging:

# Limits the size of request bodies sent to Polaris. -1 means no limit.
maxRequestBodyBytes: -1

# Optional, not specifying a "rateLimiter" section also means no rate limiter
rateLimiter:
type: no-op
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.ratelimiter.RateLimiterFilter;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
Expand Down Expand Up @@ -252,6 +253,14 @@ public void run(PolarisApplicationConfig configuration, Environment environment)
.servlets()
.addFilter("tracing", new TracingFilter(openTelemetry))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");

if (configuration.getRateLimiter() != null) {
environment
.servlets()
.addFilter("ratelimiter", new RateLimiterFilter(configuration.getRateLimiter()))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");
}

DiscoverableAuthenticator<String, AuthenticatedPolarisPrincipal> authenticator =
configuration.getPolarisAuthenticator();
authenticator.setEntityManagerFactory(entityManagerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
Expand All @@ -32,6 +33,7 @@
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.ratelimiter.RateLimiter;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -55,6 +57,7 @@ public class PolarisApplicationConfig extends Configuration {
private String awsAccessKey;
private String awsSecretKey;
private FileIOFactory fileIOFactory;
private RateLimiter rateLimiter;

public static final long REQUEST_BODY_BYTES_NO_LIMIT = -1;
private long maxRequestBodyBytes = REQUEST_BODY_BYTES_NO_LIMIT;
Expand Down Expand Up @@ -137,6 +140,16 @@ public void setCorsConfiguration(CorsConfiguration corsConfiguration) {
this.corsConfiguration = corsConfiguration;
}

@JsonProperty("rateLimiter")
public RateLimiter getRateLimiter() {
return rateLimiter;
}

@JsonProperty("rateLimiter")
public void setRateLimiter(@Nullable RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

public void setTaskHandler(TaskHandlerConfiguration taskHandler) {
this.taskHandler = taskHandler;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonTypeName;

/** Rate limiter that always allows the request */
@JsonTypeName("no-op")
public class NoOpRateLimiter implements RateLimiter {
@Override
public boolean tryAcquire() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;

/** Interface for rate limiting requests */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
public interface RateLimiter extends Discoverable {
/**
* This signifies that a request is being made. That is, the rate limiter should count the request
* at this point.
*
* @return Whether the request is allowed to proceed by the rate limiter
*/
boolean tryAcquire();
andrew4699 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import jakarta.annotation.Priority;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.ws.rs.Priorities;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Request filter that returns a 429 Too Many Requests if the rate limiter says so */
@Priority(Priorities.AUTHORIZATION + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if we want to apply the rate limiter filter before the authentication and after TracingFilter. Should we even authenticate the request if there are too many requests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The authentication layer can additionally have its own rate limiting, but it seems reasonable to rate limit authenticated requests like we have here.

Copy link
Contributor Author

@andrew4699 andrew4699 Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I think there's room for both types of limiters. It should be straight forward to extend this work to add a pre-auth limiter, but I'm a bit worried about the scope of this initial PR growing too big. This PR should just aim to be extensible and provide guidance on how one might extend this. I think it does that by creating generic-enough interfaces and providing one full implementation of them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can we file an issue to track it?

public class RateLimiterFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterFilter.class);

private final RateLimiter rateLimiter;

public RateLimiterFilter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

/** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (response instanceof HttpServletResponse servletResponse && !rateLimiter.tryAcquire()) {
servletResponse.setStatus(Response.Status.TOO_MANY_REQUESTS.getStatusCode());
LOGGER.atDebug().log("Rate limiting request");
return;
}
chain.doFilter(request, response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.jetbrains.annotations.VisibleForTesting;

/**
* Rate limiter that maps the request's realm identifier to its own TokenBucketRateLimiter, with its
* own capacity.
*/
@JsonTypeName("realm-token-bucket")
public class RealmTokenBucketRateLimiter implements RateLimiter {
private final long requestsPerSecond;
private final long windowSeconds;
private final Map<String, RateLimiter> perRealmLimiters;

@VisibleForTesting
@JsonCreator
public RealmTokenBucketRateLimiter(
@JsonProperty("requestsPerSecond") final long requestsPerSecond,
@JsonProperty("windowSeconds") final long windowSeconds) {
this.requestsPerSecond = requestsPerSecond;
this.windowSeconds = windowSeconds;
this.perRealmLimiters = new ConcurrentHashMap<>();
}

/**
* This signifies that a request is being made. That is, the rate limiter should count the request
* at this point.
*
* @return Whether the request is allowed to proceed by the rate limiter
*/
@Override
public boolean tryAcquire() {
String key =
Optional.ofNullable(CallContext.getCurrentContext())
.map(CallContext::getRealmContext)
.map(RealmContext::getRealmIdentifier)
.orElse("");

return perRealmLimiters
.computeIfAbsent(
key,
(k) ->
new TokenBucketRateLimiter(
requestsPerSecond,
Math.multiplyExact(requestsPerSecond, windowSeconds),
getClock()))
.tryAcquire();
}

@VisibleForTesting
protected Clock getClock() {
return Clock.systemUTC();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import java.time.InstantSource;

/**
* Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a fixed rate and has a
* maximum amount of tokens. Each successful "tryAcquire" costs 1 token.
*/
public class TokenBucketRateLimiter implements RateLimiter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it could be a Semaphore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a Semaphore would work as a limit on the number of concurrent requests that blocks instead of rejecting the request. This is a limit on request rate that rejects requests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java's Semaphore has a tryAcquire, and you could reject the request if that returns false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see, then it would work if the intent was a concurrent request limit. I think there's room for debate on whether or not there should be a concurrent request limiter but it doesn't feel like that has to be answered in this PR.

private final double tokensPerMilli;
private final long maxTokens;
private final InstantSource instantSource;

private double tokens;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is tokens a double?

Copy link
Contributor Author

@andrew4699 andrew4699 Sep 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Token acquisition happens during tryAcquire as opposed to happening on a timer, so you acquire partial tokens. Alternatively this could be modeled as long milliTokens but I think that would be less legible. I suspect precision isn't an issue because the minimum increment is 1/1000 with the latest change to milliseconds.

private long lastTokenGenerationMillis;

public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens, InstantSource instantSource) {
this.tokensPerMilli = tokensPerSecond / 1000D;
this.maxTokens = maxTokens;
this.instantSource = instantSource;

tokens = maxTokens;
lastTokenGenerationMillis = instantSource.millis();
}

/**
* Tries to acquire and spend 1 token. Doesn't block if a token isn't available.
*
* @return whether a token was successfully acquired & spent
*/
@Override
public synchronized boolean tryAcquire() {
// Grant tokens for the time that has passed since our last tryAcquire()
long t = instantSource.millis();
long millisPassed = Math.subtractExact(t, lastTokenGenerationMillis);
lastTokenGenerationMillis = t;
tokens = Math.min(maxTokens, tokens + (millisPassed * tokensPerMilli));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should check for overflow here too. All of this is kind of scary. I would rather us not write this ourselves if we can use something existing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to use existing lib if possible. Guava provides an impl. https://guava.dev/releases/23.0/api/docs/com/google/common/util/concurrent/RateLimiter.html.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think overflowing to infinity is natural/expected if the values are high enough to cause that. I do think we should check for long overflows as those wrap around, so I've added some checks there.

Copy link
Contributor Author

@andrew4699 andrew4699 Sep 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at the libraries we already pull in and all of the ones available (including the one you linked above) either don't allow specifying window sizes or are internal APIs that the library doesn't want others to use.

Luckily the amount of code is relatively small and I added some tests.


// Take a token if they have one available
if (tokens >= 1) {
tokens--;
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ org.apache.polaris.service.context.RealmContextResolver
org.apache.polaris.service.context.CallContextResolver
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.catalog.FileIOFactory
org.apache.polaris.service.ratelimiter.RateLimiter
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter
org.apache.polaris.service.ratelimiter.NoOpRateLimiter
Loading