release Semaphore before onDone called
Xin Zheng committed Dec 12, 2024
1 parent fe3b03a commit 629c7d2
Showing 2 changed files with 81 additions and 35 deletions.
@@ -6,6 +6,7 @@

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@@ -83,48 +84,47 @@ private <Ctx> void resume(Request<Ctx> req) {
// Enqueue as much we can.
while (true) {
assert req.worker == currentThread();
- if (req.isFatalError) {
+ if (req.isFatalError.get()) {
log.trace("return from 'resume()' because isFatalError");
- assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
+ assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
return;
}
- if (!req.hasMore) {
- if (req.numInProgress == 0 && !req.isDoneCalled) {
- req.isDoneCalled = true;
+ if (!req.hasMore.get()) {
+ if (req.numInProgress.get() == 0 && req.isDoneCalled.compareAndSet(false, true)) {
// give up lock because we don't know how much time mentor will use.
req.lock.unlock();
log.debug("Release remaining {} tokens", req.numTokensAvailForOurself.get());
// release before calling in to onDone.
req.limit.release(req.numTokensAvailForOurself.getAndSet(0));
log.trace("call 'mentor.onDone()'");
try {
req.mentor.onDone(req.ctx);
} finally {
req.lock.lock(); // MUST get back our lock RIGHT NOW.
}
log.debug("Release remaining {} tokens", req.numTokensAvailForOurself);
req.limit.release(req.numTokensAvailForOurself);
req.numTokensAvailForOurself = 0;
} else {
log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress);
}
return;
}
- if (req.numTokensAvailForOurself > 0) {
+ if (req.numTokensAvailForOurself.get() > 0) {
// We still have a token reserved for ourself. Use those first before acquiring
// new ones. Explanation see comment in 'onOneDone()'.
- req.numTokensAvailForOurself -= 1;
+ req.numTokensAvailForOurself.decrementAndGet();
} else if (!req.limit.tryAcquire()) {
log.debug("redis request limit reached. Need to pause now.");
break; // Go to end of loop to schedule a run later.
}
- req.hasStarted = true;
- req.numInProgress += 1;
- assert req.hasMore : "assert(hasMore)";
+ req.hasStarted.set(true);
+ req.numInProgress.incrementAndGet();
+ assert req.hasMore.get() : "assert(hasMore)";
boolean hasMore = true;
try {
// We MUST give up our lock while calling mentor. We cannot know how long
// mentor is going to block (which would then cascade to all threads
// waiting for our lock).
assert req.worker == currentThread();
- assert req.hasMore;
+ assert req.hasMore.get();
req.lock.unlock();
log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress);
assert req.worker == currentThread();
@@ -144,31 +144,31 @@ private <Ctx> void resume(Request<Ctx> req) {
log.trace("mentor.runOneMore() -> hasMore={}", hasMore);
req.lock.lock();
assert req.worker == currentThread();
- assert req.hasMore;
- req.hasMore = hasMore;
+ assert req.hasMore.get();
+ req.hasMore.set(hasMore);
}
}
- assert req.numInProgress >= 0 : req.numInProgress;
- if (req.numInProgress == 0) {
- if (!req.hasStarted) {
+ assert req.numInProgress.get() >= 0 : req.numInProgress;
+ if (req.numInProgress.get() == 0) {
+ if (!req.hasStarted.get()) {
// We couldn't even trigger one single task. No resources available to
// handle any more requests. This caller has to try later.
- req.isFatalError = true;
+ req.isFatalError.set(true);
Exception ex = exceptionFactory.newResourceExhaustionException(
"No more resources to handle yet another request now.", null);
req.mentor.onError(ex, req.ctx);
- assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
+ assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
} else {
log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}",
- req.numInProgress, req.hasStarted);
+ req.numInProgress, req.hasStarted);
vertx.setTimer(4000, nonsense -> resume(req));
}
}
} finally {
req.worker = null;
req.lock.unlock();
}
- assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
+ assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")";
}

private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
@@ -186,11 +186,11 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
// requests. So by keeping that token we already got reserved to ourself, we
// can apply backpressure to new incoming requests. This allows us to complete
// the already running requests.
- req.numInProgress -= 1;
- req.numTokensAvailForOurself += 1;
+ req.numInProgress.decrementAndGet();
+ req.numTokensAvailForOurself.incrementAndGet();
// ^^-- Token transfer only consists of those two statements.
log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress);
- assert req.numInProgress >= 0 : req.numInProgress + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
+ assert req.numInProgress.get() >= 0 : req.numInProgress.get() + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)";
boolean isFatalError = true;
if (ex != null) try {
// Unlock, to prevent thread stalls as we don't know for how long mentor
@@ -202,10 +202,10 @@ private <Ctx> void onOneDone(Request<Ctx> req, Throwable ex) {
isFatalError = !req.mentor.onError(ex, req.ctx);
} finally {
req.lock.lock(); // Need our lock back.
- req.isFatalError = isFatalError;
+ req.isFatalError.set(isFatalError);
// Need to release our token now. As we won't do it later anymore.
if (isFatalError) {
- req.numTokensAvailForOurself -= 1;
+ req.numTokensAvailForOurself.decrementAndGet();
req.limit.release();
}
}
@@ -220,13 +220,13 @@ private static final class Request<Ctx> {
private final Mentor<Ctx> mentor;
private final Lock lock = new ReentrantLock();
private final Semaphore limit;
- private Thread worker = null;
- private int numInProgress = 0;
- private int numTokensAvailForOurself = 0;
- private boolean hasMore = true;
- private boolean hasStarted = false; // true, as soon we could start at least once.
- private boolean isFatalError = false;
- private boolean isDoneCalled = false;
+ private volatile Thread worker = null;
+ private final AtomicInteger numInProgress = new AtomicInteger(0);
+ private final AtomicInteger numTokensAvailForOurself = new AtomicInteger(0);
+ private final AtomicBoolean hasMore = new AtomicBoolean(true);
+ private final AtomicBoolean hasStarted = new AtomicBoolean(false); // true, as soon we could start at least once.
+ private final AtomicBoolean isFatalError = new AtomicBoolean(false);
+ private final AtomicBoolean isDoneCalled = new AtomicBoolean(false);

private Request(Ctx ctx, Mentor<Ctx> mentor, Semaphore limit) {
this.ctx = ctx;
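Taken together, the hunks above make the Request bookkeeping atomic and move the permit release ahead of the completion callback: the remaining tokens are drained with getAndSet(0) and handed back to the shared Semaphore before mentor.onDone() runs, and compareAndSet guarantees onDone fires at most once. The second changed file, a new unit test, follows below. As a quick illustration of that ordering idiom only, here is a minimal sketch; the class and method names in it are made up for illustration and are not the project's API:

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of "release permits before the completion callback".
final class ReleaseBeforeDoneSketch {
    private final Semaphore limit;                                     // shared upper bound
    private final AtomicInteger tokensHeld = new AtomicInteger(0);     // permits reserved for this request
    private final AtomicBoolean doneCalled = new AtomicBoolean(false); // guards the one-time callback

    ReleaseBeforeDoneSketch(Semaphore limit) {
        this.limit = limit;
    }

    void finish(Runnable onDone) {
        // compareAndSet makes the completion path run at most once, even under races.
        if (!doneCalled.compareAndSet(false, true)) {
            return;
        }
        // Drain the token counter atomically and return the permits BEFORE the
        // callback runs, so a slow callback cannot hold permits back from other requests.
        limit.release(tokensHeld.getAndSet(0));
        onDone.run();
    }
}

With this ordering, anything that inspects the Semaphore from inside the callback (as the new test does via availablePermits()) already sees every permit returned.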
@@ -11,7 +11,9 @@

import java.util.Iterator;
import java.util.List;
+ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
+ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newWastefulExceptionFactory;
@@ -193,4 +195,48 @@ public void reportsErrorIfNoTokensLeft(TestContext testContext) {
});
}

+ @Test
+ public void testSemaphoreAreAllReleasedBeforeOnDoneCall(TestContext testContext) {
+ Async async = testContext.async();
+ int semaphoreLimit = 3;
+ int totalTasks = 10;
+ Semaphore semaphore = new Semaphore(semaphoreLimit);
+ CountDownLatch latch = new CountDownLatch(totalTasks);
+ AtomicInteger completedTasks = new AtomicInteger(0);
+ UpperBoundParallel parallel = new UpperBoundParallel(vertx, newWastefulExceptionFactory());
+ parallel.request(semaphore, null, new UpperBoundParallel.Mentor<>() {
+ private final AtomicInteger taskCounter = new AtomicInteger(0);
+
+ @Override
+ public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Object ctx) {
+ int taskId = taskCounter.getAndIncrement();
+ System.out.println("runOneMore: " + taskId);
+ // Simulate task execution with a slight delay.
+ vertx.setTimer(100, id -> {
+ try {
+ System.out.println("Task completed: " + taskId);
+ completedTasks.incrementAndGet();
+ } finally {
+ onDone.accept(null, null); // Mark task as done.
+ latch.countDown();
+ }
+ });
+ return taskId < totalTasks-1;
+ }
+
+ @Override
+ public boolean onError(Throwable ex, Object ctx) {
+ System.err.println("Error in task: " + ex.getMessage());
+ return false; // Stop processing on error.
+ }
+
+ @Override
+ public void onDone(Object ctx) {
+ System.out.println("All tasks completed.");
+ testContext.assertEquals(totalTasks, completedTasks.get(), "Number of completed tasks should match total tasks.");
+ testContext.assertEquals(semaphoreLimit, semaphore.availablePermits(), "All semaphore permits should be released.");
+ async.complete();
+ }
+ });
+ }
}
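For context on how the Mentor API exercised by this test might be driven from application code, here is a rough adapter sketch. It assumes only the call shape visible above (request(Semaphore, Ctx, Mentor) with runOneMore, onError, onDone); the helper name runAllWithLimit and the task handling are invented for illustration, not part of the project.

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Hypothetical adapter: runs the given tasks with at most 'limit' permits in flight
// and exposes overall completion as a Vert.x Future.
final class MentorAdapter {

    static Future<Void> runAllWithLimit(Vertx vertx, UpperBoundParallel parallel,
                                        Semaphore limit, List<Runnable> tasks) {
        Promise<Void> promise = Promise.promise();
        Iterator<Runnable> it = tasks.iterator(); // assumes 'tasks' is non-empty
        parallel.request(limit, null, new UpperBoundParallel.Mentor<>() {
            @Override
            public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Object ctx) {
                Runnable task = it.next();
                // Run the task on the event loop and report its outcome through onDone.
                vertx.runOnContext(v -> {
                    try {
                        task.run();
                        onDone.accept(null, null);
                    } catch (Throwable t) {
                        onDone.accept(t, null);
                    }
                });
                return it.hasNext(); // hasMore
            }

            @Override
            public boolean onError(Throwable ex, Object ctx) {
                promise.tryFail(ex);
                return false; // treat the first error as fatal
            }

            @Override
            public void onDone(Object ctx) {
                promise.tryComplete();
            }
        });
        return promise.future();
    }
}

The adapter only shows the call shape; real callers would add whatever retry and error policy they need in onError.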
