diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java index 8dd72b10..c03a4f54 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limit/VegasLimit.java @@ -48,7 +48,7 @@ public static class Builder { private int maxConcurrency = 1000; private MetricRegistry registry = EmptyMetricRegistry.INSTANCE; private double smoothing = 1.0; - + private double bufferFactor = 0.0; private Function alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue()); private Function betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue()); private Function thresholdFunc = (limit) -> LOG10.apply(limit.intValue()); @@ -104,7 +104,13 @@ public Builder decrease(Function decrease) { this.decreaseFunc = decrease; return this; } - + + public Builder bufferFactor(double bufferFactor) { + Preconditions.checkArgument(bufferFactor >= 0.0, "buffer factor must >= 0.0"); + this.bufferFactor = bufferFactor; + return this; + } + public Builder smoothing(double smoothing) { this.smoothing = smoothing; return this; @@ -154,7 +160,8 @@ public static VegasLimit newDefault() { private volatile double estimatedLimit; private volatile long rtt_noload = 0; - + + private volatile long pauseUpdateUntil = 0; /** * Maximum allowed limit providing an upper bound failsafe */ @@ -166,6 +173,8 @@ public static VegasLimit newDefault() { private final Function thresholdFunc; private final Function increaseFunc; private final Function decreaseFunc; + private final double bufferFactor; + private final int initialLimit; private final SampleListener rttSampleListener; private final int probeMultiplier; private int probeCount = 0; @@ -173,6 +182,7 @@ public static VegasLimit newDefault() { private VegasLimit(Builder builder) { super(builder.initialLimit); + this.initialLimit = builder.initialLimit; this.estimatedLimit = builder.initialLimit; this.maxLimit = builder.maxConcurrency; this.alphaFunc = builder.alphaFunc; @@ -180,6 +190,7 @@ private VegasLimit(Builder builder) { this.increaseFunc = builder.increaseFunc; this.decreaseFunc = builder.decreaseFunc; this.thresholdFunc = builder.thresholdFunc; + this.bufferFactor = builder.bufferFactor; this.smoothing = builder.smoothing; this.probeMultiplier = builder.probeMultiplier; @@ -206,6 +217,10 @@ protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { resetProbeJitter(); probeCount = 0; rtt_noload = rtt; + if (bufferFactor > 0.0) { + pauseUpdateUntil = startTime + rtt + (long)(rtt * (bufferFactor / (bufferFactor + 1))); + estimatedLimit = Math.max(initialLimit, Math.ceil(estimatedLimit / (bufferFactor + 1))); + } return (int)estimatedLimit; } @@ -217,11 +232,15 @@ protected int _update(long startTime, long rtt, int inflight, boolean didDrop) { rttSampleListener.addSample(rtt_noload); + if (pauseUpdateUntil != 0 && pauseUpdateUntil > startTime) { + return (int)estimatedLimit; + } + return updateEstimatedLimit(rtt, inflight, didDrop); } private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) { - final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt)); + final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload * (bufferFactor + 1) / rtt)); double newLimit; // Treat any drop (i.e timeout) as needing to reduce the limit diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java index 521becf3..ef0db2e4 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limit/VegasLimitTest.java @@ -3,6 +3,9 @@ import junit.framework.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; public class VegasLimitTest { @@ -15,6 +18,17 @@ public static VegasLimit create() { .maxConcurrency(20) .build(); } + + public static VegasLimit create(double bufferFactor) { + return VegasLimit.newBuilder() + .alpha(3) + .beta(6) + .smoothing(1.0) + .initialLimit(10) + .bufferFactor(bufferFactor) + .maxConcurrency(20) + .build(); + } @Test public void largeLimitIncrease() { @@ -45,6 +59,15 @@ public void decreaseLimit() { limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(50), 11, false); Assert.assertEquals(9, limit.getLimit()); } + + @Test + public void decreaseLimitWithBufferFactor() { + VegasLimit limit = create(1.0); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 10, false); + Assert.assertEquals(10, limit.getLimit()); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(50), 11, false); + Assert.assertEquals(10, limit.getLimit()); + } @Test public void noChangeIfWithinThresholds() { @@ -54,7 +77,16 @@ public void noChangeIfWithinThresholds() { limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(14), 14, false); Assert.assertEquals(10, limit.getLimit()); } - + + @Test + public void noChangeIfWithinThresholdsWithBuffer() { + VegasLimit limit = create(1.0); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 10, false); + Assert.assertEquals(10, limit.getLimit()); + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(5), 14, false); + Assert.assertEquals(10, limit.getLimit()); + } + @Test public void decreaseSmoothing() { VegasLimit limit = VegasLimit.newBuilder() @@ -97,4 +129,15 @@ public void decreaseWithoutSmoothing() { limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(20), 100, false); Assert.assertEquals(25, limit.getLimit()); } + + @Test + public void pauseUpdateWhenProbe() { + VegasLimit limit = create(1.0); + List limits = new ArrayList<>(); + limit.notifyOnChange(v -> limits.add(v)); + for (int i = 0; i < 600; ++i) { + limit.onSample(0, TimeUnit.MILLISECONDS.toNanos(10), 100, false); + } + Assert.assertEquals(Arrays.asList(16,20,10), limits); + } }