diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java index d545839dcb5..aaac9447be9 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java @@ -34,10 +34,12 @@ @SuppressWarnings("removal") @RuntimeType.PrototypedBy(AimdLimitConfig.class) public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + /** * Default length of the queue. */ public static final int DEFAULT_QUEUE_LENGTH = 0; + /** * Timeout of a request that is enqueued. */ diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java index c545a2e86e3..6fedb0684d0 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -257,14 +257,14 @@ public void dropped() { try { updateWithSample(startTime, clock.get(), currentRequests, false); } finally { - AimdLimitImpl.this.semaphore.release(); + semaphore.release(); } } @Override public void ignore() { concurrentRequests.decrementAndGet(); - AimdLimitImpl.this.semaphore.release(); + semaphore.release(); } @Override @@ -273,7 +273,7 @@ public void success() { updateWithSample(startTime, clock.get(), currentRequests, true); concurrentRequests.decrementAndGet(); } finally { - AimdLimitImpl.this.semaphore.release(); + semaphore.release(); } } } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java index 12a335d9583..7987ec7c7a9 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java @@ -48,10 +48,12 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api new FixedLimit.FixedToken(clock, concurrentRequests)); } } @@ -290,4 +293,45 @@ public void init(String socketName) { queueWaitTimer = meterRegistry.getOrCreate(waitTimerBuilder); } } + + private class FixedToken implements Limit.Token { + private final long startTime; + + private FixedToken(Supplier clock, AtomicInteger concurrentRequests) { + startTime = clock.get(); + concurrentRequests.incrementAndGet(); + } + + @Override + public void dropped() { + try { + updateMetrics(startTime, clock.get()); + } finally { + semaphore.release(); + } + } + + @Override + public void ignore() { + concurrentRequests.decrementAndGet(); + semaphore.release(); + } + + @Override + public void success() { + try { + updateMetrics(startTime, clock.get()); + concurrentRequests.decrementAndGet(); + } finally { + semaphore.release(); + } + } + } + + void updateMetrics(long startTime, long endTime) { + long rtt = endTime - startTime; + if (rttTimer != null) { + rttTimer.record(rtt, TimeUnit.NANOSECONDS); + } + } } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java index a029a14ab20..10ad4bcd31c 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java @@ -78,13 +78,6 @@ static class QueuedSemaphoreHandler implements LimiterHandler { private final long timeoutMillis; private final Supplier tokenSupplier; - QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) { - this.semaphore = semaphore; - this.queueLength = queueLength; - this.timeoutMillis = queueTimeout.toMillis(); - this.tokenSupplier = () -> new SemaphoreToken(semaphore); - } - QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout, Supplier tokenSupplier) { this.semaphore = semaphore; this.queueLength = queueLength; @@ -118,27 +111,4 @@ public Semaphore semaphore() { return semaphore; } } - - static class SemaphoreToken implements LimitAlgorithm.Token { - private final Semaphore semaphore; - - SemaphoreToken(Semaphore semaphore) { - this.semaphore = semaphore; - } - - @Override - public void dropped() { - semaphore.release(); - } - - @Override - public void ignore() { - semaphore.release(); - } - - @Override - public void success() { - semaphore.release(); - } - } }