Skip to content

Commit

Permalink
Ensure shutdown of LA slot queue isn't swallowed (#2161)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jul 30, 2024
1 parent e5c08a1 commit 1acafa3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ private void processQueue() {
SlotPermit slotPermit;
try {
slotPermit = slotSupplier.reserveSlot(request.data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error(
"Error reserving local activity slot, dropped activity id {}",
Expand Down Expand Up @@ -131,8 +134,4 @@ void submitAttempt(SlotReservationData data, boolean isRetry, LocalActivityAttem
newExecutionsBackpressureSemaphore.release();
}
}

TrackingSlotSupplier<LocalActivitySlotInfo> getSlotSupplier() {
return slotSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean

@Override
public void awaitTermination(long timeout, TimeUnit unit) {
slotQueue.shutdown();
long timeoutMillis = unit.toMillis(timeout);
ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -249,6 +250,29 @@ public void TestLocalActivitySlotAtLimit() throws InterruptedException {
MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE);
}

@Test
public void TestLocalActivityShutdownWhileWaitingOnSlot() throws InterruptedException {
testWorkflowRule.getTestEnvironment().start();
WorkflowClient client = testWorkflowRule.getWorkflowClient();
TestWorkflow workflow =
client.newWorkflowStub(
TestWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(testWorkflowRule.getTaskQueue())
.validateBuildWithDefaults());
WorkflowClient.start(workflow::workflow, activitiesAreLocal);
workflow.unblock();
parallelSemRunning.acquire(2);
testWorkflowRule.getTestEnvironment().getWorkerFactory().shutdownNow();
parallelSemBlocked.release(2);
testWorkflowRule.getTestEnvironment().getWorkerFactory().awaitTermination(3, TimeUnit.SECONDS);
// All slots should be available
assertWorkerSlotCount(
MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE,
MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE,
MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE);
}

@Test
public void TestLocalActivitySlotHitsCapacity() throws InterruptedException {
testWorkflowRule.getTestEnvironment().start();
Expand Down

0 comments on commit 1acafa3

Please sign in to comment.