From 97cf7021f777ff3bd3de9fc19c25512173deb1f0 Mon Sep 17 00:00:00 2001 From: Chenjp Date: Fri, 6 Dec 2024 17:25:37 +0800 Subject: [PATCH 1/4] Provides an exact rate limiting mechanism you need exact rate limiting and can accept a small decrease in efficiency, ExactRateLimiter may be an alternative option. --- .../catalina/filters/RateLimitFilter.java | 43 +++- .../catalina/util/ExactRateLimiter.java | 66 ++++++ .../apache/catalina/util/FastRateLimiter.java | 153 ++++++++----- .../org/apache/catalina/util/RateLimiter.java | 32 +-- .../apache/catalina/util/RateLimiterBase.java | 112 +++++++++ .../catalina/util/TimeBucketCounter.java | 167 +++++++------- .../catalina/filters/TestRateLimitFilter.java | 7 +- ...stRateLimitFilterWithExactRateLimiter.java | 215 ++++++++++++++++++ webapps/docs/config/filter.xml | 27 ++- 9 files changed, 627 insertions(+), 195 deletions(-) create mode 100644 java/org/apache/catalina/util/ExactRateLimiter.java create mode 100644 java/org/apache/catalina/util/RateLimiterBase.java create mode 100644 test/org/apache/catalina/filters/TestRateLimitFilterWithExactRateLimiter.java diff --git a/java/org/apache/catalina/filters/RateLimitFilter.java b/java/org/apache/catalina/filters/RateLimitFilter.java index b7a4b23c2cd1..f96dc805dda8 100644 --- a/java/org/apache/catalina/filters/RateLimitFilter.java +++ b/java/org/apache/catalina/filters/RateLimitFilter.java @@ -17,6 +17,7 @@ package org.apache.catalina.filters; import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; import jakarta.servlet.FilterChain; import jakarta.servlet.FilterConfig; @@ -29,6 +30,7 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor; /** *

@@ -42,13 +44,16 @@ * the bucket time ends and a new bucket starts. *

*

- * The RateLimiter implementation can be set via the className init param. The default implementation, - * org.apache.catalina.util.FastRateLimiter, is optimized for efficiency and low overhead so it converts - * some configured values to more efficient values. For example, a configuration of a 60 seconds time bucket is - * converted to 65.536 seconds. That allows for very fast bucket calculation using bit shift arithmetic. In order to - * remain true to the user intent, the configured number of requests is then multiplied by the same ratio, so a - * configuration of 100 Requests per 60 seconds, has the real values of 109 Requests per 65 seconds. You can specify a - * different class as long as it implements the org.apache.catalina.util.RateLimiter interface. + * The RateLimiter implementation can be set via the rateLimitClassName init param. The default + * implementation, org.apache.catalina.util.FastRateLimiter, is optimized for efficiency and low overhead + * so it converts some configured values to more efficient values. For example, a configuration of a 60 seconds time + * bucket is converted to 65.536 seconds. That allows for very fast bucket calculation using bit shift arithmetic. In + * order to remain true to the user intent, the configured number of requests is then multiplied by the same ratio, so a + * configuration of 100 Requests per 60 seconds, has the real values of 109 Requests per 65 seconds. An alternative + * implementation, org.apache.catalina.util.ExactRateLimiter, is intended to provide a less efficient but + * more accurate control, whose effective duration in seconds and number of requests configuration are consist with the + * user declared. You can specify a different class as long as it implements the + * org.apache.catalina.util.RateLimiter interface. *

*

* It is common to set up different restrictions for different URIs. For example, a login page or authentication script @@ -197,16 +202,21 @@ protected boolean isConfigProblemFatal() { public void init(FilterConfig filterConfig) throws ServletException { super.init(filterConfig); + ScheduledExecutorService utilityExecutor = (ScheduledExecutorService) filterConfig.getServletContext() + .getAttribute(ScheduledThreadPoolExecutor.class.getName()); + if (utilityExecutor == null) { + if (newExecutorService == null) { + newExecutorService = new java.util.concurrent.ScheduledThreadPoolExecutor(1); + } + utilityExecutor = newExecutorService; + } + try { rateLimiter = (RateLimiter) Class.forName(rateLimitClassName).getConstructor().newInstance(); } catch (ReflectiveOperationException e) { throw new ServletException(e); } - rateLimiter.setDuration(bucketDuration); - rateLimiter.setRequests(bucketRequests); - rateLimiter.setFilterConfig(filterConfig); - if (policyName != null) { String trimmedName = policyName.trim(); if (!trimmedName.isEmpty()) { @@ -214,6 +224,8 @@ public void init(FilterConfig filterConfig) throws ServletException { } } + rateLimiter.initialize(utilityExecutor, bucketDuration, bucketRequests); + filterName = filterConfig.getFilterName(); log.info(sm.getString("rateLimitFilter.initialized", filterName, Integer.valueOf(bucketRequests), @@ -250,9 +262,18 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha chain.doFilter(request, response); } + private ScheduledExecutorService newExecutorService = null; + @Override public void destroy() { rateLimiter.destroy(); + if (newExecutorService != null) { + try { + newExecutorService.shutdown(); + } catch (SecurityException e) { + // ignore + } + } super.destroy(); } diff --git a/java/org/apache/catalina/util/ExactRateLimiter.java b/java/org/apache/catalina/util/ExactRateLimiter.java new file mode 100644 index 000000000000..9e4465469d0a --- /dev/null +++ b/java/org/apache/catalina/util/ExactRateLimiter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.util; + +import java.util.concurrent.ScheduledExecutorService; + +/** + * A RateLimiter that compromises efficiency for accurate in order to provide exact rate limiting. + */ +public class ExactRateLimiter extends RateLimiterBase { + + @Override + protected String getDefaultPolicyName() { + return "exact"; + } + + @Override + protected TimeBucketCounter newCounterInstance(ScheduledExecutorService executorService, int duration) { + return new ExactTimeBucketCounter(executorService, duration); + } + + /** + * An accurate counter with exact bucket index, but slightly less efficient than another fast counter provided with + * the {@link FastRateLimiter}. + */ + class ExactTimeBucketCounter extends TimeBucketCounter { + + ExactTimeBucketCounter(ScheduledExecutorService executorService, int bucketDuration) { + super(executorService, bucketDuration); + } + + @Override + public long getBucketIndex(long timestamp) { + return (timestamp / 1000) / getBucketDuration(); + } + + @Override + public double getRatio() { + // actual value is exactly same with declared. + return 1.0d; + } + + @Override + public long getMillisUntilNextBucket() { + long millis = System.currentTimeMillis(); + + long nextTimeBucketMillis = (getBucketIndex(millis) + 1) * getBucketDuration() * 1000; + long delta = nextTimeBucketMillis - millis; + return delta; + } + } +} diff --git a/java/org/apache/catalina/util/FastRateLimiter.java b/java/org/apache/catalina/util/FastRateLimiter.java index 486a133a6489..af6f170e0860 100644 --- a/java/org/apache/catalina/util/FastRateLimiter.java +++ b/java/org/apache/catalina/util/FastRateLimiter.java @@ -18,88 +18,119 @@ package org.apache.catalina.util; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicInteger; - -import jakarta.servlet.FilterConfig; - -import org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor; /** * A RateLimiter that compromises accuracy for speed in order to provide maximum throughput. */ -public class FastRateLimiter implements RateLimiter { - - private static AtomicInteger index = new AtomicInteger(); - - TimeBucketCounter bucketCounter; - - int duration; - - int requests; - - int actualRequests; - - int actualDuration; - - // Initial policy name can be rewritten by setPolicyName() - private String policyName = "fast-" + index.incrementAndGet(); +public class FastRateLimiter extends RateLimiterBase implements RateLimiter { @Override - public String getPolicyName() { - return policyName; + protected String getDefaultPolicyName() { + return "fast"; } @Override - public void setPolicyName(String name) { - this.policyName = name; + protected TimeBucketCounter newCounterInstance(ScheduledExecutorService executorService, int duration) { + return new FastTimeBucketCounter(executorService, duration); } - @Override - public int getDuration() { - return actualDuration; - } + /** + * A fast counter that optimizes efficiency at the expense of approximate bucket indexing. + */ + class FastTimeBucketCounter extends TimeBucketCounter { - @Override - public void setDuration(int duration) { - this.duration = duration; - } + FastTimeBucketCounter(ScheduledExecutorService utilityExecutor, int bucketDuration) { + super(utilityExecutor, getActualDuration(bucketDuration)); + this.numBits = determineShiftBitsOfDuration(bucketDuration); + this.ratio = ratioToPowerOf2(bucketDuration * 1000); + } - @Override - public int getRequests() { - return actualRequests; - } + /** + * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute + */ + private final int numBits; - @Override - public void setRequests(int requests) { - this.requests = requests; - } + /** + * Ratio of actual duration to config duration + */ + private final double ratio; - @Override - public int increment(String ipAddress) { - return bucketCounter.increment(ipAddress); - } + @Override + public long getBucketIndex(long timestamp) { + return System.currentTimeMillis() >> this.numBits; + } - @Override - public void destroy() { - bucketCounter.destroy(); - } + protected int getNumBits() { + return numBits; + } - @Override - public void setFilterConfig(FilterConfig filterConfig) { + /** + * Determines the bits of shift for the specific bucket duration in seconds, which used to figure out the + * correct bucket index. + */ + protected static final int determineShiftBitsOfDuration(int duration) { + int bits = 0; + int pof2 = nextPowerOf2(duration * 1000); + int bitCheck = pof2; + while (bitCheck > 1) { + bitCheck = pof2 >> ++bits; + } + return bits; + } - ScheduledExecutorService executorService = (ScheduledExecutorService) filterConfig.getServletContext() - .getAttribute(ScheduledThreadPoolExecutor.class.getName()); + /** + * The actual duration may differ from the configured duration because it is set to the next power of 2 value in + * order to perform very fast bit shift arithmetic. + * + * @param duration in seconds + * + * @return the actual bucket duration in seconds + * + * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int) + */ + protected static final int getActualDuration(int duration) { + return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000; + } - if (executorService == null) { - executorService = new java.util.concurrent.ScheduledThreadPoolExecutor(1); + + /** + * Returns the ratio between the configured duration param and the actual duration which will be set to the next + * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the + * added time, if any. + * + * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 + */ + public double getRatio() { + return ratio; } - bucketCounter = new TimeBucketCounter(duration, executorService); - actualRequests = (int) Math.round(bucketCounter.getRatio() * requests); - actualDuration = bucketCounter.getActualDuration() / 1000; - } + /** + * Returns the ratio to the next power of 2 so that we can adjust the value. + */ + protected static double ratioToPowerOf2(int value) { + double nextPO2 = nextPowerOf2(value); + return Math.round((1000d * nextPO2 / value)) / 1000d; + } + + /** + * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. + */ + static int nextPowerOf2(int value) { + int valueOfHighestBit = Integer.highestOneBit(value); + if (valueOfHighestBit == value) { + return value; + } - public TimeBucketCounter getBucketCounter() { - return bucketCounter; + return valueOfHighestBit << 1; + } + + + @Override + public long getMillisUntilNextBucket() { + long millis = System.currentTimeMillis(); + long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; + long delta = nextTimeBucketMillis - millis; + return delta; + } } } diff --git a/java/org/apache/catalina/util/RateLimiter.java b/java/org/apache/catalina/util/RateLimiter.java index d1beb653990d..34939cb50b3c 100644 --- a/java/org/apache/catalina/util/RateLimiter.java +++ b/java/org/apache/catalina/util/RateLimiter.java @@ -17,7 +17,7 @@ package org.apache.catalina.util; -import jakarta.servlet.FilterConfig; +import java.util.concurrent.ScheduledExecutorService; public interface RateLimiter { @@ -26,33 +26,19 @@ public interface RateLimiter { */ int getDuration(); - /** - * Sets the configured duration value in seconds. - * - * @param duration The duration of the time window in seconds - */ - void setDuration(int duration); - /** * @return the maximum number of requests allowed per time window */ int getRequests(); /** - * Sets the configured number of requests allowed per time window. - * - * @param requests The number of requests per time window - */ - void setRequests(int requests); - - /** - * Increments the number of requests by the given ipAddress in the current time window. + * Increments the number of requests by the given identifier in the current time window. * - * @param ipAddress the ip address + * @param identifier of target request * * @return the new value after incrementing */ - int increment(String ipAddress); + int increment(String identifier); /** * Cleanup no longer needed resources. @@ -60,11 +46,13 @@ public interface RateLimiter { void destroy(); /** - * Pass the FilterConfig to configure the filter. + * Initialize with parameters, start {@link TimeBucketCounter}. * - * @param filterConfig The FilterConfig used to configure the associated filter + * @param executorService the executor + * @param duration the duration of the time window in seconds + * @param requests the configured number of requests allowed per time window */ - void setFilterConfig(FilterConfig filterConfig); + void initialize(ScheduledExecutorService executorService, int duration, int requests); /** * @return name of RateLimit policy @@ -92,7 +80,7 @@ default String getPolicy() { /** * Provide the quota header for this rate limit for a given request count within the current time window. * - * @param requestCount The request count within the current time window + * @param requestCount The request count within the current time window * * @return the quota header for the given value of request count * diff --git a/java/org/apache/catalina/util/RateLimiterBase.java b/java/org/apache/catalina/util/RateLimiterBase.java new file mode 100644 index 000000000000..9367703ac284 --- /dev/null +++ b/java/org/apache/catalina/util/RateLimiterBase.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.util; + +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Basic implementation of {@link RateLimiter}, provides runtime data maintenance mechanism monitor. + */ +public abstract class RateLimiterBase implements RateLimiter { + + private static final AtomicInteger index = new AtomicInteger(); + + TimeBucketCounter bucketCounter; + + int actualRequests; + + int actualDuration; + + // Initial policy name can be rewritten by setPolicyName() + private String policyName = null; + + /** + * If policy name has not been specified, the first call of {@link #getPolicyName()} returns a auto-generated policy + * name using the default policy name as prefix and followed by auto-increase index. + * + * @return default policy name, as a prefix of auto-generated policy name. + */ + protected abstract String getDefaultPolicyName(); + + @Override + public String getPolicyName() { + if (policyName == null) { + policyName = getDefaultPolicyName() + "-" + index.incrementAndGet(); + } + return policyName; + } + + @Override + public void setPolicyName(String name) { + Objects.requireNonNull(name); + this.policyName = name; + } + + @Override + public int getDuration() { + return actualDuration; + } + + @Override + public int getRequests() { + return actualRequests; + } + + @Override + public int increment(String identifier) { + return bucketCounter.increment(identifier); + } + + @Override + public void destroy() { + bucketCounter.destroy(); + } + + /** + * Instantiate an instance of {@link TimeBucketCounter} for specific time bucket size. Concrete classes determine + * its counter policy by returning different implementation instance. + * + * @param utilityExecutor the executor + * @param duration size of each time bucket in seconds + * + * @return counter instance of {@link TimeBucketCounter} + */ + protected abstract TimeBucketCounter newCounterInstance(ScheduledExecutorService utilityExecutor, int duration); + + @Override + public void initialize(ScheduledExecutorService utilityExecutor, int duration, int requests) { + if (bucketCounter != null) { + bucketCounter.destroy(); + } + + bucketCounter = newCounterInstance(utilityExecutor, duration); + + actualDuration = bucketCounter.getBucketDuration(); + actualRequests = (int) Math.round(bucketCounter.getRatio() * requests); + } + + /** + * Returns the internal instance of {@link TimeBucketCounter} + * + * @return instance of {@link TimeBucketCounter} + */ + public TimeBucketCounter getBucketCounter() { + return bucketCounter; + } +} diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java b/java/org/apache/catalina/util/TimeBucketCounter.java index 3b4726f7ff4c..27ccb0019cc8 100644 --- a/java/org/apache/catalina/util/TimeBucketCounter.java +++ b/java/org/apache/catalina/util/TimeBucketCounter.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.catalina.util; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -30,62 +30,44 @@ /** * This class maintains a thread safe hash map that has timestamp-based buckets followed by a string for a key, and a - * counter for a value. each time the increment() method is called it adds the key if it does not exist, increments its - * value and returns it. a maintenance thread cleans up keys that are prefixed by previous timestamp buckets. + * counter for a integer value. each time the increment() method is called it adds the key if it does not exist, + * increments its value and returns it. */ -public class TimeBucketCounter { +public abstract class TimeBucketCounter { private static final Log log = LogFactory.getLog(TimeBucketCounter.class); private static final StringManager sm = StringManager.getManager(TimeBucketCounter.class); + private static final String BUCKET_KEY_DELIMITER = "^"; /** * Map to hold the buckets */ private final ConcurrentHashMap map = new ConcurrentHashMap<>(); /** - * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute - */ - private final int numBits; - - /** - * Ratio of actual duration to config duration - */ - private final double ratio; - - /** - * The future allowing control of the background processor. + * /** The future allowing control of the background processor. */ private ScheduledFuture maintenanceFuture; private ScheduledFuture monitorFuture; private final ScheduledExecutorService executorService; private final long sleeptime; + private int bucketDuration; /** * Creates a new TimeBucketCounter with the specified lifetime. * + * @param utilityExecutor the executor that should be used to handle maintenance task. * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60 - * @param executorService the executor service which will be used to run the maintenance + * + * @throws NullPointerException if executorService is null. */ - public TimeBucketCounter(int bucketDuration, ScheduledExecutorService executorService) { - - this.executorService = executorService; - - int durationMillis = bucketDuration * 1000; - - int bits = 0; - int pof2 = nextPowerOf2(durationMillis); - int bitCheck = pof2; - while (bitCheck > 1) { - bitCheck = pof2 >> ++bits; - } + public TimeBucketCounter(ScheduledExecutorService utilityExecutor, int bucketDuration) { + Objects.requireNonNull(utilityExecutor); + this.executorService = utilityExecutor; + this.bucketDuration = bucketDuration; - this.numBits = bits; - - this.ratio = ratioToPowerOf2(durationMillis); - - int cleanupsPerBucketDuration = (durationMillis >= 60_000) ? 6 : 3; - sleeptime = durationMillis / cleanupsPerBucketDuration; + int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3; + sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration; // Start our thread if (sleeptime > 0) { @@ -93,91 +75,85 @@ public TimeBucketCounter(int bucketDuration, ScheduledExecutorService executorSe } } + /** + * @return bucketDuration in seconds + */ + public int getBucketDuration() { + return bucketDuration; + } + + /** + * Returns the ratio between the configured duration param and the actual duration. + * @return the ratio between the configured duration param and the actual duration. + */ + public abstract double getRatio(); + /** * Increments the counter for the passed identifier in the current time bucket and returns the new value. * * @param identifier an identifier for which we want to maintain count, e.g. IP Address * * @return the count within the current time bucket + * + * @see TimeBucketCounter#genKey(String) */ public final int increment(String identifier) { - String key = getCurrentBucketPrefix() + "-" + identifier; + String key = genKey(identifier); AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger()); return ai.incrementAndGet(); } /** - * Calculates the current time bucket prefix by shifting bits for fast division, e.g. shift 16 bits is the same as - * dividing by 65,536 which is about 1:05m. + * Generates the key of timeBucket counter maps with the specific identifier, and the timestamp is implicitly + * equivalent to "now". * - * @return The current bucket prefix. - */ - public final int getCurrentBucketPrefix() { - return (int) (System.currentTimeMillis() >> this.numBits); - } - - public int getNumBits() { - return numBits; - } - - /** - * The actual duration may differ from the configured duration because it is set to the next power of 2 value in - * order to perform very fast bit shift arithmetic. + * @param identifier an identifier for which we want to maintain count * - * @return the actual bucket duration in milliseconds + * @return key of timeBucket counter maps */ - public int getActualDuration() { - return (int) Math.pow(2, getNumBits()); + protected final String genKey(String identifier) { + return genKey(identifier, System.currentTimeMillis()); } /** - * Returns the ratio between the configured duration param and the actual duration which will be set to the next - * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the added - * time, if any. + * Generates the key of timeBucket counter maps with the specific identifier and timestamp. + * + * @param identifier of target request + * @param timestamp when target request received * - * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 + * @return key of timeBucket counter maps */ - public double getRatio() { - return ratio; + protected final String genKey(String identifier, long timestamp) { + return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier; } /** - * Returns the ratio to the next power of 2 so that we can adjust the value. + * Calculate the bucket index for the specific timestamp, concrete subclass. + * + * @param timestamp the specific timestamp in milliseconds + * + * @return prefix the bucket key prefix for the specific timestamp */ - static double ratioToPowerOf2(int value) { - double nextPO2 = nextPowerOf2(value); - return Math.round((1000 * nextPO2 / value)) / 1000d; - } - - /** - * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. - */ - static int nextPowerOf2(int value) { - int valueOfHighestBit = Integer.highestOneBit(value); - if (valueOfHighestBit == value) { - return value; - } - - return valueOfHighestBit << 1; - } + protected abstract long getBucketIndex(long timestamp); /** * When we want to test a full bucket duration we need to sleep until the next bucket starts. + *

+ * WARNING: This method is used for test purpose. * * @return the number of milliseconds until the next bucket */ - public long getMillisUntilNextBucket() { - long millis = System.currentTimeMillis(); - long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; - long delta = nextTimeBucketMillis - millis; - return delta; - } + @Deprecated + public abstract long getMillisUntilNextBucket(); /** - * Sets isRunning to false to terminate the maintenance thread. + * Destroy resources */ public void destroy() { - // Stop our thread + this.map.clear(); + System.err.println("###################################################################"); + System.err.println("Destroying monitorFuture:" + monitorFuture + "; maintenanceFuture:" + maintenanceFuture); + System.err.println("###################################################################"); if (monitorFuture != null) { monitorFuture.cancel(true); monitorFuture = null; @@ -188,13 +164,28 @@ public void destroy() { } } + /** + * Periodic evict, perform removal of obsolete bucket items. Absence of this operation may result in OOM after a + * long run. + */ + public void periodicEvict() { + final long minBucketIndex = getBucketIndex(System.currentTimeMillis()); + // assume that elapsed time of periodicEvict less than 1x bucketDuration. + // to avoid extreme case: 999999-xxx vs 1000000-xxx + final long maxBucket = minBucketIndex + 2; + + final String minBucketPrefix = minBucketIndex + BUCKET_KEY_DELIMITER; + final String maxBucketPrefix = maxBucket + BUCKET_KEY_DELIMITER; + + // remove obsolete items whose key are less than minBucketPrefix and maxBucketPrefix in same time. + map.keySet().removeIf(k -> k.compareTo(minBucketPrefix) < 0 && k.compareTo(maxBucketPrefix) < 0); + } + + private class Maintenance implements Runnable { @Override public void run() { - String currentBucketPrefix = String.valueOf(getCurrentBucketPrefix()); - ConcurrentHashMap.KeySetView keys = map.keySet(); - // remove obsolete keys - keys.removeIf(k -> !k.startsWith(currentBucketPrefix)); + periodicEvict(); } } diff --git a/test/org/apache/catalina/filters/TestRateLimitFilter.java b/test/org/apache/catalina/filters/TestRateLimitFilter.java index 74a1ecae3844..3533fa9d75c6 100644 --- a/test/org/apache/catalina/filters/TestRateLimitFilter.java +++ b/test/org/apache/catalina/filters/TestRateLimitFilter.java @@ -52,11 +52,11 @@ private void testRateLimitWith4Clients(boolean exposeHeaders, boolean enforce) t Tomcat tomcat = getTomcatInstance(); Context root = tomcat.addContext("", TEMP_DIR); - tomcat.start(); - MockFilterChain filterChain = new MockFilterChain(); RateLimitFilter rateLimitFilter = testRateLimitFilter(filterDef, root); + tomcat.start(); + FastRateLimiter fastRateLimiter = (FastRateLimiter) rateLimitFilter.rateLimiter; int allowedRequests = fastRateLimiter.getRequests(); @@ -140,9 +140,6 @@ private RateLimitFilter testRateLimitFilter(FilterDef filterDef, Context root) t root.addFilterMap(filterMap); FilterConfig filterConfig = TesterFilterConfigs.generateFilterConfig(filterDef); - - rateLimitFilter.init(filterConfig); - return rateLimitFilter; } diff --git a/test/org/apache/catalina/filters/TestRateLimitFilterWithExactRateLimiter.java b/test/org/apache/catalina/filters/TestRateLimitFilterWithExactRateLimiter.java new file mode 100644 index 000000000000..a293336b4104 --- /dev/null +++ b/test/org/apache/catalina/filters/TestRateLimitFilterWithExactRateLimiter.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.filters; + +import java.io.IOException; + +import jakarta.servlet.FilterChain; +import jakarta.servlet.FilterConfig; +import jakarta.servlet.ServletException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.catalina.Context; +import org.apache.catalina.filters.TestRemoteIpFilter.MockFilterChain; +import org.apache.catalina.filters.TestRemoteIpFilter.MockHttpServletRequest; +import org.apache.catalina.startup.Tomcat; +import org.apache.catalina.startup.TomcatBaseTest; +import org.apache.catalina.util.ExactRateLimiter; +import org.apache.tomcat.unittest.TesterResponse; +import org.apache.tomcat.util.descriptor.web.FilterDef; +import org.apache.tomcat.util.descriptor.web.FilterMap; + +public class TestRateLimitFilterWithExactRateLimiter extends TomcatBaseTest { + private void testRateLimitWith1Clients(boolean exposeHeaders, boolean enforce) throws Exception { + + int bucketRequests = 40; + int bucketDuration = 4; + + FilterDef filterDef = new FilterDef(); + filterDef.addInitParameter("bucketRequests", String.valueOf(bucketRequests)); + filterDef.addInitParameter("bucketDuration", String.valueOf(bucketDuration)); + filterDef.addInitParameter("enforce", String.valueOf(enforce)); + filterDef.addInitParameter("exposeHeaders", String.valueOf(exposeHeaders)); + filterDef.addInitParameter("rateLimitClassName", "org.apache.catalina.util.ExactRateLimiter"); + + Tomcat tomcat = getTomcatInstance(); + Context root = tomcat.addContext("", TEMP_DIR); + + MockFilterChain filterChain = new MockFilterChain(); + RateLimitFilter rateLimitFilter = testRateLimitFilter(filterDef, root); + tomcat.start(); + + ExactRateLimiter exactRateLimiter = (ExactRateLimiter) rateLimitFilter.rateLimiter; + + int allowedRequests = exactRateLimiter.getRequests(); + long sleepTime = exactRateLimiter.getBucketCounter().getMillisUntilNextBucket(); + System.out.printf("Sleeping %d millis for the next time bucket to start\n", Long.valueOf(sleepTime)); + Thread.sleep(sleepTime); + + TestClient tc1 = new TestClient(rateLimitFilter, filterChain, "10.20.20.5", 50, 5); // TPS: 5 + TestClient tc2 = new TestClient(rateLimitFilter, filterChain, "10.20.20.10", 100, 10); // TPS: 10 + + TestClient tc3 = new TestClient(rateLimitFilter, filterChain, "10.20.20.20", 200, 20); // TPS: 20 + TestClient tc4 = new TestClient(rateLimitFilter, filterChain, "10.20.20.40", 400, 40); // TPS: 40 + tc1.join(); + tc2.join(); + tc3.join(); + tc4.join(); + Assert.assertEquals(200, tc1.results[24]); // only 25 requests made in 5 seconds, all allowed + + Assert.assertEquals(200, tc2.results[49]); // only 50 requests made in 5 seconds, all allowed + + Assert.assertEquals(200, tc3.results[39]); // first allowedRequests allowed + + if (enforce) { + Assert.assertEquals(429, tc3.results[allowedRequests]); // subsequent requests dropped + } else { + Assert.assertEquals(200, tc3.results[allowedRequests]); + } + + Assert.assertEquals(200, tc4.results[allowedRequests - 1]); // first allowedRequests allowed + + if (enforce) { + Assert.assertEquals(429, tc4.results[allowedRequests]); // subsequent requests dropped + } else { + Assert.assertEquals(200, tc4.results[allowedRequests]); + } + + if (exposeHeaders) { + Assert.assertTrue(tc3.rlpHeader[24].contains("q=" + allowedRequests)); + Assert.assertTrue(tc3.rlpHeader[allowedRequests].contains("q=" + allowedRequests)); + if (enforce) { + Assert.assertTrue(tc3.rlHeader[24].contains("r=")); + Assert.assertFalse(tc3.rlHeader[24].contains("r=0")); + Assert.assertTrue(tc3.rlHeader[allowedRequests].contains("r=0")); + } + } else { + Assert.assertTrue(tc3.rlpHeader[24] == null); + Assert.assertTrue(tc3.rlHeader[24] == null); + Assert.assertTrue(tc3.rlpHeader[allowedRequests] == null); + Assert.assertTrue(tc3.rlHeader[allowedRequests] == null); + } + tomcat.stop(); + } + + @Test + public void testExposeHeaderAndRerferenceRateLimitWith4Clients() throws Exception { + testRateLimitWith1Clients(true, false); + } + + @Test + public void testUnexposeHeaderAndRerferenceRateLimitWith4Clients() throws Exception { + testRateLimitWith1Clients(false, false); + } + + @Test + public void testExposeHeaderAndEnforceRateLimitWith4Clients() throws Exception { + testRateLimitWith1Clients(true, true); + } + + @Test + public void testUnexposeHeaderAndEnforceRateLimitWith4Clients() throws Exception { + testRateLimitWith1Clients(false, true); + } + + private RateLimitFilter testRateLimitFilter(FilterDef filterDef, Context root) throws ServletException { + + RateLimitFilter rateLimitFilter = new RateLimitFilter(); + filterDef.setFilterClass(RateLimitFilter.class.getName()); + filterDef.setFilter(rateLimitFilter); + filterDef.setFilterName(RateLimitFilter.class.getName()); + root.addFilterDef(filterDef); + + FilterMap filterMap = new FilterMap(); + filterMap.setFilterName(RateLimitFilter.class.getName()); + filterMap.addURLPatternDecoded("*"); + root.addFilterMap(filterMap); + + FilterConfig filterConfig = TesterFilterConfigs.generateFilterConfig(filterDef); + + return rateLimitFilter; + } + + static class TestClient extends Thread { + RateLimitFilter filter; + FilterChain filterChain; + String ip; + + int requests; + int sleep; + + int[] results; + volatile String[] rlpHeader; + volatile String[] rlHeader; + + TestClient(RateLimitFilter filter, FilterChain filterChain, String ip, int requests, int rps) { + this.filter = filter; + this.filterChain = filterChain; + this.ip = ip; + this.requests = requests; + this.sleep = 1000 / rps; + this.results = new int[requests]; + this.rlpHeader = new String[requests]; + this.rlHeader = new String[requests]; + super.setDaemon(true); + super.start(); + } + + @Override + public void run() { + try { + for (int i = 0; i < requests; i++) { + MockHttpServletRequest request = new MockHttpServletRequest(); + request.setRemoteAddr(ip); + TesterResponse response = new TesterResponseWithStatus(); + response.setRequest(request); + filter.doFilter(request, response, filterChain); + results[i] = response.getStatus(); + + rlpHeader[i] = response.getHeader(RateLimitFilter.HEADER_RATE_LIMIT_POLICY); + rlHeader[i] = response.getHeader(RateLimitFilter.HEADER_RATE_LIMIT); + + if (results[i] != 200) { + break; + } + sleep(sleep); + } + } catch (Exception ex) { + ex.printStackTrace(); + } + } + } + + static class TesterResponseWithStatus extends TesterResponse { + + int status = 200; + String message = "OK"; + + @Override + public void sendError(int status, String message) throws IOException { + this.status = status; + this.message = message; + } + + @Override + public int getStatus() { + return status; + } + } +} diff --git a/webapps/docs/config/filter.xml b/webapps/docs/config/filter.xml index 5a879727ab9f..a62f3d0284cd 100644 --- a/webapps/docs/config/filter.xml +++ b/webapps/docs/config/filter.xml @@ -968,13 +968,21 @@ FINE: Request "/docs/config/manager.html" with response status "200" from that IP are dropped with a "429 Too many requests" response until the bucket time ends and a new bucket starts.

-

The filter is optimized for efficiency and low overhead, so it converts - some configured values to more efficient values. For example, a configuration - of a 60 seconds time bucket is converted to 65.536 seconds. That allows - for very fast bucket calculation using bit shift arithmetic. In order to remain - true to the user intent, the configured number of requests is then multiplied - by the same ratio, so a configuration of 100 Requests per 60 seconds, has the - real values of 109 Requests per 65 seconds.

+

The RateLimiter implementation can be set via the rateLimitClassName + init param. The default implementation, + org.apache.catalina.util.FastRateLimiter, is optimized for + efficiency and low overhead so it converts some configured values to more + efficient values. For example, a configuration of a 60 seconds time bucket is + converted to 65.536 seconds. That allows for very fast bucket calculation using + bit shift arithmetic. In order to remain true to the user intent, the + configured number of requests is then multiplied by the same ratio, so a + configuration of 100 Requests per 60 seconds, has the real values of 109 Requests + per 65 seconds. An alternative implementation, + org.apache.catalina.util.ExactRateLimiter, is intended to provide + a less efficient but more accurate control, whose effective duration in seconds + and number of requests configuration are consist with the user declared. You can + specify a different class as long as it implements the + org.apache.catalina.util.RateLimiter interface.

It is common to set up different restrictions for different URIs. For example, a login page or authentication script is typically expected @@ -1047,7 +1055,10 @@ FINE: Request "/docs/config/manager.html" with response status "200"

The full class name of an implementation of the RateLimiter interface. - Default is "org.apache.catalina.util.FastRateLimiter".

+ Default is "org.apache.catalina.util.FastRateLimiter", which is + optimized for efficiency. If you need exact rate limiting and can accept + a small decrease in efficiency, you can use + "org.apache.catalina.util.ExactRateLimiter" instead.

From d0748d6c35b903d8a1c69e9bdddf8de013a3bb05 Mon Sep 17 00:00:00 2001 From: Chenjp Date: Sat, 7 Dec 2024 03:00:39 +0800 Subject: [PATCH 2/4] remove debug print --- java/org/apache/catalina/util/TimeBucketCounter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java b/java/org/apache/catalina/util/TimeBucketCounter.java index 27ccb0019cc8..2a979a17559c 100644 --- a/java/org/apache/catalina/util/TimeBucketCounter.java +++ b/java/org/apache/catalina/util/TimeBucketCounter.java @@ -151,9 +151,6 @@ protected final String genKey(String identifier, long timestamp) { */ public void destroy() { this.map.clear(); - System.err.println("###################################################################"); - System.err.println("Destroying monitorFuture:" + monitorFuture + "; maintenanceFuture:" + maintenanceFuture); - System.err.println("###################################################################"); if (monitorFuture != null) { monitorFuture.cancel(true); monitorFuture = null; From ee55c435df7058db8cb9f0c6ebfc320ef3f70590 Mon Sep 17 00:00:00 2001 From: Chenjp Date: Mon, 16 Dec 2024 19:37:41 +0800 Subject: [PATCH 3/4] minimize API change --- .../catalina/util/ExactRateLimiter.java | 4 +- .../apache/catalina/util/FastRateLimiter.java | 104 +-------- .../org/apache/catalina/util/RateLimiter.java | 2 +- .../apache/catalina/util/RateLimiterBase.java | 14 +- .../catalina/util/TimeBucketCounter.java | 216 +++++++----------- .../catalina/util/TimeBucketCounterBase.java | 214 +++++++++++++++++ 6 files changed, 305 insertions(+), 249 deletions(-) create mode 100644 java/org/apache/catalina/util/TimeBucketCounterBase.java diff --git a/java/org/apache/catalina/util/ExactRateLimiter.java b/java/org/apache/catalina/util/ExactRateLimiter.java index 9e4465469d0a..7be1c2575176 100644 --- a/java/org/apache/catalina/util/ExactRateLimiter.java +++ b/java/org/apache/catalina/util/ExactRateLimiter.java @@ -29,7 +29,7 @@ protected String getDefaultPolicyName() { } @Override - protected TimeBucketCounter newCounterInstance(ScheduledExecutorService executorService, int duration) { + protected TimeBucketCounterBase newCounterInstance(ScheduledExecutorService executorService, int duration) { return new ExactTimeBucketCounter(executorService, duration); } @@ -37,7 +37,7 @@ protected TimeBucketCounter newCounterInstance(ScheduledExecutorService executor * An accurate counter with exact bucket index, but slightly less efficient than another fast counter provided with * the {@link FastRateLimiter}. */ - class ExactTimeBucketCounter extends TimeBucketCounter { + class ExactTimeBucketCounter extends TimeBucketCounterBase { ExactTimeBucketCounter(ScheduledExecutorService executorService, int bucketDuration) { super(executorService, bucketDuration); diff --git a/java/org/apache/catalina/util/FastRateLimiter.java b/java/org/apache/catalina/util/FastRateLimiter.java index af6f170e0860..f20d69c2e906 100644 --- a/java/org/apache/catalina/util/FastRateLimiter.java +++ b/java/org/apache/catalina/util/FastRateLimiter.java @@ -30,107 +30,7 @@ protected String getDefaultPolicyName() { } @Override - protected TimeBucketCounter newCounterInstance(ScheduledExecutorService executorService, int duration) { - return new FastTimeBucketCounter(executorService, duration); - } - - /** - * A fast counter that optimizes efficiency at the expense of approximate bucket indexing. - */ - class FastTimeBucketCounter extends TimeBucketCounter { - - FastTimeBucketCounter(ScheduledExecutorService utilityExecutor, int bucketDuration) { - super(utilityExecutor, getActualDuration(bucketDuration)); - this.numBits = determineShiftBitsOfDuration(bucketDuration); - this.ratio = ratioToPowerOf2(bucketDuration * 1000); - } - - /** - * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute - */ - private final int numBits; - - /** - * Ratio of actual duration to config duration - */ - private final double ratio; - - @Override - public long getBucketIndex(long timestamp) { - return System.currentTimeMillis() >> this.numBits; - } - - protected int getNumBits() { - return numBits; - } - - /** - * Determines the bits of shift for the specific bucket duration in seconds, which used to figure out the - * correct bucket index. - */ - protected static final int determineShiftBitsOfDuration(int duration) { - int bits = 0; - int pof2 = nextPowerOf2(duration * 1000); - int bitCheck = pof2; - while (bitCheck > 1) { - bitCheck = pof2 >> ++bits; - } - return bits; - } - - /** - * The actual duration may differ from the configured duration because it is set to the next power of 2 value in - * order to perform very fast bit shift arithmetic. - * - * @param duration in seconds - * - * @return the actual bucket duration in seconds - * - * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int) - */ - protected static final int getActualDuration(int duration) { - return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000; - } - - - /** - * Returns the ratio between the configured duration param and the actual duration which will be set to the next - * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the - * added time, if any. - * - * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 - */ - public double getRatio() { - return ratio; - } - - /** - * Returns the ratio to the next power of 2 so that we can adjust the value. - */ - protected static double ratioToPowerOf2(int value) { - double nextPO2 = nextPowerOf2(value); - return Math.round((1000d * nextPO2 / value)) / 1000d; - } - - /** - * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. - */ - static int nextPowerOf2(int value) { - int valueOfHighestBit = Integer.highestOneBit(value); - if (valueOfHighestBit == value) { - return value; - } - - return valueOfHighestBit << 1; - } - - - @Override - public long getMillisUntilNextBucket() { - long millis = System.currentTimeMillis(); - long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; - long delta = nextTimeBucketMillis - millis; - return delta; - } + protected TimeBucketCounterBase newCounterInstance(ScheduledExecutorService executorService, int duration) { + return new TimeBucketCounter(executorService, duration); } } diff --git a/java/org/apache/catalina/util/RateLimiter.java b/java/org/apache/catalina/util/RateLimiter.java index 34939cb50b3c..297dea89e238 100644 --- a/java/org/apache/catalina/util/RateLimiter.java +++ b/java/org/apache/catalina/util/RateLimiter.java @@ -46,7 +46,7 @@ public interface RateLimiter { void destroy(); /** - * Initialize with parameters, start {@link TimeBucketCounter}. + * Initialize with parameters, start {@link TimeBucketCounterBase}. * * @param executorService the executor * @param duration the duration of the time window in seconds diff --git a/java/org/apache/catalina/util/RateLimiterBase.java b/java/org/apache/catalina/util/RateLimiterBase.java index 9367703ac284..ecedb3731b5c 100644 --- a/java/org/apache/catalina/util/RateLimiterBase.java +++ b/java/org/apache/catalina/util/RateLimiterBase.java @@ -27,7 +27,7 @@ public abstract class RateLimiterBase implements RateLimiter { private static final AtomicInteger index = new AtomicInteger(); - TimeBucketCounter bucketCounter; + TimeBucketCounterBase bucketCounter; int actualRequests; @@ -79,15 +79,15 @@ public void destroy() { } /** - * Instantiate an instance of {@link TimeBucketCounter} for specific time bucket size. Concrete classes determine + * Instantiate an instance of {@link TimeBucketCounterBase} for specific time bucket size. Concrete classes determine * its counter policy by returning different implementation instance. * * @param utilityExecutor the executor * @param duration size of each time bucket in seconds * - * @return counter instance of {@link TimeBucketCounter} + * @return counter instance of {@link TimeBucketCounterBase} */ - protected abstract TimeBucketCounter newCounterInstance(ScheduledExecutorService utilityExecutor, int duration); + protected abstract TimeBucketCounterBase newCounterInstance(ScheduledExecutorService utilityExecutor, int duration); @Override public void initialize(ScheduledExecutorService utilityExecutor, int duration, int requests) { @@ -102,11 +102,11 @@ public void initialize(ScheduledExecutorService utilityExecutor, int duration, i } /** - * Returns the internal instance of {@link TimeBucketCounter} + * Returns the internal instance of {@link TimeBucketCounterBase} * - * @return instance of {@link TimeBucketCounter} + * @return instance of {@link TimeBucketCounterBase} */ - public TimeBucketCounter getBucketCounter() { + public TimeBucketCounterBase getBucketCounter() { return bucketCounter; } } diff --git a/java/org/apache/catalina/util/TimeBucketCounter.java b/java/org/apache/catalina/util/TimeBucketCounter.java index 2a979a17559c..b4bde87ee219 100644 --- a/java/org/apache/catalina/util/TimeBucketCounter.java +++ b/java/org/apache/catalina/util/TimeBucketCounter.java @@ -16,192 +16,134 @@ */ package org.apache.catalina.util; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.juli.logging.Log; -import org.apache.juli.logging.LogFactory; -import org.apache.tomcat.util.res.StringManager; /** - * This class maintains a thread safe hash map that has timestamp-based buckets followed by a string for a key, and a - * counter for a integer value. each time the increment() method is called it adds the key if it does not exist, - * increments its value and returns it. + * A fast counter that optimizes efficiency at the expense of approximate bucket indexing. */ -public abstract class TimeBucketCounter { +public class TimeBucketCounter extends TimeBucketCounterBase { - private static final Log log = LogFactory.getLog(TimeBucketCounter.class); - private static final StringManager sm = StringManager.getManager(TimeBucketCounter.class); + TimeBucketCounter(int bucketDuration,ScheduledExecutorService utilityExecutor) { + this(utilityExecutor,bucketDuration); + } - private static final String BUCKET_KEY_DELIMITER = "^"; - /** - * Map to hold the buckets - */ - private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + TimeBucketCounter(ScheduledExecutorService utilityExecutor, int bucketDuration) { + super(utilityExecutor, getActualDuration(bucketDuration)); + this.numBits = determineShiftBitsOfDuration(bucketDuration); + this.ratio = ratioToPowerOf2(bucketDuration * 1000); + } /** - * /** The future allowing control of the background processor. + * Milliseconds bucket size as a Power of 2 for bit shift math, e.g. 16 for 65_536ms which is about 1:05 minute */ - private ScheduledFuture maintenanceFuture; - private ScheduledFuture monitorFuture; - private final ScheduledExecutorService executorService; - private final long sleeptime; - private int bucketDuration; + private final int numBits; /** - * Creates a new TimeBucketCounter with the specified lifetime. - * - * @param utilityExecutor the executor that should be used to handle maintenance task. - * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60 - * - * @throws NullPointerException if executorService is null. + * Ratio of actual duration to config duration */ - public TimeBucketCounter(ScheduledExecutorService utilityExecutor, int bucketDuration) { - Objects.requireNonNull(utilityExecutor); - this.executorService = utilityExecutor; - this.bucketDuration = bucketDuration; + private final double ratio; - int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3; - sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration; - - // Start our thread - if (sleeptime > 0) { - monitorFuture = executorService.scheduleWithFixedDelay(new MaintenanceMonitor(), 0, 60, TimeUnit.SECONDS); - } + @Override + public long getBucketIndex(long timestamp) { + return timestamp >> this.numBits; } /** - * @return bucketDuration in seconds + * Calculates the current time bucket prefix by shifting bits for fast division, e.g. shift 16 bits is the same as + * dividing by 65,536 which is about 1:05m. + * + * @return The current bucket prefix. */ - public int getBucketDuration() { - return bucketDuration; + @Override + public final int getCurrentBucketPrefix() { + return (int) (System.currentTimeMillis() >> this.numBits); } - /** - * Returns the ratio between the configured duration param and the actual duration. - * @return the ratio between the configured duration param and the actual duration. - */ - public abstract double getRatio(); + protected int getNumBits() { + return numBits; + } /** - * Increments the counter for the passed identifier in the current time bucket and returns the new value. + * The actual duration may differ from the configured duration because it is set to the next power of 2 value in + * order to perform very fast bit shift arithmetic. * - * @param identifier an identifier for which we want to maintain count, e.g. IP Address - * - * @return the count within the current time bucket - * - * @see TimeBucketCounter#genKey(String) + * @return the actual bucket duration in milliseconds */ - public final int increment(String identifier) { - String key = genKey(identifier); - AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger()); - return ai.incrementAndGet(); + public int getActualDuration() { + return (int) Math.pow(2, getNumBits()); } /** - * Generates the key of timeBucket counter maps with the specific identifier, and the timestamp is implicitly - * equivalent to "now". - * - * @param identifier an identifier for which we want to maintain count - * - * @return key of timeBucket counter maps + * Determines the bits of shift for the specific bucket duration in seconds, which used to figure out the + * correct bucket index. + * @param duration bucket duration in seconds + * @return bits to be shifted */ - protected final String genKey(String identifier) { - return genKey(identifier, System.currentTimeMillis()); + protected static final int determineShiftBitsOfDuration(int duration) { + int bits = 0; + int pof2 = nextPowerOf2(duration * 1000); + int bitCheck = pof2; + while (bitCheck > 1) { + bitCheck = pof2 >> ++bits; + } + return bits; } /** - * Generates the key of timeBucket counter maps with the specific identifier and timestamp. + * The actual duration may differ from the configured duration because it is set to the next power of 2 value in + * order to perform very fast bit shift arithmetic. + * + * @param duration in seconds * - * @param identifier of target request - * @param timestamp when target request received + * @return the actual bucket duration in seconds * - * @return key of timeBucket counter maps + * @see FastTimeBucketCounter#determineShiftBitsOfDuration(int) */ - protected final String genKey(String identifier, long timestamp) { - return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier; + protected static final int getActualDuration(int duration) { + return (int) (1L << determineShiftBitsOfDuration(duration)) / 1000; } - /** - * Calculate the bucket index for the specific timestamp, concrete subclass. - * - * @param timestamp the specific timestamp in milliseconds - * - * @return prefix the bucket key prefix for the specific timestamp - */ - protected abstract long getBucketIndex(long timestamp); /** - * When we want to test a full bucket duration we need to sleep until the next bucket starts. - *

- * WARNING: This method is used for test purpose. + * Returns the ratio between the configured duration param and the actual duration which will be set to the next + * power of 2. We then multiply the configured requests param by the same ratio in order to compensate for the + * added time, if any. * - * @return the number of milliseconds until the next bucket + * @return the ratio, e.g. 1.092 if the actual duration is 65_536 for the configured duration of 60_000 */ - @Deprecated - public abstract long getMillisUntilNextBucket(); + @Override + public double getRatio() { + return ratio; + } /** - * Destroy resources + * Returns the ratio to the next power of 2 so that we can adjust the value. + * @param value of target duration in seconds + * @return the ratio to the next power of 2 so that we can adjust the value */ - public void destroy() { - this.map.clear(); - if (monitorFuture != null) { - monitorFuture.cancel(true); - monitorFuture = null; - } - if (maintenanceFuture != null) { - maintenanceFuture.cancel(true); - maintenanceFuture = null; - } + protected static double ratioToPowerOf2(int value) { + double nextPO2 = nextPowerOf2(value); + return Math.round((1000d * nextPO2 / value)) / 1000d; } /** - * Periodic evict, perform removal of obsolete bucket items. Absence of this operation may result in OOM after a - * long run. + * Returns the next power of 2 given a value, e.g. 256 for 250, or 1024, for 1000. */ - public void periodicEvict() { - final long minBucketIndex = getBucketIndex(System.currentTimeMillis()); - // assume that elapsed time of periodicEvict less than 1x bucketDuration. - // to avoid extreme case: 999999-xxx vs 1000000-xxx - final long maxBucket = minBucketIndex + 2; - - final String minBucketPrefix = minBucketIndex + BUCKET_KEY_DELIMITER; - final String maxBucketPrefix = maxBucket + BUCKET_KEY_DELIMITER; + static int nextPowerOf2(int value) { + int valueOfHighestBit = Integer.highestOneBit(value); + if (valueOfHighestBit == value) { + return value; + } - // remove obsolete items whose key are less than minBucketPrefix and maxBucketPrefix in same time. - map.keySet().removeIf(k -> k.compareTo(minBucketPrefix) < 0 && k.compareTo(maxBucketPrefix) < 0); + return valueOfHighestBit << 1; } - private class Maintenance implements Runnable { - @Override - public void run() { - periodicEvict(); - } - } - - private class MaintenanceMonitor implements Runnable { - @Override - public void run() { - if (sleeptime > 0 && (maintenanceFuture == null || maintenanceFuture.isDone())) { - if (maintenanceFuture != null && maintenanceFuture.isDone()) { - // There was an error executing the scheduled task, get it and log it - try { - maintenanceFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error(sm.getString("timebucket.maintenance.error"), e); - } - } - maintenanceFuture = executorService.scheduleWithFixedDelay(new Maintenance(), sleeptime, sleeptime, - TimeUnit.MILLISECONDS); - } - } + @Override + public long getMillisUntilNextBucket() { + long millis = System.currentTimeMillis(); + long nextTimeBucketMillis = ((millis + (long) Math.pow(2, numBits)) >> numBits) << numBits; + long delta = nextTimeBucketMillis - millis; + return delta; } - } diff --git a/java/org/apache/catalina/util/TimeBucketCounterBase.java b/java/org/apache/catalina/util/TimeBucketCounterBase.java new file mode 100644 index 000000000000..2e5ac8920822 --- /dev/null +++ b/java/org/apache/catalina/util/TimeBucketCounterBase.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.catalina.util; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; + +/** + * This class maintains a thread safe hash map that has timestamp-based buckets followed by a string for a key, and a + * counter for a integer value. each time the increment() method is called it adds the key if it does not exist, + * increments its value and returns it. + */ +public abstract class TimeBucketCounterBase { + + private static final Log log = LogFactory.getLog(TimeBucketCounterBase.class); + private static final StringManager sm = StringManager.getManager(TimeBucketCounterBase.class); + + private static final String BUCKET_KEY_DELIMITER = "^"; + /** + * Map to hold the buckets + */ + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); + + /** + * /** The future allowing control of the background processor. + */ + private ScheduledFuture maintenanceFuture; + private ScheduledFuture monitorFuture; + private final ScheduledExecutorService executorService; + private final long sleeptime; + private int bucketDuration; + + /** + * Creates a new TimeBucketCounter with the specified lifetime. + * + * @param utilityExecutor the executor that should be used to handle maintenance task. + * @param bucketDuration duration in seconds, e.g. for 1 minute pass 60 + * + * @throws NullPointerException if executorService is null. + */ + public TimeBucketCounterBase(ScheduledExecutorService utilityExecutor, int bucketDuration) { + Objects.requireNonNull(utilityExecutor); + this.executorService = utilityExecutor; + this.bucketDuration = bucketDuration; + + int cleanupsPerBucketDuration = (bucketDuration >= 60) ? 6 : 3; + sleeptime = bucketDuration * 1000 / cleanupsPerBucketDuration; + + // Start our thread + if (sleeptime > 0) { + monitorFuture = executorService.scheduleWithFixedDelay(new MaintenanceMonitor(), 0, 60, TimeUnit.SECONDS); + } + } + + /** + * @return bucketDuration in seconds + */ + public int getBucketDuration() { + return bucketDuration; + } + + /** + * Returns the ratio between the configured duration param and the actual duration. + * @return the ratio between the configured duration param and the actual duration. + */ + public abstract double getRatio(); + + /** + * Increments the counter for the passed identifier in the current time bucket and returns the new value. + * + * @param identifier an identifier for which we want to maintain count, e.g. IP Address + * + * @return the count within the current time bucket + * + * @see TimeBucketCounterBase#genKey(String) + */ + public final int increment(String identifier) { + String key = genKey(identifier); + AtomicInteger ai = map.computeIfAbsent(key, v -> new AtomicInteger()); + return ai.incrementAndGet(); + } + + /** + * Generates the key of timeBucket counter maps with the specific identifier, and the timestamp is implicitly + * equivalent to "now". + * + * @param identifier an identifier for which we want to maintain count + * + * @return key of timeBucket counter maps + */ + protected final String genKey(String identifier) { + return genKey(identifier, System.currentTimeMillis()); + } + + /** + * Generates the key of timeBucket counter maps with the specific identifier and timestamp. + * + * @param identifier of target request + * @param timestamp when target request received + * + * @return key of timeBucket counter maps + */ + protected final String genKey(String identifier, long timestamp) { + return getBucketIndex(timestamp) + BUCKET_KEY_DELIMITER + identifier; + } + + /** + * Calculate the bucket index for the specific timestamp, concrete subclass. + * + * @param timestamp the specific timestamp in milliseconds + * + * @return prefix the bucket key prefix for the specific timestamp + */ + protected abstract long getBucketIndex(long timestamp); + + /** + * Returns current bucket prefix + * @return bucket index + */ + protected int getCurrentBucketPrefix() { + return (int)getBucketIndex(System.currentTimeMillis()); + } + /** + * When we want to test a full bucket duration we need to sleep until the next bucket starts. + *

+ * WARNING: This method is used for test purpose. + * + * @return the number of milliseconds until the next bucket + */ + @Deprecated + public abstract long getMillisUntilNextBucket(); + + /** + * Destroy resources + */ + public void destroy() { + this.map.clear(); + if (monitorFuture != null) { + monitorFuture.cancel(true); + monitorFuture = null; + } + if (maintenanceFuture != null) { + maintenanceFuture.cancel(true); + maintenanceFuture = null; + } + } + + /** + * Periodic evict, perform removal of obsolete bucket items. Absence of this operation may result in OOM after a + * long run. + */ + public void periodicEvict() { + final long minBucketIndex = getBucketIndex(System.currentTimeMillis()); + // assume that elapsed time of periodicEvict less than 1x bucketDuration. + // to avoid extreme case: 999999-xxx vs 1000000-xxx + final long maxBucket = minBucketIndex + 2; + + final String minBucketPrefix = minBucketIndex + BUCKET_KEY_DELIMITER; + final String maxBucketPrefix = maxBucket + BUCKET_KEY_DELIMITER; + + // remove obsolete items whose key are less than minBucketPrefix and maxBucketPrefix in same time. + map.keySet().removeIf(k -> k.compareTo(minBucketPrefix) < 0 && k.compareTo(maxBucketPrefix) < 0); + } + + + private class Maintenance implements Runnable { + @Override + public void run() { + periodicEvict(); + } + } + + private class MaintenanceMonitor implements Runnable { + @Override + public void run() { + if (sleeptime > 0 && (maintenanceFuture == null || maintenanceFuture.isDone())) { + if (maintenanceFuture != null && maintenanceFuture.isDone()) { + // There was an error executing the scheduled task, get it and log it + try { + maintenanceFuture.get(); + } catch (InterruptedException | ExecutionException e) { + log.error(sm.getString("timebucket.maintenance.error"), e); + } + } + maintenanceFuture = executorService.scheduleWithFixedDelay(new Maintenance(), sleeptime, sleeptime, + TimeUnit.MILLISECONDS); + } + } + } + +} From 6571f66f9ddbea94aa456f6cec1b5e585a97d0b5 Mon Sep 17 00:00:00 2001 From: Chenjp Date: Tue, 17 Dec 2024 12:07:59 +0800 Subject: [PATCH 4/4] revoke API change RateLimiter: keep api as stable as possible. --- .../catalina/filters/RateLimitFilter.java | 26 ++--------- .../apache/catalina/util/FastRateLimiter.java | 5 +++ .../org/apache/catalina/util/RateLimiter.java | 26 ++++++++--- .../apache/catalina/util/RateLimiterBase.java | 43 ++++++++++++++++--- 4 files changed, 65 insertions(+), 35 deletions(-) diff --git a/java/org/apache/catalina/filters/RateLimitFilter.java b/java/org/apache/catalina/filters/RateLimitFilter.java index f96dc805dda8..80315948bb33 100644 --- a/java/org/apache/catalina/filters/RateLimitFilter.java +++ b/java/org/apache/catalina/filters/RateLimitFilter.java @@ -17,7 +17,6 @@ package org.apache.catalina.filters; import java.io.IOException; -import java.util.concurrent.ScheduledExecutorService; import jakarta.servlet.FilterChain; import jakarta.servlet.FilterConfig; @@ -30,7 +29,6 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.res.StringManager; -import org.apache.tomcat.util.threads.ScheduledThreadPoolExecutor; /** *

@@ -202,21 +200,16 @@ protected boolean isConfigProblemFatal() { public void init(FilterConfig filterConfig) throws ServletException { super.init(filterConfig); - ScheduledExecutorService utilityExecutor = (ScheduledExecutorService) filterConfig.getServletContext() - .getAttribute(ScheduledThreadPoolExecutor.class.getName()); - if (utilityExecutor == null) { - if (newExecutorService == null) { - newExecutorService = new java.util.concurrent.ScheduledThreadPoolExecutor(1); - } - utilityExecutor = newExecutorService; - } - try { rateLimiter = (RateLimiter) Class.forName(rateLimitClassName).getConstructor().newInstance(); } catch (ReflectiveOperationException e) { throw new ServletException(e); } + rateLimiter.setDuration(bucketDuration); + rateLimiter.setRequests(bucketRequests); + rateLimiter.setFilterConfig(filterConfig); + if (policyName != null) { String trimmedName = policyName.trim(); if (!trimmedName.isEmpty()) { @@ -224,8 +217,6 @@ public void init(FilterConfig filterConfig) throws ServletException { } } - rateLimiter.initialize(utilityExecutor, bucketDuration, bucketRequests); - filterName = filterConfig.getFilterName(); log.info(sm.getString("rateLimitFilter.initialized", filterName, Integer.valueOf(bucketRequests), @@ -262,18 +253,9 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha chain.doFilter(request, response); } - private ScheduledExecutorService newExecutorService = null; - @Override public void destroy() { rateLimiter.destroy(); - if (newExecutorService != null) { - try { - newExecutorService.shutdown(); - } catch (SecurityException e) { - // ignore - } - } super.destroy(); } diff --git a/java/org/apache/catalina/util/FastRateLimiter.java b/java/org/apache/catalina/util/FastRateLimiter.java index f20d69c2e906..d4feb93a4a68 100644 --- a/java/org/apache/catalina/util/FastRateLimiter.java +++ b/java/org/apache/catalina/util/FastRateLimiter.java @@ -33,4 +33,9 @@ protected String getDefaultPolicyName() { protected TimeBucketCounterBase newCounterInstance(ScheduledExecutorService executorService, int duration) { return new TimeBucketCounter(executorService, duration); } + + @Override + public TimeBucketCounter getBucketCounter() { + return (TimeBucketCounter)bucketCounter; + } } diff --git a/java/org/apache/catalina/util/RateLimiter.java b/java/org/apache/catalina/util/RateLimiter.java index 297dea89e238..8ca41937a85b 100644 --- a/java/org/apache/catalina/util/RateLimiter.java +++ b/java/org/apache/catalina/util/RateLimiter.java @@ -17,7 +17,7 @@ package org.apache.catalina.util; -import java.util.concurrent.ScheduledExecutorService; +import jakarta.servlet.FilterConfig; public interface RateLimiter { @@ -26,11 +26,25 @@ public interface RateLimiter { */ int getDuration(); + /** + * Sets the configured duration value in seconds. + * + * @param duration The duration of the time window in seconds + */ + void setDuration(int duration); + /** * @return the maximum number of requests allowed per time window */ int getRequests(); + /** + * Sets the configured number of requests allowed per time window. + * + * @param requests The number of requests per time window + */ + void setRequests(int requests); + /** * Increments the number of requests by the given identifier in the current time window. * @@ -46,13 +60,11 @@ public interface RateLimiter { void destroy(); /** - * Initialize with parameters, start {@link TimeBucketCounterBase}. + * Pass the FilterConfig to configure the filter. * - * @param executorService the executor - * @param duration the duration of the time window in seconds - * @param requests the configured number of requests allowed per time window + * @param filterConfig The FilterConfig used to configure the associated filter */ - void initialize(ScheduledExecutorService executorService, int duration, int requests); + void setFilterConfig(FilterConfig filterConfig); /** * @return name of RateLimit policy @@ -80,7 +92,7 @@ default String getPolicy() { /** * Provide the quota header for this rate limit for a given request count within the current time window. * - * @param requestCount The request count within the current time window + * @param requestCount The request count within the current time window * * @return the quota header for the given value of request count * diff --git a/java/org/apache/catalina/util/RateLimiterBase.java b/java/org/apache/catalina/util/RateLimiterBase.java index ecedb3731b5c..589d80b1a84c 100644 --- a/java/org/apache/catalina/util/RateLimiterBase.java +++ b/java/org/apache/catalina/util/RateLimiterBase.java @@ -18,8 +18,11 @@ import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import jakarta.servlet.FilterConfig; + /** * Basic implementation of {@link RateLimiter}, provides runtime data maintenance mechanism monitor. */ @@ -29,8 +32,10 @@ public abstract class RateLimiterBase implements RateLimiter { TimeBucketCounterBase bucketCounter; + int requests; int actualRequests; + int duration; int actualDuration; // Initial policy name can be rewritten by setPolicyName() @@ -63,11 +68,21 @@ public int getDuration() { return actualDuration; } + @Override + public void setDuration(int duration) { + this.duration=duration; + } + @Override public int getRequests() { return actualRequests; } + @Override + public void setRequests(int requests) { + this.requests=requests; + } + @Override public int increment(String identifier) { return bucketCounter.increment(identifier); @@ -76,6 +91,13 @@ public int increment(String identifier) { @Override public void destroy() { bucketCounter.destroy(); + if (newExecutorService != null) { + try { + newExecutorService.shutdown(); + } catch (SecurityException e) { + // ignore + } + } } /** @@ -90,17 +112,20 @@ public void destroy() { protected abstract TimeBucketCounterBase newCounterInstance(ScheduledExecutorService utilityExecutor, int duration); @Override - public void initialize(ScheduledExecutorService utilityExecutor, int duration, int requests) { - if (bucketCounter != null) { - bucketCounter.destroy(); - } + public void setFilterConfig(FilterConfig filterConfig) { + + ScheduledExecutorService executorService = (ScheduledExecutorService) filterConfig.getServletContext() + .getAttribute(ScheduledThreadPoolExecutor.class.getName()); - bucketCounter = newCounterInstance(utilityExecutor, duration); + if (executorService == null) { + newExecutorService = new java.util.concurrent.ScheduledThreadPoolExecutor(1); + executorService = newExecutorService; + } + bucketCounter = newCounterInstance(executorService, duration); actualDuration = bucketCounter.getBucketDuration(); actualRequests = (int) Math.round(bucketCounter.getRatio() * requests); } - /** * Returns the internal instance of {@link TimeBucketCounterBase} * @@ -109,4 +134,10 @@ public void initialize(ScheduledExecutorService utilityExecutor, int duration, i public TimeBucketCounterBase getBucketCounter() { return bucketCounter; } + + /** + * The self-owned utility executor, will be instantiated only when ScheduledThreadPoolExecutor is absent during + * filter configure phase. + */ + private ScheduledThreadPoolExecutor newExecutorService = null; }