Skip to content

Commit

Permalink
Slot supplier interface & fixed-size implementation (#2014)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Apr 10, 2024
1 parent 3568970 commit d2a06fc
Show file tree
Hide file tree
Showing 35 changed files with 1,640 additions and 262 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ jobs:
USE_DOCKER_SERVICE: false
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest

- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure() # always run even if the previous step fails
with:
report_paths: '**/build/test-results/test/TEST-*.xml'

unit_test_jdk8:
name: Unit test with docker service [JDK8]
runs-on: ubuntu-latest
Expand All @@ -54,7 +60,6 @@ jobs:
- name: Set up Gradle
uses: gradle/actions/setup-gradle@v3


- name: Start containerized server and dependencies
run: |
docker compose \
Expand All @@ -68,6 +73,12 @@ jobs:
USE_DOCKER_SERVICE: true
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava

- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure() # always run even if the previous step fails
with:
report_paths: '**/build/test-results/test/TEST-*.xml'

copyright:
name: Copyright and code format
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.activity;

import io.temporal.activity.ActivityInfo;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;

public class ActivityPollResponseToInfo {
public static ActivityInfo toActivityInfoImpl(
PollActivityTaskQueueResponseOrBuilder response,
String namespace,
String activityTaskQueue,
boolean local) {
return new ActivityInfoImpl(response, namespace, activityTaskQueue, local, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.internal.activity.ActivityPollResponseToInfo;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -45,7 +46,7 @@ final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);

private final WorkflowServiceStubs service;
private final Semaphore pollSemaphore;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final Scope metricsScope;
private final PollActivityTaskQueueRequest pollRequest;

Expand All @@ -57,11 +58,11 @@ public ActivityPollTask(
@Nullable String buildId,
boolean useBuildIdForVersioning,
double activitiesPerSecond,
Semaphore pollSemaphore,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
this.service = Objects.requireNonNull(service);
this.pollSemaphore = pollSemaphore;
this.slotSupplier = slotSupplier;
this.metricsScope = Objects.requireNonNull(metricsScope);

PollActivityTaskQueueRequest.Builder pollRequest =
Expand Down Expand Up @@ -92,13 +93,22 @@ public ActivityTask poll() {
log.trace("poll request begin: " + pollRequest);
}
PollActivityTaskQueueResponse response;
SlotPermit permit;
boolean isSuccessful = false;

try {
pollSemaphore.acquire();
permit =
slotSupplier.reserveSlot(
new SlotReservationData(
pollRequest.getTaskQueue().getName(),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.warn("Error while trying to reserve a slot for an activity", e.getCause());
return null;
}

try {
Expand All @@ -118,9 +128,20 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
return new ActivityTask(response, pollSemaphore::release);
slotSupplier.markSlotUsed(
new ActivitySlotInfo(
ActivityPollResponseToInfo.toActivityInfoImpl(
response,
pollRequest.getNamespace(),
pollRequest.getTaskQueue().getNormalName(),
false),
pollRequest.getIdentity(),
pollRequest.getWorkerVersionCapabilities().getBuildId()),
permit);
return new ActivityTask(
response, () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
} finally {
if (!isSuccessful) pollSemaphore.release();
if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.tuning.*;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
Expand All @@ -64,16 +65,16 @@ final class ActivityWorker implements SuspendableWorker {
private final Scope workerMetricsScope;
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final int executorSlots;
private final Semaphore executorSlotsSemaphore;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;

public ActivityWorker(
@Nonnull WorkflowServiceStubs service,
@Nonnull String namespace,
@Nonnull String taskQueue,
double taskQueueActivitiesPerSecond,
@Nonnull SingleWorkerOptions options,
@Nonnull ActivityTaskHandler handler) {
@Nonnull ActivityTaskHandler handler,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -87,8 +88,8 @@ public ActivityWorker(
this.replyGrpcRetryerOptions =
new GrpcRetryer.GrpcRetryerOptions(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
this.executorSlots = options.getTaskExecutorThreadPoolSize();
this.executorSlotsSemaphore = new Semaphore(executorSlots);
this.slotSupplier = slotSupplier;
this.slotSupplier.setMetricsScope(this.workerMetricsScope);
}

@Override
Expand All @@ -101,8 +102,7 @@ public boolean start() {
options.getIdentity(),
new TaskHandlerImpl(handler),
pollerOptions,
options.getTaskExecutorThreadPoolSize(),
workerMetricsScope,
slotSupplier.maximumSlots(),
true);
poller =
new Poller<>(
Expand All @@ -115,7 +115,7 @@ public boolean start() {
options.getBuildId(),
options.isUsingBuildIdForVersioning(),
taskQueueActivitiesPerSecond,
executorSlotsSemaphore,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
this.pollTaskExecutor,
Expand All @@ -131,14 +131,14 @@ public boolean start() {

@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
String semaphoreName = this + "#executorSlotsSemaphore";
String supplierName = this + "#executorSlots";
return poller
.shutdown(shutdownManager, interruptTasks)
.thenCompose(
ignore ->
!interruptTasks
? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
executorSlotsSemaphore, executorSlots, semaphoreName)
? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
slotSupplier, supplierName)
: CompletableFuture.completedFuture(null))
.thenCompose(
ignore ->
Expand Down Expand Up @@ -416,23 +416,33 @@ private void logExceptionDuringResultReporting(

private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
@Override
public boolean tryReserveActivitySlot(
public Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
&& Objects.equals(
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
&& ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
if (!WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
|| !Objects.equals(
commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
return Optional.empty();
}
return ActivityWorker.this.slotSupplier.tryReserveSlot(
new SlotReservationData(
ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
}

@Override
public void releaseActivitySlotReservations(int slotCounts) {
ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
for (SlotPermit permit : permits) {
ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
}
}

@Override
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
ActivityWorker.this.pollTaskExecutor.process(
new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
new ActivityTask(
activity,
() ->
ActivityWorker.this.slotSupplier.releaseSlot(
SlotReleaseReason.taskComplete(), permit)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,33 @@

import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.worker.tuning.SlotPermit;
import java.util.Optional;

public interface EagerActivityDispatcher {
boolean tryReserveActivitySlot(ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);
Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes);

void releaseActivitySlotReservations(int slotCounts);
void releaseActivitySlotReservations(Iterable<SlotPermit> permits);

void dispatchActivity(PollActivityTaskQueueResponse activity);
void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit);

class NoopEagerActivityDispatcher implements EagerActivityDispatcher {
@Override
public boolean tryReserveActivitySlot(
public Optional<SlotPermit> tryReserveActivitySlot(
ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
return false;
return Optional.empty();
}

@Override
public void releaseActivitySlotReservations(int slotCounts) {
if (slotCounts > 0)
public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
if (permits.iterator().hasNext())
throw new IllegalStateException(
"Trying to release activity slots on a NoopEagerActivityDispatcher");
}

@Override
public void dispatchActivity(PollActivityTaskQueueResponse activity) {
public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
throw new IllegalStateException(
"Trying to dispatch activity on a NoopEagerActivityDispatcher");
}
Expand Down
Loading

0 comments on commit d2a06fc

Please sign in to comment.