Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow override for default filter concurrency limit #1752

Merged
merged 2 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions zuul-core/src/main/java/com/netflix/zuul/filters/BaseFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,43 +22,46 @@
import com.netflix.zuul.message.ZuulMessage;
import com.netflix.zuul.netty.SpectatorUtils;
import io.netty.handler.codec.http.HttpContent;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Base abstract class for ZuulFilters. The base class defines abstract methods to define:
* filterType() - to classify a filter by type. Standard types in Zuul are "pre" for pre-routing filtering,
* "route" for routing to an origin, "post" for post-routing filters, "error" for error handling.
* We also support a "static" type for static responses see StaticResponseFilter.
* Base abstract class for ZuulFilters. The base class defines abstract methods to define: filterType() - to classify a
* filter by type. Standard types in Zuul are "pre" for pre-routing filtering, "route" for routing to an origin, "post"
* for post-routing filters, "error" for error handling. We also support a "static" type for static responses see
* StaticResponseFilter.
* <p>
* filterOrder() must also be defined for a filter. Filters may have the same filterOrder if precedence is not
* important for a filter. filterOrders do not need to be sequential.
* <p>
* ZuulFilters may be disabled using Archaius Properties.
* <p>
* By default ZuulFilters are static; they don't carry state. This may be overridden by overriding the isStaticFilter() property to false
* By default ZuulFilters are static; they don't carry state. This may be overridden by overriding the isStaticFilter()
* property to false
*
* @author Mikey Cohen
* Date: 10/26/11
* Time: 4:29 PM
* @author Mikey Cohen Date: 10/26/11 Time: 4:29 PM
*/
public abstract class BaseFilter<I extends ZuulMessage, O extends ZuulMessage> implements ZuulFilter<I, O> {

private final String baseName;
private final AtomicInteger concurrentCount;
private final Counter concurrencyRejections;

private final CachedDynamicBooleanProperty filterDisabled;
private final CachedDynamicIntProperty filterConcurrencyLimit;

private static final CachedDynamicBooleanProperty concurrencyProtectEnabled =
new CachedDynamicBooleanProperty("zuul.filter.concurrency.protect.enabled", true);
protected final CachedDynamicIntProperty filterConcurrencyCustom;
protected final CachedDynamicIntProperty filterConcurrencyDefault;
private final CachedDynamicBooleanProperty concurrencyProtectionEnabled;
private static final int DEFAULT_FILTER_CONCURRENCY_LIMIT = 4000;

protected BaseFilter() {
baseName = getClass().getSimpleName() + "." + filterType();
concurrentCount = SpectatorUtils.newGauge("zuul.filter.concurrency.current", baseName, new AtomicInteger(0));
concurrencyRejections = SpectatorUtils.newCounter("zuul.filter.concurrency.rejected", baseName);
filterDisabled = new CachedDynamicBooleanProperty(disablePropertyName(), false);
filterConcurrencyLimit = new CachedDynamicIntProperty(maxConcurrencyPropertyName(), 4000);
concurrencyProtectionEnabled = new CachedDynamicBooleanProperty("zuul.filter.concurrency.protect.enabled",
true);
filterConcurrencyDefault = new CachedDynamicIntProperty("zuul.filter.concurrency.limit.default",
DEFAULT_FILTER_CONCURRENCY_LIMIT);
filterConcurrencyCustom = new CachedDynamicIntProperty(maxConcurrencyPropertyName(),
DEFAULT_FILTER_CONCURRENCY_LIMIT);
}

@Override
Expand Down Expand Up @@ -121,8 +124,8 @@ public HttpContent processContentChunk(ZuulMessage zuulMessage, HttpContent chun

@Override
public void incrementConcurrency() throws ZuulFilterConcurrencyExceededException {
final int limit = filterConcurrencyLimit.get();
if ((concurrencyProtectEnabled.get()) && (concurrentCount.get() >= limit)) {
final int limit = Math.max(filterConcurrencyCustom.get(), filterConcurrencyDefault.get());
if ((concurrencyProtectionEnabled.get()) && (concurrentCount.get() >= limit)) {
concurrencyRejections.increment();
throw new ZuulFilterConcurrencyExceededException(this, limit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@
*/
package com.netflix.zuul.filters;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import com.google.common.truth.Truth;
import com.netflix.config.ConfigurationManager;
import com.netflix.zuul.context.SessionContext;
import com.netflix.zuul.message.Headers;
import com.netflix.zuul.message.ZuulMessage;
import org.junit.jupiter.api.BeforeEach;
import com.netflix.zuul.message.ZuulMessageImpl;
import org.apache.commons.configuration.AbstractConfiguration;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import org.mockito.junit.MockitoJUnitRunner;
import rx.Observable;

/**
* Tests for {@link BaseFilter}. Currently named BaseFilter2Test as there is an existing
* class named BaseFilterTest.
* Tests for {@link BaseFilter}. Currently named BaseFilter2Test as there is an existing class named BaseFilterTest.
*/
@RunWith(MockitoJUnitRunner.class)
class BaseFilterTest {

@Mock
Expand All @@ -39,14 +45,10 @@ class BaseFilterTest {
@Mock
private ZuulMessage req;

@BeforeEach
void before() {
MockitoAnnotations.initMocks(this);
}

@Test
void testShouldFilter() {
class TestZuulFilter extends BaseSyncFilter {

@Override
public int filterOrder() {
return 0;
Expand Down Expand Up @@ -74,4 +76,58 @@ public ZuulMessage apply(ZuulMessage req) {
when(tf1.shouldFilter(req)).thenReturn(true);
when(tf2.shouldFilter(req)).thenReturn(false);
}

@Test
void validateDefaultConcurrencyLimit() {
final int[] limit = {0};
class ConcInboundFilter extends BaseFilter {

@Override
public Observable applyAsync(ZuulMessage input) {
limit[0] = Math.max(filterConcurrencyCustom.get(), filterConcurrencyDefault.get());
return Observable.just("Done");
}

@Override
public FilterType filterType() {
return FilterType.INBOUND;
}

@Override
public boolean shouldFilter(ZuulMessage msg) {
return true;
}
}
new ConcInboundFilter().applyAsync(new ZuulMessageImpl(new SessionContext(), new Headers()));
Truth.assertThat(limit[0]).isEqualTo(4000);
}

@Test
void validateFilterConcurrencyLimitOverride() {
AbstractConfiguration configuration = ConfigurationManager.getConfigInstance();
configuration.setProperty("zuul.filter.concurrency.limit.default", 7000);
configuration.setProperty("zuul.ConcInboundFilter.in.concurrency.limit", 4000);
final int[] limit = {0};

class ConcInboundFilter extends BaseFilter {

@Override
public Observable applyAsync(ZuulMessage input) {
limit[0] = Math.max(filterConcurrencyCustom.get(), filterConcurrencyDefault.get());
return Observable.just("Done");
}

@Override
public FilterType filterType() {
return FilterType.INBOUND;
}

@Override
public boolean shouldFilter(ZuulMessage msg) {
return true;
}
}
new ConcInboundFilter().applyAsync(new ZuulMessageImpl(new SessionContext(), new Headers()));
Truth.assertThat(limit[0]).isEqualTo(7000);
}
}
Loading