Skip to content

Commit

Permalink
Fixes problems updating metrics in the fixed limit case.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Jan 21, 2025
1 parent 45dd1d8 commit fa2a125
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
@SuppressWarnings("removal")
@RuntimeType.PrototypedBy(AimdLimitConfig.class)
public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api<AimdLimitConfig> {

/**
* Default length of the queue.
*/
public static final int DEFAULT_QUEUE_LENGTH = 0;

/**
* Timeout of a request that is enqueued.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,14 @@ public void dropped() {
try {
updateWithSample(startTime, clock.get(), currentRequests, false);
} finally {
AimdLimitImpl.this.semaphore.release();
semaphore.release();
}
}

@Override
public void ignore() {
concurrentRequests.decrementAndGet();
AimdLimitImpl.this.semaphore.release();
semaphore.release();
}

@Override
Expand All @@ -273,7 +273,7 @@ public void success() {
updateWithSample(startTime, clock.get(), currentRequests, true);
concurrentRequests.decrementAndGet();
} finally {
AimdLimitImpl.this.semaphore.release();
semaphore.release();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api<FixedL
* Default limit, meaning unlimited execution.
*/
public static final int DEFAULT_LIMIT = 0;

/**
* Default length of the queue.
*/
public static final int DEFAULT_QUEUE_LENGTH = 0;

/**
* Timeout of a request that is enqueued.
*/
Expand Down Expand Up @@ -88,7 +90,8 @@ private FixedLimit(FixedLimitConfig config) {
this.queueLength = Math.max(0, config.queueLength());
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
queueLength,
config.queueTimeout());
config.queueTimeout(),
() -> new FixedLimit.FixedToken(clock, concurrentRequests));
}
}

Expand Down Expand Up @@ -290,4 +293,45 @@ public void init(String socketName) {
queueWaitTimer = meterRegistry.getOrCreate(waitTimerBuilder);
}
}

private class FixedToken implements Limit.Token {
private final long startTime;

private FixedToken(Supplier<Long> clock, AtomicInteger concurrentRequests) {
startTime = clock.get();
concurrentRequests.incrementAndGet();
}

@Override
public void dropped() {
try {
updateMetrics(startTime, clock.get());
} finally {
semaphore.release();
}
}

@Override
public void ignore() {
concurrentRequests.decrementAndGet();
semaphore.release();
}

@Override
public void success() {
try {
updateMetrics(startTime, clock.get());
concurrentRequests.decrementAndGet();
} finally {
semaphore.release();
}
}
}

void updateMetrics(long startTime, long endTime) {
long rtt = endTime - startTime;
if (rttTimer != null) {
rttTimer.record(rtt, TimeUnit.NANOSECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,6 @@ static class QueuedSemaphoreHandler implements LimiterHandler {
private final long timeoutMillis;
private final Supplier<Token> tokenSupplier;

QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) {
this.semaphore = semaphore;
this.queueLength = queueLength;
this.timeoutMillis = queueTimeout.toMillis();
this.tokenSupplier = () -> new SemaphoreToken(semaphore);
}

QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout, Supplier<Token> tokenSupplier) {
this.semaphore = semaphore;
this.queueLength = queueLength;
Expand Down Expand Up @@ -118,27 +111,4 @@ public Semaphore semaphore() {
return semaphore;
}
}

static class SemaphoreToken implements LimitAlgorithm.Token {
private final Semaphore semaphore;

SemaphoreToken(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void dropped() {
semaphore.release();
}

@Override
public void ignore() {
semaphore.release();
}

@Override
public void success() {
semaphore.release();
}
}
}

0 comments on commit fa2a125

Please sign in to comment.