diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java index 94892ab49..4cf18e090 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivitySlotSupplierQueue.java @@ -84,6 +84,9 @@ private void processQueue() { SlotPermit slotPermit; try { slotPermit = slotSupplier.reserveSlot(request.data); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } catch (Exception e) { log.error( "Error reserving local activity slot, dropped activity id {}", @@ -131,8 +134,4 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem newExecutionsBackpressureSemaphore.release(); } } - - TrackingSlotSupplier getSlotSupplier() { - return slotSupplier; - } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 8fbd5771c..a15e0c876 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -717,6 +717,7 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean @Override public void awaitTermination(long timeout, TimeUnit unit) { + slotQueue.shutdown(); long timeoutMillis = unit.toMillis(timeout); ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis); } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotsSmallSizeTests.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotsSmallSizeTests.java index 1d23e8f34..ba39fd335 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotsSmallSizeTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowSlotsSmallSizeTests.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -249,6 +250,29 @@ public void TestLocalActivitySlotAtLimit() throws InterruptedException { MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE); } + @Test + public void TestLocalActivityShutdownWhileWaitingOnSlot() throws InterruptedException { + testWorkflowRule.getTestEnvironment().start(); + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .validateBuildWithDefaults()); + WorkflowClient.start(workflow::workflow, activitiesAreLocal); + workflow.unblock(); + parallelSemRunning.acquire(2); + testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdownNow(); + parallelSemBlocked.release(2); + testWorkflowRule.getTestEnvironment().getWorkerFactory().awaitTermination(3, TimeUnit.SECONDS); + // All slots should be available + assertWorkerSlotCount( + MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE, + MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE, + MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE); + } + @Test public void TestLocalActivitySlotHitsCapacity() throws InterruptedException { testWorkflowRule.getTestEnvironment().start();