Skip to content

Commit

Permalink
Resource based tuner (#2110)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jun 21, 2024
1 parent 69769cb commit 8a2d5cd
Show file tree
Hide file tree
Showing 20 changed files with 1,483 additions and 91 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ jobs:
USE_DOCKER_SERVICE: false
run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest

- name: Run independent resource tuner test
env:
USER: unittest
USE_DOCKER_SERVICE: false
run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest

- name: Publish Test Report
uses: mikepenz/action-junit-report@v4
if: success() || failure() # always run even if the previous step fails
Expand Down
13 changes: 13 additions & 0 deletions temporal-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,16 @@ task registerNamespace(type: JavaExec) {
}

test.dependsOn 'registerNamespace'

test {
useJUnit {
excludeCategories 'io.temporal.worker.IndependentResourceBasedTests'
}
}

task testResourceIndependent(type: Test) {
useJUnit {
includeCategories 'io.temporal.worker.IndependentResourceBasedTests'
maxParallelForks = 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ public void setMetricsScope(Scope metricsScope) {
this.metricsScope = metricsScope;
}

/**
* If any slot supplier is resource-based, we want to attach a metrics scope to the controller
* (before it's labelled with the worker type).
*/
public void attachMetricsToResourceController(Scope metricsScope) {
if (inner instanceof ResourceBasedSlotSupplier) {
((ResourceBasedSlotSupplier<?>) inner).getResourceController().setMetricsScope(metricsScope);
}
}

Map<SlotPermit, SI> getUsedSlots() {
return usedSlots;
}
Expand All @@ -109,24 +119,28 @@ private SlotReserveContext<SI> createCtx(SlotReservationData dat) {
dat.taskQueue,
Collections.unmodifiableMap(usedSlots),
dat.workerIdentity,
dat.workerBuildId);
dat.workerBuildId,
issuedSlots);
}

private class SlotReserveContextImpl implements SlotReserveContext<SI> {
private final String taskQueue;
private final Map<SlotPermit, SI> usedSlots;
private final String workerIdentity;
private final String workerBuildId;
private final AtomicInteger issuedSlots;

private SlotReserveContextImpl(
String taskQueue,
Map<SlotPermit, SI> usedSlots,
String workerIdentity,
String workerBuildId) {
String workerBuildId,
AtomicInteger issuedSlots) {
this.taskQueue = taskQueue;
this.usedSlots = usedSlots;
this.workerIdentity = workerIdentity;
this.workerBuildId = workerBuildId;
this.issuedSlots = issuedSlots;
}

@Override
Expand All @@ -148,6 +162,11 @@ public String getWorkerIdentity() {
public String getWorkerBuildId() {
return workerBuildId;
}

@Override
public int getNumIssuedSlots() {
return issuedSlots.get();
}
}

private class SlotMarkUsedContextImpl implements SlotMarkUsedContext<SI> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,13 @@ private MetricsType() {}
// gauge
public static final String WORKFLOW_ACTIVE_THREAD_COUNT =
TEMPORAL_METRICS_PREFIX + "workflow_active_thread_count";

//
// Resource tuner
//
// Tagged with namespace & task_queue
public static final String RESOURCE_MEM_USAGE = "resource_slots_mem_usage";
public static final String RESOURCE_CPU_USAGE = "resource_slots_cpu_usage";
public static final String RESOURCE_MEM_PID = "resource_slots_mem_pid_output";
public static final String RESOURCE_CPU_PID = "resource_slots_cpu_pid_output";
}
15 changes: 9 additions & 6 deletions temporal-sdk/src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ public final class Worker {
} else {
TrackingSlotSupplier<ActivitySlotInfo> activitySlotSupplier =
new TrackingSlotSupplier<>(
this.options.getActivitySlotSupplier() == null
this.options.getWorkerTuner() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentActivityExecutionSize())
: this.options.getActivitySlotSupplier());
: this.options.getWorkerTuner().getActivityTaskSlotSupplier());
activitySlotSupplier.attachMetricsToResourceController(taggedScope);

activityWorker =
new SyncActivityWorker(
Expand Down Expand Up @@ -143,16 +144,18 @@ public final class Worker {

TrackingSlotSupplier<WorkflowSlotInfo> workflowSlotSupplier =
new TrackingSlotSupplier<>(
this.options.getWorkflowSlotSupplier() == null
this.options.getWorkerTuner() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentWorkflowTaskExecutionSize())
: this.options.getWorkflowSlotSupplier());
: this.options.getWorkerTuner().getWorkflowTaskSlotSupplier());
workflowSlotSupplier.attachMetricsToResourceController(taggedScope);
TrackingSlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier =
new TrackingSlotSupplier<>(
this.options.getLocalActivitySlotSupplier() == null
this.options.getWorkerTuner() == null
? new FixedSizeSlotSupplier<>(
this.options.getMaxConcurrentLocalActivityExecutionSize())
: this.options.getLocalActivitySlotSupplier());
: this.options.getWorkerTuner().getLocalActivitySlotSupplier());
localActivitySlotSupplier.attachMetricsToResourceController(taggedScope);
workflowWorker =
new SyncWorkflowWorker(
service,
Expand Down
110 changes: 27 additions & 83 deletions temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ public static final class Builder {
private String buildId;
private boolean useBuildIdForVersioning;
private Duration stickyTaskQueueDrainTimeout;
private SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier;
private SlotSupplier<ActivitySlotInfo> activitySlotSupplier;
private SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier;
private WorkerTuner workerTuner;
private String identity;

private Builder() {}
Expand All @@ -98,9 +96,7 @@ private Builder(WorkerOptions o) {
this.maxConcurrentActivityExecutionSize = o.maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = o.maxConcurrentWorkflowTaskExecutionSize;
this.maxConcurrentLocalActivityExecutionSize = o.maxConcurrentLocalActivityExecutionSize;
this.workflowSlotSupplier = o.workflowSlotSupplier;
this.activitySlotSupplier = o.activitySlotSupplier;
this.localActivitySlotSupplier = o.localActivitySlotSupplier;
this.workerTuner = o.workerTuner;
this.maxTaskQueueActivitiesPerSecond = o.maxTaskQueueActivitiesPerSecond;
this.maxConcurrentWorkflowTaskPollers = o.maxConcurrentWorkflowTaskPollers;
this.maxConcurrentActivityTaskPollers = o.maxConcurrentActivityTaskPollers;
Expand Down Expand Up @@ -138,8 +134,7 @@ public Builder setMaxWorkerActivitiesPerSecond(double maxWorkerActivitiesPerSeco
* @param maxConcurrentActivityExecutionSize Maximum number of activities executed in parallel.
* Default is 200, which is chosen if set to zero.
* @return {@code this}
* <p>Note setting is mutually exclusive with {@link
* #setActivitySlotSupplier(SlotSupplier)}.
* <p>Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}
*/
public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) {
if (maxConcurrentActivityExecutionSize < 0) {
Expand All @@ -157,7 +152,7 @@ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityEx
* @return {@code this}
* <p>Note that this is not related to the total number of open workflows which do not need
* to be loaded in a worker when they are not making state transitions.
* <p>Note setting is mutually exclusive with {@link #setWorkflowSlotSupplier(SlotSupplier)}
* <p>Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}
*/
public Builder setMaxConcurrentWorkflowTaskExecutionSize(
int maxConcurrentWorkflowTaskExecutionSize) {
Expand All @@ -174,8 +169,7 @@ public Builder setMaxConcurrentWorkflowTaskExecutionSize(
* @param maxConcurrentLocalActivityExecutionSize Maximum number of local activities executed in
* parallel. Default is 200, which is chosen if set to zero.
* @return {@code this}
* <p>Note setting is mutually exclusive with {@link
* #setLocalActivitySlotSupplier(SlotSupplier)}
* <p>Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)}
*/
public Builder setMaxConcurrentLocalActivityExecutionSize(
int maxConcurrentLocalActivityExecutionSize) {
Expand Down Expand Up @@ -384,37 +378,13 @@ public Builder setStickyTaskQueueDrainTimeout(Duration stickyTaskQueueDrainTimeo
}

/**
* Set the {@link SlotSupplier} for workflow tasks.
*
* <p>Note that this setting is mutually exclusive with {@link
* #setMaxConcurrentWorkflowTaskExecutionSize(int)}.
*/
@Experimental
public void setWorkflowSlotSupplier(SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier) {
this.workflowSlotSupplier = workflowSlotSupplier;
}

/**
* Set the {@link SlotSupplier} for activity tasks.
*
* <p>Note that this setting is mutually exclusive with {@link
* #setMaxConcurrentActivityExecutionSize(int)}.
*/
@Experimental
public void setActivitySlotSupplier(SlotSupplier<ActivitySlotInfo> activitySlotSupplier) {
this.activitySlotSupplier = activitySlotSupplier;
}

/**
* Set the {@link SlotSupplier} for local activity tasks.
*
* <p>Note that this setting is mutually exclusive with {@link
* #setMaxConcurrentLocalActivityExecutionSize(int)}.
* Set a {@link WorkerTuner} to determine how slots will be allocated for different types of
* tasks.
*/
@Experimental
public void setLocalActivitySlotSupplier(
SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier) {
this.localActivitySlotSupplier = localActivitySlotSupplier;
public Builder setWorkerTuner(WorkerTuner workerTuner) {
this.workerTuner = workerTuner;
return this;
}

/** Override identity of the worker primary specified in a WorkflowClient options. */
Expand All @@ -429,9 +399,7 @@ public WorkerOptions build() {
maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowTaskExecutionSize,
maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
workerTuner,
maxTaskQueueActivitiesPerSecond,
maxConcurrentWorkflowTaskPollers,
maxConcurrentActivityTaskPollers,
Expand All @@ -458,20 +426,20 @@ public WorkerOptions validateAndBuildWithDefaults() {
Preconditions.checkState(
maxConcurrentLocalActivityExecutionSize >= 0,
"negative maxConcurrentLocalActivityExecutionSize");
if (activitySlotSupplier != null) {
if (workerTuner != null) {
Preconditions.checkState(
maxConcurrentActivityExecutionSize == 0,
"maxConcurrentActivityExecutionSize must not be set if activitySlotSupplier is set");
"maxConcurrentActivityExecutionSize must not be set if workerTuner is set");
}
if (workflowSlotSupplier != null) {
if (workerTuner != null) {
Preconditions.checkState(
maxConcurrentWorkflowTaskExecutionSize == 0,
"maxConcurrentWorkflowTaskExecutionSize must not be set if workflowSlotSupplier is set");
"maxConcurrentWorkflowTaskExecutionSize must not be set if workerTuner is set");
}
if (localActivitySlotSupplier != null) {
if (workerTuner != null) {
Preconditions.checkState(
maxConcurrentLocalActivityExecutionSize == 0,
"maxConcurrentLocalActivityExecutionSize must not be set if localActivitySlotSupplier is set");
"maxConcurrentLocalActivityExecutionSize must not be set if workerTuner is set");
}
Preconditions.checkState(
maxTaskQueueActivitiesPerSecond >= 0, "negative taskQueueActivitiesPerSecond");
Expand Down Expand Up @@ -505,9 +473,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
maxConcurrentLocalActivityExecutionSize == 0
? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE
: maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
workerTuner,
maxTaskQueueActivitiesPerSecond,
maxConcurrentWorkflowTaskPollers == 0
? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS
Expand Down Expand Up @@ -542,9 +508,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
private final int maxConcurrentActivityExecutionSize;
private final int maxConcurrentWorkflowTaskExecutionSize;
private final int maxConcurrentLocalActivityExecutionSize;
private final SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier;
private final SlotSupplier<ActivitySlotInfo> activitySlotSupplier;
private final SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier;
private final WorkerTuner workerTuner;
private final double maxTaskQueueActivitiesPerSecond;
private final int maxConcurrentWorkflowTaskPollers;
private final int maxConcurrentActivityTaskPollers;
Expand All @@ -564,9 +528,7 @@ private WorkerOptions(
int maxConcurrentActivityExecutionSize,
int maxConcurrentWorkflowTaskExecutionSize,
int maxConcurrentLocalActivityExecutionSize,
SlotSupplier<WorkflowSlotInfo> workflowSlotSupplier,
SlotSupplier<ActivitySlotInfo> activitySlotSupplier,
SlotSupplier<LocalActivitySlotInfo> localActivitySlotSupplier,
WorkerTuner workerTuner,
double maxTaskQueueActivitiesPerSecond,
int workflowPollThreadCount,
int activityPollThreadCount,
Expand All @@ -584,9 +546,7 @@ private WorkerOptions(
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize;
this.workflowSlotSupplier = workflowSlotSupplier;
this.activitySlotSupplier = activitySlotSupplier;
this.localActivitySlotSupplier = localActivitySlotSupplier;
this.workerTuner = workerTuner;
this.maxTaskQueueActivitiesPerSecond = maxTaskQueueActivitiesPerSecond;
this.maxConcurrentWorkflowTaskPollers = workflowPollThreadCount;
this.maxConcurrentActivityTaskPollers = activityPollThreadCount;
Expand Down Expand Up @@ -683,16 +643,8 @@ public Duration getStickyTaskQueueDrainTimeout() {
return stickyTaskQueueDrainTimeout;
}

public SlotSupplier<WorkflowSlotInfo> getWorkflowSlotSupplier() {
return workflowSlotSupplier;
}

public SlotSupplier<ActivitySlotInfo> getActivitySlotSupplier() {
return activitySlotSupplier;
}

public SlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier() {
return localActivitySlotSupplier;
public WorkerTuner getWorkerTuner() {
return workerTuner;
}

@Nullable
Expand All @@ -716,9 +668,7 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
&& defaultDeadlockDetectionTimeout == that.defaultDeadlockDetectionTimeout
&& disableEagerExecution == that.disableEagerExecution
&& useBuildIdForVersioning == that.useBuildIdForVersioning
&& Objects.equals(workflowSlotSupplier, that.workflowSlotSupplier)
&& Objects.equals(activitySlotSupplier, that.activitySlotSupplier)
&& Objects.equals(localActivitySlotSupplier, that.localActivitySlotSupplier)
&& Objects.equals(workerTuner, that.workerTuner)
&& Objects.equals(maxHeartbeatThrottleInterval, that.maxHeartbeatThrottleInterval)
&& Objects.equals(defaultHeartbeatThrottleInterval, that.defaultHeartbeatThrottleInterval)
&& Objects.equals(stickyQueueScheduleToStartTimeout, that.stickyQueueScheduleToStartTimeout)
Expand All @@ -734,9 +684,7 @@ public int hashCode() {
maxConcurrentActivityExecutionSize,
maxConcurrentWorkflowTaskExecutionSize,
maxConcurrentLocalActivityExecutionSize,
workflowSlotSupplier,
activitySlotSupplier,
localActivitySlotSupplier,
workerTuner,
maxTaskQueueActivitiesPerSecond,
maxConcurrentWorkflowTaskPollers,
maxConcurrentActivityTaskPollers,
Expand All @@ -763,12 +711,8 @@ public String toString() {
+ maxConcurrentWorkflowTaskExecutionSize
+ ", maxConcurrentLocalActivityExecutionSize="
+ maxConcurrentLocalActivityExecutionSize
+ ", workflowSlotSupplier="
+ workflowSlotSupplier
+ ", activitySlotSupplier="
+ activitySlotSupplier
+ ", localActivitySlotSupplier="
+ localActivitySlotSupplier
+ ", workerTuner="
+ workerTuner
+ ", maxTaskQueueActivitiesPerSecond="
+ maxTaskQueueActivitiesPerSecond
+ ", maxConcurrentWorkflowTaskPollers="
Expand Down
Loading

0 comments on commit 8a2d5cd

Please sign in to comment.