Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add deadline limiter #151

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.concurrency.limits.limiter;

import com.netflix.concurrency.limits.Limiter;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
* {@link Limiter} that blocks the caller when the limit has been reached. The caller is
* blocked until the limiter has been released, or a deadline has been passed.
*
* @param <ContextT>
*/
public final class DeadlineLimiter<ContextT> implements Limiter<ContextT> {

/**
* Wrap a limiter such that acquire will block until a provided deadline if the limit was reached
* instead of returning an empty listener immediately
*
* @param delegate Non-blocking limiter to wrap
* @param deadline The deadline to wait until for the limit to be released.
* @return Wrapped limiter
*/
public static <ContextT> DeadlineLimiter<ContextT> wrap(Limiter<ContextT> delegate, Instant deadline) {
return new DeadlineLimiter<>(delegate, deadline);
}

private final Limiter<ContextT> delegate;
private final Instant deadline;

/**
* Lock used to block and unblock callers as the limit is reached
*/
private final Object lock = new Object();

private DeadlineLimiter(Limiter<ContextT> limiter, Instant deadline) {
this.delegate = limiter;
this.deadline = deadline;
}

private Optional<Listener> tryAcquire(ContextT context) {
synchronized (lock) {
while (true) {
long timeout = Duration.between(Instant.now(), deadline).toMillis();
if (timeout <= 0) {
return Optional.empty();
}
// Try to acquire a token and return immediately if successful
final Optional<Listener> listener = delegate.acquire(context);
if (listener.isPresent()) {
return listener;
}

// We have reached the limit so block until a token is released
try {
lock.wait(timeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return Optional.empty();
}
}
}
}

private void unblock() {
synchronized (lock) {
lock.notifyAll();
}
}

@Override
public Optional<Listener> acquire(ContextT context) {
return tryAcquire(context).map(delegate -> new Listener() {
@Override
public void onSuccess() {
delegate.onSuccess();
unblock();
}

@Override
public void onIgnore() {
delegate.onIgnore();
unblock();
}

@Override
public void onDropped() {
delegate.onDropped();
unblock();
}
});
}

@Override
public String toString() {
return "DeadlineLimiter [" + delegate + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testTimeout() {
SettableLimit limit = SettableLimit.startingAt(1);
BlockingLimiter<Void> limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), timeout);

// Acquire first, will succeeed an not block
// Acquire first, will succeed and not block
limiter.acquire(null);

// Second acquire should time out after at least 50 millis
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.netflix.concurrency.limits.limiter;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.SettableLimit;
import org.junit.Assert;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertTrue;

public class DeadlineLimiterTest {
@Test
public void test() {
SettableLimit limit = SettableLimit.startingAt(10);
DeadlineLimiter<Void> limiter = DeadlineLimiter.wrap(
SimpleLimiter.newBuilder().limit(limit).build(),
Instant.now().plusMillis(10));

LinkedList<Limiter.Listener> listeners = new LinkedList<>();
for (int i = 0; i < 10; i++) {
limiter.acquire(null).ifPresent(listeners::add);
}

limit.setLimit(1);

while (!listeners.isEmpty()) {
listeners.remove().onSuccess();
}

limiter.acquire(null);
}

@Test
public void testMultipleBlockedThreads() throws InterruptedException, ExecutionException, TimeoutException {
int numThreads = 8;
SettableLimit limit = SettableLimit.startingAt(1);
DeadlineLimiter<Void> limiter = DeadlineLimiter.wrap(
SimpleLimiter.newBuilder().limit(limit).build(),
Instant.now().plusSeconds(1));
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
try {
for (Future<?> future : IntStream.range(0, numThreads)
.mapToObj(x -> executorService.submit(() -> limiter.acquire(null).get().onSuccess()))
.collect(Collectors.toList())) {
future.get(1, TimeUnit.SECONDS);
}
} finally {
executorService.shutdown();
}
}

@Test
public void testExceedDeadline() {
Instant deadline = Instant.now().plusMillis(50);
SettableLimit limit = SettableLimit.startingAt(1);
DeadlineLimiter<Void> limiter = DeadlineLimiter.wrap(
SimpleLimiter.newBuilder().limit(limit).build(),
deadline);

// Acquire first, will succeed and not block
limiter.acquire(null);

// Second acquire should time out after the deadline has been reached
Assert.assertFalse(limiter.acquire(null).isPresent());
Instant after = Instant.now();

assertTrue(after.isAfter(deadline));
}

@Test(expected=TimeoutException.class)
public void testNoTimeout() throws InterruptedException, ExecutionException, TimeoutException {
SettableLimit limit = SettableLimit.startingAt(1);
DeadlineLimiter<Void> limiter = DeadlineLimiter.wrap(
SimpleLimiter.newBuilder().limit(limit).build(),
Instant.now().plusSeconds(2));
limiter.acquire(null);

CompletableFuture<Optional<Limiter.Listener>> future = CompletableFuture.supplyAsync(() -> limiter.acquire(null));
future.get(1, TimeUnit.SECONDS);
}
}