diff --git a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java index d0a8aa1..e6ab68a 100644 --- a/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java +++ b/src/main/java/org/swisspush/redisques/performance/UpperBoundParallel.java @@ -6,6 +6,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; @@ -83,48 +84,47 @@ private void resume(Request req) { // Enqueue as much we can. while (true) { assert req.worker == currentThread(); - if (req.isFatalError) { + if (req.isFatalError.get()) { log.trace("return from 'resume()' because isFatalError"); - assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; + assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; return; } - if (!req.hasMore) { - if (req.numInProgress == 0 && !req.isDoneCalled) { - req.isDoneCalled = true; + if (!req.hasMore.get()) { + if (req.numInProgress.get() == 0 && req.isDoneCalled.compareAndSet(false, true)) { // give up lock because we don't know how much time mentor will use. req.lock.unlock(); + log.debug("Release remaining {} tokens", req.numTokensAvailForOurself.get()); + // release before calling in to onDone. + req.limit.release(req.numTokensAvailForOurself.getAndSet(0)); log.trace("call 'mentor.onDone()'"); try { req.mentor.onDone(req.ctx); } finally { req.lock.lock(); // MUST get back our lock RIGHT NOW. } - log.debug("Release remaining {} tokens", req.numTokensAvailForOurself); - req.limit.release(req.numTokensAvailForOurself); - req.numTokensAvailForOurself = 0; } else { log.trace("return for now (hasMore = {}, numInProgress = {})", req.hasMore, req.numInProgress); } return; } - if (req.numTokensAvailForOurself > 0) { + if (req.numTokensAvailForOurself.get() > 0) { // We still have a token reserved for ourself. Use those first before acquiring // new ones. Explanation see comment in 'onOneDone()'. - req.numTokensAvailForOurself -= 1; + req.numTokensAvailForOurself.decrementAndGet(); } else if (!req.limit.tryAcquire()) { log.debug("redis request limit reached. Need to pause now."); break; // Go to end of loop to schedule a run later. } - req.hasStarted = true; - req.numInProgress += 1; - assert req.hasMore : "assert(hasMore)"; + req.hasStarted.set(true); + req.numInProgress.incrementAndGet(); + assert req.hasMore.get() : "assert(hasMore)"; boolean hasMore = true; try { // We MUST give up our lock while calling mentor. We cannot know how long // mentor is going to block (which would then cascade to all threads // waiting for our lock). assert req.worker == currentThread(); - assert req.hasMore; + assert req.hasMore.get(); req.lock.unlock(); log.trace("mentor.runOneMore() numInProgress={}", req.numInProgress); assert req.worker == currentThread(); @@ -144,23 +144,23 @@ private void resume(Request req) { log.trace("mentor.runOneMore() -> hasMore={}", hasMore); req.lock.lock(); assert req.worker == currentThread(); - assert req.hasMore; - req.hasMore = hasMore; + assert req.hasMore.get(); + req.hasMore.set(hasMore); } } - assert req.numInProgress >= 0 : req.numInProgress; - if (req.numInProgress == 0) { - if (!req.hasStarted) { + assert req.numInProgress.get() >= 0 : req.numInProgress; + if (req.numInProgress.get() == 0) { + if (!req.hasStarted.get()) { // We couldn't even trigger one single task. No resources available to // handle any more requests. This caller has to try later. - req.isFatalError = true; + req.isFatalError.set(true); Exception ex = exceptionFactory.newResourceExhaustionException( "No more resources to handle yet another request now.", null); req.mentor.onError(ex, req.ctx); - assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; + assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; } else { log.error("If you see this log, some unreachable code got reached. numInProgress={}, hasStarted={}", - req.numInProgress, req.hasStarted); + req.numInProgress, req.hasStarted); vertx.setTimer(4000, nonsense -> resume(req)); } } @@ -168,7 +168,7 @@ private void resume(Request req) { req.worker = null; req.lock.unlock(); } - assert req.numTokensAvailForOurself == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; + assert req.numTokensAvailForOurself.get() == 0 : "assert(numTokensAvailForOurself != " + req.numTokensAvailForOurself + ")"; } private void onOneDone(Request req, Throwable ex) { @@ -186,11 +186,11 @@ private void onOneDone(Request req, Throwable ex) { // requests. So by keeping that token we already got reserved to ourself, we // can apply backpressure to new incoming requests. This allows us to complete // the already running requests. - req.numInProgress -= 1; - req.numTokensAvailForOurself += 1; + req.numInProgress.decrementAndGet(); + req.numTokensAvailForOurself.incrementAndGet(); // ^^-- Token transfer only consists of those two statements. log.trace("onOneDone({}) {} remaining", ex != null ? "ex" : "null", req.numInProgress); - assert req.numInProgress >= 0 : req.numInProgress + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)"; + assert req.numInProgress.get() >= 0 : req.numInProgress.get() + " >= 0 (BTW: mentor MUST call 'onDone' EXACTLY once)"; boolean isFatalError = true; if (ex != null) try { // Unlock, to prevent thread stalls as we don't know for how long mentor @@ -202,10 +202,10 @@ private void onOneDone(Request req, Throwable ex) { isFatalError = !req.mentor.onError(ex, req.ctx); } finally { req.lock.lock(); // Need our lock back. - req.isFatalError = isFatalError; + req.isFatalError.set(isFatalError); // Need to release our token now. As we won't do it later anymore. if (isFatalError) { - req.numTokensAvailForOurself -= 1; + req.numTokensAvailForOurself.decrementAndGet(); req.limit.release(); } } @@ -220,13 +220,13 @@ private static final class Request { private final Mentor mentor; private final Lock lock = new ReentrantLock(); private final Semaphore limit; - private Thread worker = null; - private int numInProgress = 0; - private int numTokensAvailForOurself = 0; - private boolean hasMore = true; - private boolean hasStarted = false; // true, as soon we could start at least once. - private boolean isFatalError = false; - private boolean isDoneCalled = false; + private volatile Thread worker = null; + private final AtomicInteger numInProgress = new AtomicInteger(0); + private final AtomicInteger numTokensAvailForOurself = new AtomicInteger(0); + private final AtomicBoolean hasMore = new AtomicBoolean(true); + private final AtomicBoolean hasStarted = new AtomicBoolean(false); // true, as soon we could start at least once. + private final AtomicBoolean isFatalError = new AtomicBoolean(false); + private final AtomicBoolean isDoneCalled = new AtomicBoolean(false); private Request(Ctx ctx, Mentor mentor, Semaphore limit) { this.ctx = ctx; diff --git a/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java b/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java index 5a903db..87fd2c2 100644 --- a/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java +++ b/src/test/java/org/swisspush/redisques/performance/UpperBoundParallelTest.java @@ -11,7 +11,9 @@ import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import static org.swisspush.redisques.exception.RedisQuesExceptionFactory.newWastefulExceptionFactory; @@ -193,4 +195,48 @@ public void reportsErrorIfNoTokensLeft(TestContext testContext) { }); } + @Test + public void testSemaphoreAreAllReleasedBeforeOnDoneCall(TestContext testContext) { + Async async = testContext.async(); + int semaphoreLimit = 3; + int totalTasks = 10; + Semaphore semaphore = new Semaphore(semaphoreLimit); + CountDownLatch latch = new CountDownLatch(totalTasks); + AtomicInteger completedTasks = new AtomicInteger(0); + UpperBoundParallel parallel = new UpperBoundParallel(vertx, newWastefulExceptionFactory()); + parallel.request(semaphore, null, new UpperBoundParallel.Mentor<>() { + private final AtomicInteger taskCounter = new AtomicInteger(0); + + @Override + public boolean runOneMore(BiConsumer onDone, Object ctx) { + int taskId = taskCounter.getAndIncrement(); + System.out.println("runOneMore: " + taskId); + // Simulate task execution with a slight delay. + vertx.setTimer(100, id -> { + try { + System.out.println("Task completed: " + taskId); + completedTasks.incrementAndGet(); + } finally { + onDone.accept(null, null); // Mark task as done. + latch.countDown(); + } + }); + return taskId < totalTasks-1; + } + + @Override + public boolean onError(Throwable ex, Object ctx) { + System.err.println("Error in task: " + ex.getMessage()); + return false; // Stop processing on error. + } + + @Override + public void onDone(Object ctx) { + System.out.println("All tasks completed."); + testContext.assertEquals(totalTasks, completedTasks.get(), "Number of completed tasks should match total tasks."); + testContext.assertEquals(semaphoreLimit, semaphore.availablePermits(), "All semaphore permits should be released."); + async.complete(); + } + }); + } }