From 8a2d5cdcc29b1c83e08e6d8ac61e887fe908aafe Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 21 Jun 2024 13:17:31 -0700 Subject: [PATCH] Resource based tuner (#2110) --- .github/workflows/ci.yml | 6 + temporal-sdk/build.gradle | 13 ++ .../internal/worker/TrackingSlotSupplier.java | 23 ++- .../java/io/temporal/worker/MetricsType.java | 9 + .../main/java/io/temporal/worker/Worker.java | 15 +- .../io/temporal/worker/WorkerOptions.java | 110 +++-------- .../worker/tuning/CompositeTuner.java | 83 ++++++++ .../worker/tuning/JVMSystemResourceInfo.java | 91 +++++++++ .../temporal/worker/tuning/PIDController.java | 186 ++++++++++++++++++ .../tuning/ResourceBasedController.java | 131 ++++++++++++ .../ResourceBasedControllerOptions.java | 172 ++++++++++++++++ .../tuning/ResourceBasedSlotOptions.java | 127 ++++++++++++ .../tuning/ResourceBasedSlotSupplier.java | 165 ++++++++++++++++ .../worker/tuning/ResourceBasedTuner.java | 137 +++++++++++++ .../worker/tuning/SlotReserveContext.java | 5 + .../worker/tuning/SystemResourceInfo.java | 39 ++++ .../temporal/worker/tuning/WorkerTuner.java | 46 +++++ .../worker/IndependentResourceBasedTests.java | 23 +++ .../worker/ResourceBasedTunerTests.java | 146 ++++++++++++++ .../io/temporal/worker/WorkerOptionsTest.java | 47 +++++ 20 files changed, 1483 insertions(+), 91 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/CompositeTuner.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/JVMSystemResourceInfo.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/PIDController.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedController.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedControllerOptions.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotOptions.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedTuner.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/SystemResourceInfo.java create mode 100644 temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkerTuner.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/IndependentResourceBasedTests.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/ResourceBasedTunerTests.java diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index caf85fb81..b0fbfa4ab 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,6 +33,12 @@ jobs: USE_DOCKER_SERVICE: false run: ./gradlew --no-daemon test -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest + - name: Run independent resource tuner test + env: + USER: unittest + USE_DOCKER_SERVICE: false + run: ./gradlew --no-daemon temporal-sdk:testResourceIndependent -x checkLicenseMain -x checkLicenses -x spotlessCheck -x spotlessApply -x spotlessJava -P edgeDepsTest + - name: Publish Test Report uses: mikepenz/action-junit-report@v4 if: success() || failure() # always run even if the previous step fails diff --git a/temporal-sdk/build.gradle b/temporal-sdk/build.gradle index e16f826bc..21be84a23 100644 --- a/temporal-sdk/build.gradle +++ b/temporal-sdk/build.gradle @@ -34,3 +34,16 @@ task registerNamespace(type: JavaExec) { } test.dependsOn 'registerNamespace' + +test { + useJUnit { + excludeCategories 'io.temporal.worker.IndependentResourceBasedTests' + } +} + +task testResourceIndependent(type: Test) { + useJUnit { + includeCategories 'io.temporal.worker.IndependentResourceBasedTests' + maxParallelForks = 1 + } +} \ No newline at end of file diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java index 753ff3d23..89ee9c6f8 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java @@ -94,6 +94,16 @@ public void setMetricsScope(Scope metricsScope) { this.metricsScope = metricsScope; } + /** + * If any slot supplier is resource-based, we want to attach a metrics scope to the controller + * (before it's labelled with the worker type). + */ + public void attachMetricsToResourceController(Scope metricsScope) { + if (inner instanceof ResourceBasedSlotSupplier) { + ((ResourceBasedSlotSupplier) inner).getResourceController().setMetricsScope(metricsScope); + } + } + Map getUsedSlots() { return usedSlots; } @@ -109,7 +119,8 @@ private SlotReserveContext createCtx(SlotReservationData dat) { dat.taskQueue, Collections.unmodifiableMap(usedSlots), dat.workerIdentity, - dat.workerBuildId); + dat.workerBuildId, + issuedSlots); } private class SlotReserveContextImpl implements SlotReserveContext { @@ -117,16 +128,19 @@ private class SlotReserveContextImpl implements SlotReserveContext { private final Map usedSlots; private final String workerIdentity; private final String workerBuildId; + private final AtomicInteger issuedSlots; private SlotReserveContextImpl( String taskQueue, Map usedSlots, String workerIdentity, - String workerBuildId) { + String workerBuildId, + AtomicInteger issuedSlots) { this.taskQueue = taskQueue; this.usedSlots = usedSlots; this.workerIdentity = workerIdentity; this.workerBuildId = workerBuildId; + this.issuedSlots = issuedSlots; } @Override @@ -148,6 +162,11 @@ public String getWorkerIdentity() { public String getWorkerBuildId() { return workerBuildId; } + + @Override + public int getNumIssuedSlots() { + return issuedSlots.get(); + } } private class SlotMarkUsedContextImpl implements SlotMarkUsedContext { diff --git a/temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java b/temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java index f3627beb8..f58593c24 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/MetricsType.java @@ -169,4 +169,13 @@ private MetricsType() {} // gauge public static final String WORKFLOW_ACTIVE_THREAD_COUNT = TEMPORAL_METRICS_PREFIX + "workflow_active_thread_count"; + + // + // Resource tuner + // + // Tagged with namespace & task_queue + public static final String RESOURCE_MEM_USAGE = "resource_slots_mem_usage"; + public static final String RESOURCE_CPU_USAGE = "resource_slots_cpu_usage"; + public static final String RESOURCE_MEM_PID = "resource_slots_mem_pid_output"; + public static final String RESOURCE_CPU_PID = "resource_slots_cpu_pid_output"; } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index ba29a743c..6c5d0740c 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -109,10 +109,11 @@ public final class Worker { } else { TrackingSlotSupplier activitySlotSupplier = new TrackingSlotSupplier<>( - this.options.getActivitySlotSupplier() == null + this.options.getWorkerTuner() == null ? new FixedSizeSlotSupplier<>( this.options.getMaxConcurrentActivityExecutionSize()) - : this.options.getActivitySlotSupplier()); + : this.options.getWorkerTuner().getActivityTaskSlotSupplier()); + activitySlotSupplier.attachMetricsToResourceController(taggedScope); activityWorker = new SyncActivityWorker( @@ -143,16 +144,18 @@ public final class Worker { TrackingSlotSupplier workflowSlotSupplier = new TrackingSlotSupplier<>( - this.options.getWorkflowSlotSupplier() == null + this.options.getWorkerTuner() == null ? new FixedSizeSlotSupplier<>( this.options.getMaxConcurrentWorkflowTaskExecutionSize()) - : this.options.getWorkflowSlotSupplier()); + : this.options.getWorkerTuner().getWorkflowTaskSlotSupplier()); + workflowSlotSupplier.attachMetricsToResourceController(taggedScope); TrackingSlotSupplier localActivitySlotSupplier = new TrackingSlotSupplier<>( - this.options.getLocalActivitySlotSupplier() == null + this.options.getWorkerTuner() == null ? new FixedSizeSlotSupplier<>( this.options.getMaxConcurrentLocalActivityExecutionSize()) - : this.options.getLocalActivitySlotSupplier()); + : this.options.getWorkerTuner().getLocalActivitySlotSupplier()); + localActivitySlotSupplier.attachMetricsToResourceController(taggedScope); workflowWorker = new SyncWorkflowWorker( service, diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java index 1fd6d1236..d9fce2e5e 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java @@ -83,9 +83,7 @@ public static final class Builder { private String buildId; private boolean useBuildIdForVersioning; private Duration stickyTaskQueueDrainTimeout; - private SlotSupplier workflowSlotSupplier; - private SlotSupplier activitySlotSupplier; - private SlotSupplier localActivitySlotSupplier; + private WorkerTuner workerTuner; private String identity; private Builder() {} @@ -98,9 +96,7 @@ private Builder(WorkerOptions o) { this.maxConcurrentActivityExecutionSize = o.maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowTaskExecutionSize = o.maxConcurrentWorkflowTaskExecutionSize; this.maxConcurrentLocalActivityExecutionSize = o.maxConcurrentLocalActivityExecutionSize; - this.workflowSlotSupplier = o.workflowSlotSupplier; - this.activitySlotSupplier = o.activitySlotSupplier; - this.localActivitySlotSupplier = o.localActivitySlotSupplier; + this.workerTuner = o.workerTuner; this.maxTaskQueueActivitiesPerSecond = o.maxTaskQueueActivitiesPerSecond; this.maxConcurrentWorkflowTaskPollers = o.maxConcurrentWorkflowTaskPollers; this.maxConcurrentActivityTaskPollers = o.maxConcurrentActivityTaskPollers; @@ -138,8 +134,7 @@ public Builder setMaxWorkerActivitiesPerSecond(double maxWorkerActivitiesPerSeco * @param maxConcurrentActivityExecutionSize Maximum number of activities executed in parallel. * Default is 200, which is chosen if set to zero. * @return {@code this} - *

Note setting is mutually exclusive with {@link - * #setActivitySlotSupplier(SlotSupplier)}. + *

Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)} */ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) { if (maxConcurrentActivityExecutionSize < 0) { @@ -157,7 +152,7 @@ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityEx * @return {@code this} *

Note that this is not related to the total number of open workflows which do not need * to be loaded in a worker when they are not making state transitions. - *

Note setting is mutually exclusive with {@link #setWorkflowSlotSupplier(SlotSupplier)} + *

Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)} */ public Builder setMaxConcurrentWorkflowTaskExecutionSize( int maxConcurrentWorkflowTaskExecutionSize) { @@ -174,8 +169,7 @@ public Builder setMaxConcurrentWorkflowTaskExecutionSize( * @param maxConcurrentLocalActivityExecutionSize Maximum number of local activities executed in * parallel. Default is 200, which is chosen if set to zero. * @return {@code this} - *

Note setting is mutually exclusive with {@link - * #setLocalActivitySlotSupplier(SlotSupplier)} + *

Note setting is mutually exclusive with {@link #setWorkerTuner(WorkerTuner)} */ public Builder setMaxConcurrentLocalActivityExecutionSize( int maxConcurrentLocalActivityExecutionSize) { @@ -384,37 +378,13 @@ public Builder setStickyTaskQueueDrainTimeout(Duration stickyTaskQueueDrainTimeo } /** - * Set the {@link SlotSupplier} for workflow tasks. - * - *

Note that this setting is mutually exclusive with {@link - * #setMaxConcurrentWorkflowTaskExecutionSize(int)}. - */ - @Experimental - public void setWorkflowSlotSupplier(SlotSupplier workflowSlotSupplier) { - this.workflowSlotSupplier = workflowSlotSupplier; - } - - /** - * Set the {@link SlotSupplier} for activity tasks. - * - *

Note that this setting is mutually exclusive with {@link - * #setMaxConcurrentActivityExecutionSize(int)}. - */ - @Experimental - public void setActivitySlotSupplier(SlotSupplier activitySlotSupplier) { - this.activitySlotSupplier = activitySlotSupplier; - } - - /** - * Set the {@link SlotSupplier} for local activity tasks. - * - *

Note that this setting is mutually exclusive with {@link - * #setMaxConcurrentLocalActivityExecutionSize(int)}. + * Set a {@link WorkerTuner} to determine how slots will be allocated for different types of + * tasks. */ @Experimental - public void setLocalActivitySlotSupplier( - SlotSupplier localActivitySlotSupplier) { - this.localActivitySlotSupplier = localActivitySlotSupplier; + public Builder setWorkerTuner(WorkerTuner workerTuner) { + this.workerTuner = workerTuner; + return this; } /** Override identity of the worker primary specified in a WorkflowClient options. */ @@ -429,9 +399,7 @@ public WorkerOptions build() { maxConcurrentActivityExecutionSize, maxConcurrentWorkflowTaskExecutionSize, maxConcurrentLocalActivityExecutionSize, - workflowSlotSupplier, - activitySlotSupplier, - localActivitySlotSupplier, + workerTuner, maxTaskQueueActivitiesPerSecond, maxConcurrentWorkflowTaskPollers, maxConcurrentActivityTaskPollers, @@ -458,20 +426,20 @@ public WorkerOptions validateAndBuildWithDefaults() { Preconditions.checkState( maxConcurrentLocalActivityExecutionSize >= 0, "negative maxConcurrentLocalActivityExecutionSize"); - if (activitySlotSupplier != null) { + if (workerTuner != null) { Preconditions.checkState( maxConcurrentActivityExecutionSize == 0, - "maxConcurrentActivityExecutionSize must not be set if activitySlotSupplier is set"); + "maxConcurrentActivityExecutionSize must not be set if workerTuner is set"); } - if (workflowSlotSupplier != null) { + if (workerTuner != null) { Preconditions.checkState( maxConcurrentWorkflowTaskExecutionSize == 0, - "maxConcurrentWorkflowTaskExecutionSize must not be set if workflowSlotSupplier is set"); + "maxConcurrentWorkflowTaskExecutionSize must not be set if workerTuner is set"); } - if (localActivitySlotSupplier != null) { + if (workerTuner != null) { Preconditions.checkState( maxConcurrentLocalActivityExecutionSize == 0, - "maxConcurrentLocalActivityExecutionSize must not be set if localActivitySlotSupplier is set"); + "maxConcurrentLocalActivityExecutionSize must not be set if workerTuner is set"); } Preconditions.checkState( maxTaskQueueActivitiesPerSecond >= 0, "negative taskQueueActivitiesPerSecond"); @@ -505,9 +473,7 @@ public WorkerOptions validateAndBuildWithDefaults() { maxConcurrentLocalActivityExecutionSize == 0 ? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE : maxConcurrentLocalActivityExecutionSize, - workflowSlotSupplier, - activitySlotSupplier, - localActivitySlotSupplier, + workerTuner, maxTaskQueueActivitiesPerSecond, maxConcurrentWorkflowTaskPollers == 0 ? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_POLLERS @@ -542,9 +508,7 @@ public WorkerOptions validateAndBuildWithDefaults() { private final int maxConcurrentActivityExecutionSize; private final int maxConcurrentWorkflowTaskExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; - private final SlotSupplier workflowSlotSupplier; - private final SlotSupplier activitySlotSupplier; - private final SlotSupplier localActivitySlotSupplier; + private final WorkerTuner workerTuner; private final double maxTaskQueueActivitiesPerSecond; private final int maxConcurrentWorkflowTaskPollers; private final int maxConcurrentActivityTaskPollers; @@ -564,9 +528,7 @@ private WorkerOptions( int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowTaskExecutionSize, int maxConcurrentLocalActivityExecutionSize, - SlotSupplier workflowSlotSupplier, - SlotSupplier activitySlotSupplier, - SlotSupplier localActivitySlotSupplier, + WorkerTuner workerTuner, double maxTaskQueueActivitiesPerSecond, int workflowPollThreadCount, int activityPollThreadCount, @@ -584,9 +546,7 @@ private WorkerOptions( this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; - this.workflowSlotSupplier = workflowSlotSupplier; - this.activitySlotSupplier = activitySlotSupplier; - this.localActivitySlotSupplier = localActivitySlotSupplier; + this.workerTuner = workerTuner; this.maxTaskQueueActivitiesPerSecond = maxTaskQueueActivitiesPerSecond; this.maxConcurrentWorkflowTaskPollers = workflowPollThreadCount; this.maxConcurrentActivityTaskPollers = activityPollThreadCount; @@ -683,16 +643,8 @@ public Duration getStickyTaskQueueDrainTimeout() { return stickyTaskQueueDrainTimeout; } - public SlotSupplier getWorkflowSlotSupplier() { - return workflowSlotSupplier; - } - - public SlotSupplier getActivitySlotSupplier() { - return activitySlotSupplier; - } - - public SlotSupplier getLocalActivitySlotSupplier() { - return localActivitySlotSupplier; + public WorkerTuner getWorkerTuner() { + return workerTuner; } @Nullable @@ -716,9 +668,7 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond && defaultDeadlockDetectionTimeout == that.defaultDeadlockDetectionTimeout && disableEagerExecution == that.disableEagerExecution && useBuildIdForVersioning == that.useBuildIdForVersioning - && Objects.equals(workflowSlotSupplier, that.workflowSlotSupplier) - && Objects.equals(activitySlotSupplier, that.activitySlotSupplier) - && Objects.equals(localActivitySlotSupplier, that.localActivitySlotSupplier) + && Objects.equals(workerTuner, that.workerTuner) && Objects.equals(maxHeartbeatThrottleInterval, that.maxHeartbeatThrottleInterval) && Objects.equals(defaultHeartbeatThrottleInterval, that.defaultHeartbeatThrottleInterval) && Objects.equals(stickyQueueScheduleToStartTimeout, that.stickyQueueScheduleToStartTimeout) @@ -734,9 +684,7 @@ public int hashCode() { maxConcurrentActivityExecutionSize, maxConcurrentWorkflowTaskExecutionSize, maxConcurrentLocalActivityExecutionSize, - workflowSlotSupplier, - activitySlotSupplier, - localActivitySlotSupplier, + workerTuner, maxTaskQueueActivitiesPerSecond, maxConcurrentWorkflowTaskPollers, maxConcurrentActivityTaskPollers, @@ -763,12 +711,8 @@ public String toString() { + maxConcurrentWorkflowTaskExecutionSize + ", maxConcurrentLocalActivityExecutionSize=" + maxConcurrentLocalActivityExecutionSize - + ", workflowSlotSupplier=" - + workflowSlotSupplier - + ", activitySlotSupplier=" - + activitySlotSupplier - + ", localActivitySlotSupplier=" - + localActivitySlotSupplier + + ", workerTuner=" + + workerTuner + ", maxTaskQueueActivitiesPerSecond=" + maxTaskQueueActivitiesPerSecond + ", maxConcurrentWorkflowTaskPollers=" diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/CompositeTuner.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/CompositeTuner.java new file mode 100644 index 000000000..679050493 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/CompositeTuner.java @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; +import java.util.Objects; +import javax.annotation.Nonnull; + +/** + * Can be used to create a {@link WorkerTuner} which uses specific {@link SlotSupplier}s for each + * type of slot. + */ +@Experimental +public class CompositeTuner implements WorkerTuner { + private final @Nonnull SlotSupplier workflowTaskSlotSupplier; + private final @Nonnull SlotSupplier activityTaskSlotSupplier; + private final @Nonnull SlotSupplier localActivitySlotSupplier; + + public CompositeTuner( + @Nonnull SlotSupplier workflowTaskSlotSupplier, + @Nonnull SlotSupplier activityTaskSlotSupplier, + @Nonnull SlotSupplier localActivitySlotSupplier) { + this.workflowTaskSlotSupplier = Objects.requireNonNull(workflowTaskSlotSupplier); + this.activityTaskSlotSupplier = Objects.requireNonNull(activityTaskSlotSupplier); + this.localActivitySlotSupplier = Objects.requireNonNull(localActivitySlotSupplier); + + // All resource-based slot suppliers must use the same controller + validateResourceController(workflowTaskSlotSupplier, activityTaskSlotSupplier); + validateResourceController(workflowTaskSlotSupplier, localActivitySlotSupplier); + validateResourceController(activityTaskSlotSupplier, localActivitySlotSupplier); + } + + @Nonnull + @Override + public SlotSupplier getWorkflowTaskSlotSupplier() { + return workflowTaskSlotSupplier; + } + + @Nonnull + @Override + public SlotSupplier getActivityTaskSlotSupplier() { + return activityTaskSlotSupplier; + } + + @Nonnull + @Override + public SlotSupplier getLocalActivitySlotSupplier() { + return localActivitySlotSupplier; + } + + private void validateResourceController( + @Nonnull SlotSupplier supplier1, @Nonnull SlotSupplier supplier2) { + if (supplier1 instanceof ResourceBasedSlotSupplier + && supplier2 instanceof ResourceBasedSlotSupplier) { + ResourceBasedController controller1 = + ((ResourceBasedSlotSupplier) supplier1).getResourceController(); + ResourceBasedController controller2 = + ((ResourceBasedSlotSupplier) supplier2).getResourceController(); + if (controller1 != controller2) { + throw new IllegalArgumentException( + "All resource-based slot suppliers must use the same ResourceController"); + } + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/JVMSystemResourceInfo.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/JVMSystemResourceInfo.java new file mode 100644 index 000000000..ca20d2473 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/JVMSystemResourceInfo.java @@ -0,0 +1,91 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import com.sun.management.OperatingSystemMXBean; +import io.temporal.common.Experimental; +import java.lang.management.ManagementFactory; +import java.time.Instant; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** {@link SystemResourceInfo} implementation that uses JVM-specific APIs to get resource usage. */ +@Experimental +public class JVMSystemResourceInfo implements SystemResourceInfo { + // As of relatively recent Java versions (including backports), this class will properly deal with + // containerized environments as well as running on bare metal. + // See https://bugs.openjdk.org/browse/JDK-8226575 for more details on which versions the fixes + // have been backported to. + OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class); + + private final Lock refreshLock = new ReentrantLock(); + private SystemInfo lastSystemInfo; + + @Override + public double getCPUUsagePercent() { + return refresh().cpuUsagePercent; + } + + @Override + public double getMemoryUsagePercent() { + return refresh().memoryUsagePercent; + } + + @SuppressWarnings("deprecation") // deprecated APIs needed since replacements are for Java 14+ + private SystemInfo refresh() { + + refreshLock.lock(); + try { + if (lastSystemInfo == null + || Instant.now().isAfter(lastSystemInfo.refreshed.plusMillis(100))) { + // This can return NaN seemingly when usage is very low + double lastCpuUsage = osBean.getSystemCpuLoad(); + if (lastCpuUsage < 0 || Double.isNaN(lastCpuUsage)) { + lastCpuUsage = 0; + } + + Runtime runtime = Runtime.getRuntime(); + long jvmUsedMemory = runtime.totalMemory() - runtime.freeMemory(); + long jvmMaxMemory = runtime.maxMemory(); + + double lastMemUsage = ((double) jvmUsedMemory / jvmMaxMemory); + Instant lastRefresh = Instant.now(); + lastSystemInfo = new SystemInfo(lastRefresh, lastCpuUsage, lastMemUsage); + } + } finally { + refreshLock.unlock(); + } + + return lastSystemInfo; + } + + private static class SystemInfo { + private final Instant refreshed; + private final double cpuUsagePercent; + private final double memoryUsagePercent; + + private SystemInfo(Instant refreshed, double cpuUsagePercent, double memoryUsagePercent) { + this.refreshed = refreshed; + this.cpuUsagePercent = cpuUsagePercent; + this.memoryUsagePercent = memoryUsagePercent; + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/PIDController.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/PIDController.java new file mode 100644 index 000000000..d9135bae2 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/PIDController.java @@ -0,0 +1,186 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +/** + * A simple PID closed control loop.
+ *
+ * License : MIT + * + * @author Charles Grassin + */ +class PIDController { + // PID coefficients + private double setPoint; + private double kP, kI, kD; + + /** Limit bound of the output. */ + private double minLimit = Double.NaN, maxLimit = Double.NaN; + + // Dynamic variables + private double previousTime = Double.NaN; + private double lastError = 0; + private double integralError = 0; + + /** + * Constructs a new PID with set coefficients. + * + * @param setPoint The initial target value. + * @param kP The proportional gain coefficient. + * @param kI The integral gain coefficient. + * @param kD The derivative gain coefficient. + */ + PIDController(final double setPoint, final double kP, final double kI, final double kD) { + this.setSetpoint(setPoint); + this.kP = kP; + this.kI = kI; + this.kD = kD; + } + + /** + * Updates the controller with the current time and value and outputs the PID controller output. + * + * @param currentTime The current time (in arbitrary time unit, such as seconds). If the PID is + * assumed to run at a constant frequency, you can simply put '1'. + * @param currentValue The current, measured value. + * @return The PID controller output. + */ + double getOutput(final double currentTime, final double currentValue) { + final double error = setPoint - currentValue; + final double dt = (!Double.isNaN(previousTime)) ? (currentTime - previousTime) : 0; + + // Compute Integral & Derivative error + final double derivativeError = (dt != 0) ? ((error - lastError) / dt) : 0; + integralError += error * dt; + + // Save history + previousTime = currentTime; + lastError = error; + + return checkLimits((kP * error) + (kI * integralError) + (kD * derivativeError)); + } + + /** Resets the integral and derivative errors. */ + void reset() { + previousTime = 0; + lastError = 0; + integralError = 0; + } + + /** + * Bounds the PID output between the lower limit and the upper limit. + * + * @param output The target output value. + * @return The output value, bounded to the limits. + */ + private double checkLimits(final double output) { + if (!Double.isNaN(minLimit) && output < minLimit) return minLimit; + else if (!Double.isNaN(maxLimit) && output > maxLimit) return maxLimit; + else return output; + } + + // Getters & Setters + + /** + * Sets the output limits of the PID controller. If the minLimit is superior to the maxLimit, it + * will use the smallest as the minLimit. + * + * @param minLimit The lower limit of the PID output. + * @param maxLimit The upper limit of the PID output. + */ + void setOuputLimits(final double minLimit, final double maxLimit) { + if (minLimit < maxLimit) { + this.minLimit = minLimit; + this.maxLimit = maxLimit; + } else { + this.minLimit = maxLimit; + this.maxLimit = minLimit; + } + } + + /** Removes the output limits of the PID controller */ + void removeOuputLimits() { + this.minLimit = Double.NaN; + this.maxLimit = Double.NaN; + } + + /** + * @return the kP parameter + */ + public double getkP() { + return kP; + } + + /** + * @param kP the kP parameter to set + */ + void setkP(double kP) { + this.kP = kP; + reset(); + } + + /** + * @return the kI parameter + */ + double getkI() { + return kI; + } + + /** + * @param kI the kI parameter to set + */ + void setkI(double kI) { + this.kI = kI; + reset(); + } + + /** + * @return the kD parameter + */ + double getkD() { + return kD; + } + + /** + * @param kD the kD parameter to set + */ + void setkD(double kD) { + this.kD = kD; + reset(); + } + + /** + * @return the setPoint + */ + double getSetPoint() { + return setPoint; + } + + /** + * Establishes a new set point for the PID controller. + * + * @param setPoint The new target point. + */ + void setSetpoint(final double setPoint) { + reset(); + this.setPoint = setPoint; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedController.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedController.java new file mode 100644 index 000000000..111df0de3 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedController.java @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import com.uber.m3.tally.Gauge; +import com.uber.m3.tally.Scope; +import io.temporal.common.Experimental; +import io.temporal.worker.MetricsType; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Is used by {@link ResourceBasedSlotSupplier} and {@link ResourceBasedTuner} to make decisions + * about whether slots should be handed out based on system resource usage. + */ +@Experimental +public class ResourceBasedController { + public final ResourceBasedControllerOptions options; + + private final ReentrantLock decisionLock = new ReentrantLock(); + private final PIDController memoryController; + private final PIDController cpuController; + private final SystemResourceInfo systemInfoSupplier; + private Instant lastPidRefresh = Instant.now(); + + private final AtomicReference metrics = new AtomicReference<>(); + + /** + * Construct a controller with the given options. If you want to use resource-based tuning for all + * slot suppliers, prefer {@link ResourceBasedTuner}. + */ + public static ResourceBasedController newSystemInfoController( + ResourceBasedControllerOptions options) { + return new ResourceBasedController(options, new JVMSystemResourceInfo()); + } + + /** + * Construct a controller with the given options and system info supplier. Users should prefer + * {@link #newSystemInfoController(ResourceBasedControllerOptions)}. + */ + public ResourceBasedController( + ResourceBasedControllerOptions options, SystemResourceInfo systemInfoSupplier) { + this.options = options; + this.systemInfoSupplier = systemInfoSupplier; + this.memoryController = + new PIDController( + options.getTargetCPUUsage(), + options.getMemoryPGain(), + options.getMemoryIGain(), + options.getMemoryDGain()); + this.cpuController = + new PIDController( + options.getTargetCPUUsage(), + options.getCpuPGain(), + options.getCpuIGain(), + options.getCpuDGain()); + } + + /** + * @return True if the PID controllers & and other constraints would allow another slot + */ + boolean pidDecision() { + decisionLock.lock(); + try { + double memoryUsage = systemInfoSupplier.getMemoryUsagePercent(); + double cpuUsage = systemInfoSupplier.getCPUUsagePercent(); + double memoryOutput = + memoryController.getOutput(lastPidRefresh.getEpochSecond(), memoryUsage); + double cpuOutput = cpuController.getOutput(lastPidRefresh.getEpochSecond(), cpuUsage); + lastPidRefresh = Instant.now(); + + Metrics metrics = this.metrics.get(); + if (metrics != null) { + metrics.memUsage.update(memoryUsage); + metrics.cpuUsage.update(cpuUsage); + metrics.memPidOut.update(memoryOutput); + metrics.cpuPidOut.update(cpuOutput); + } + + return memoryOutput > options.getMemoryOutputThreshold() + && cpuOutput > options.getCpuOutputThreshold() + && canReserve(); + } finally { + decisionLock.unlock(); + } + } + + private boolean canReserve() { + return systemInfoSupplier.getMemoryUsagePercent() < options.getTargetMemoryUsage(); + } + + /** Visible for internal usage. Can only be set once. */ + public void setMetricsScope(Scope metricsScope) { + if (metrics.get() == null) { + metrics.set(new Metrics(metricsScope)); + } + } + + private static class Metrics { + private final Gauge memUsage; + private final Gauge cpuUsage; + private final Gauge memPidOut; + private final Gauge cpuPidOut; + + private Metrics(Scope scope) { + memUsage = scope.gauge(MetricsType.RESOURCE_MEM_USAGE); + cpuUsage = scope.gauge(MetricsType.RESOURCE_CPU_USAGE); + memPidOut = scope.gauge(MetricsType.RESOURCE_MEM_PID); + cpuPidOut = scope.gauge(MetricsType.RESOURCE_CPU_PID); + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedControllerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedControllerOptions.java new file mode 100644 index 000000000..47ca10531 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedControllerOptions.java @@ -0,0 +1,172 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import com.google.common.base.Preconditions; +import io.temporal.common.Experimental; + +/** Options for a {@link ResourceBasedController} */ +@Experimental +public class ResourceBasedControllerOptions { + + public static ResourceBasedControllerOptions.Builder newBuilder( + double targetMemoryUsage, double targetCPUUsage) { + return new ResourceBasedControllerOptions.Builder() + .setTargetMemoryUsage(targetMemoryUsage) + .setTargetCPUUsage(targetCPUUsage); + } + + public static final class Builder { + private double targetMemoryUsage; + private double targetCPUUsage; + private double memoryPGain = 5; + private double memoryIGain = 0; + private double memoryDGain = 1; + private double memoryOutputThreshold = 0.25; + private double cpuPGain = 5; + private double cpuIGain = 0; + private double cpuDGain = 1; + private double cpuOutputThreshold = 0.05; + + public Builder setTargetMemoryUsage(double targetMemoryUsage) { + this.targetMemoryUsage = targetMemoryUsage; + return this; + } + + public Builder setTargetCPUUsage(double targetCPUUsage) { + this.targetCPUUsage = targetCPUUsage; + return this; + } + + public Builder setMemoryPGain(double memoryPGain) { + this.memoryPGain = memoryPGain; + return this; + } + + public Builder setMemoryIGain(double memoryIGain) { + this.memoryIGain = memoryIGain; + return this; + } + + public Builder setMemoryDGain(double memoryDGain) { + this.memoryDGain = memoryDGain; + return this; + } + + public Builder setMemoryOutputThreshold(double memoryOutputThreshold) { + this.memoryOutputThreshold = memoryOutputThreshold; + return this; + } + + public Builder setCpuPGain(double cpuPGain) { + this.cpuPGain = cpuPGain; + return this; + } + + public Builder setCpuIGain(double cpuIGain) { + this.cpuIGain = cpuIGain; + return this; + } + + public Builder setCpuDGain(double cpuDGain) { + this.cpuDGain = cpuDGain; + return this; + } + + public Builder setCpuOutputThreshold(double cpuOutputThreshold) { + this.cpuOutputThreshold = cpuOutputThreshold; + return this; + } + + public ResourceBasedControllerOptions build() { + Preconditions.checkState( + targetMemoryUsage > 0, "targetMemoryUsage must be set and greater than 0"); + Preconditions.checkState(targetCPUUsage > 0, "targetCPUUsage must be set and greater than 0"); + return new ResourceBasedControllerOptions(this); + } + } + + private final double targetMemoryUsage; + private final double targetCPUUsage; + + private final double memoryPGain; + private final double memoryIGain; + private final double memoryDGain; + private final double memoryOutputThreshold; + + private final double cpuPGain; + private final double cpuIGain; + private final double cpuDGain; + private final double cpuOutputThreshold; + + private ResourceBasedControllerOptions(Builder builder) { + this.targetMemoryUsage = builder.targetMemoryUsage; + this.targetCPUUsage = builder.targetCPUUsage; + this.memoryPGain = builder.memoryPGain; + this.memoryIGain = builder.memoryIGain; + this.memoryDGain = builder.memoryDGain; + this.memoryOutputThreshold = builder.memoryOutputThreshold; + this.cpuPGain = builder.cpuPGain; + this.cpuIGain = builder.cpuIGain; + this.cpuDGain = builder.cpuDGain; + this.cpuOutputThreshold = builder.cpuOutputThreshold; + } + + public double getTargetMemoryUsage() { + return targetMemoryUsage; + } + + public double getTargetCPUUsage() { + return targetCPUUsage; + } + + public double getMemoryPGain() { + return memoryPGain; + } + + public double getMemoryIGain() { + return memoryIGain; + } + + public double getMemoryDGain() { + return memoryDGain; + } + + public double getMemoryOutputThreshold() { + return memoryOutputThreshold; + } + + public double getCpuPGain() { + return cpuPGain; + } + + public double getCpuIGain() { + return cpuIGain; + } + + public double getCpuDGain() { + return cpuDGain; + } + + public double getCpuOutputThreshold() { + return cpuOutputThreshold; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotOptions.java new file mode 100644 index 000000000..3dad1ea39 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotOptions.java @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; +import java.time.Duration; +import java.util.Objects; + +/** Options resource-based slot suppliers */ +@Experimental +public class ResourceBasedSlotOptions { + private final int minimumSlots; + private final int maximumSlots; + private final Duration rampThrottle; + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private int minimumSlots; + private int maximumSlots; + private Duration rampThrottle; + + private Builder() {} + + /** + * @param minimumSlots minimum number of slots that will be issued without any resource checks + */ + public Builder setMinimumSlots(int minimumSlots) { + this.minimumSlots = minimumSlots; + return this; + } + + /** + * @param maximumSlots maximum number of slots that will ever be issued + */ + public Builder setMaximumSlots(int maximumSlots) { + this.maximumSlots = maximumSlots; + return this; + } + + /** + * @param rampThrottle time to wait between slot issuance. This value matters because how many + * resources a task will use cannot be determined ahead of time, and thus the system should + * wait to see how much resources are used before issuing more slots. + */ + public Builder setRampThrottle(Duration rampThrottle) { + this.rampThrottle = rampThrottle; + return this; + } + + public ResourceBasedSlotOptions build() { + return new ResourceBasedSlotOptions(minimumSlots, maximumSlots, rampThrottle); + } + } + + /** + * @param minimumSlots minimum number of slots that will be issued without any resource checks + * @param maximumSlots maximum number of slots that will ever be issued + * @param rampThrottle time to wait between slot issuance. This value matters because how many + * resources a task will use cannot be determined ahead of time, and thus the system should + * wait to see how much resources are used before issuing more slots. + */ + private ResourceBasedSlotOptions(int minimumSlots, int maximumSlots, Duration rampThrottle) { + this.minimumSlots = minimumSlots; + this.maximumSlots = maximumSlots; + this.rampThrottle = rampThrottle; + } + + public int getMinimumSlots() { + return minimumSlots; + } + + public int getMaximumSlots() { + return maximumSlots; + } + + public Duration getRampThrottle() { + return rampThrottle; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ResourceBasedSlotOptions that = (ResourceBasedSlotOptions) o; + return minimumSlots == that.minimumSlots + && maximumSlots == that.maximumSlots + && Objects.equals(rampThrottle, that.rampThrottle); + } + + @Override + public int hashCode() { + return Objects.hash(minimumSlots, maximumSlots, rampThrottle); + } + + @Override + public String toString() { + return "ResourceBasedSlotOptions{" + + "minimumSlots=" + + minimumSlots + + ", maximumSlots=" + + maximumSlots + + ", rampThrottle=" + + rampThrottle + + '}'; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java new file mode 100644 index 000000000..b01ed9dbb --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java @@ -0,0 +1,165 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */ +@Experimental +public class ResourceBasedSlotSupplier implements SlotSupplier { + + private final ResourceBasedController resourceController; + private final ResourceBasedSlotOptions options; + private Instant lastSlotIssuedAt = Instant.EPOCH; + + /** + * Construct a slot supplier for workflow tasks with the given resource controller and options. + * + *

The resource controller must be the same among all slot suppliers in a worker. If you want + * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}. + */ + public static ResourceBasedSlotSupplier createForWorkflow( + ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) { + return new ResourceBasedSlotSupplier<>( + WorkflowSlotInfo.class, resourceBasedController, options); + } + + /** + * Construct a slot supplier for activity tasks with the given resource controller and options. + * + *

The resource controller must be the same among all slot suppliers in a worker. If you want + * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}. + */ + public static ResourceBasedSlotSupplier createForActivity( + ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) { + return new ResourceBasedSlotSupplier<>( + ActivitySlotInfo.class, resourceBasedController, options); + } + + /** + * Construct a slot supplier for local activities with the given resource controller and options. + * + *

The resource controller must be the same among all slot suppliers in a worker. If you want + * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}. + */ + public static ResourceBasedSlotSupplier createForLocalActivity( + ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) { + return new ResourceBasedSlotSupplier<>( + LocalActivitySlotInfo.class, resourceBasedController, options); + } + + private ResourceBasedSlotSupplier( + Class clazz, + ResourceBasedController resourceBasedController, + ResourceBasedSlotOptions options) { + this.resourceController = resourceBasedController; + // Merge default options for any unset fields + if (WorkflowSlotInfo.class.isAssignableFrom(clazz)) { + this.options = + ResourceBasedSlotOptions.newBuilder() + .setMinimumSlots( + options.getMinimumSlots() == 0 + ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMinimumSlots() + : options.getMinimumSlots()) + .setMaximumSlots( + options.getMaximumSlots() == 0 + ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMaximumSlots() + : options.getMaximumSlots()) + .setRampThrottle( + options.getRampThrottle() == null + ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getRampThrottle() + : options.getRampThrottle()) + .build(); + } else { + this.options = + ResourceBasedSlotOptions.newBuilder() + .setMinimumSlots( + options.getMinimumSlots() == 0 + ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMinimumSlots() + : options.getMinimumSlots()) + .setMaximumSlots( + options.getMaximumSlots() == 0 + ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMaximumSlots() + : options.getMaximumSlots()) + .setRampThrottle( + options.getRampThrottle() == null + ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getRampThrottle() + : options.getRampThrottle()) + .build(); + } + } + + @Override + public SlotPermit reserveSlot(SlotReserveContext ctx) throws InterruptedException { + while (true) { + if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) { + return new SlotPermit(); + } else { + Duration mustWaitFor; + try { + mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued()); + } catch (ArithmeticException e) { + mustWaitFor = Duration.ZERO; + } + if (mustWaitFor.compareTo(Duration.ZERO) > 0) { + Thread.sleep(mustWaitFor.toMillis()); + } + + Optional permit = tryReserveSlot(ctx); + if (permit.isPresent()) { + return permit.get(); + } else { + Thread.sleep(10); + } + } + } + } + + @Override + public Optional tryReserveSlot(SlotReserveContext ctx) { + int numIssued = ctx.getNumIssuedSlots(); + if (numIssued < options.getMinimumSlots() + || (timeSinceLastSlotIssued().compareTo(options.getRampThrottle()) > 0 + && numIssued < options.getMaximumSlots() + && resourceController.pidDecision())) { + lastSlotIssuedAt = Instant.now(); + return Optional.of(new SlotPermit()); + } + return Optional.empty(); + } + + @Override + public void markSlotUsed(SlotMarkUsedContext ctx) {} + + @Override + public void releaseSlot(SlotReleaseContext ctx) {} + + public ResourceBasedController getResourceController() { + return resourceController; + } + + private Duration timeSinceLastSlotIssued() { + return Duration.between(lastSlotIssuedAt, Instant.now()); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedTuner.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedTuner.java new file mode 100644 index 000000000..47ad34e19 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedTuner.java @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; +import java.time.Duration; +import javax.annotation.Nonnull; + +/** A {@link WorkerTuner} that attempts to allocate slots based on available system resources. */ +@Experimental +public class ResourceBasedTuner implements WorkerTuner { + public static final ResourceBasedSlotOptions DEFAULT_WORKFLOW_SLOT_OPTIONS = + ResourceBasedSlotOptions.newBuilder() + .setMinimumSlots(5) + .setMaximumSlots(500) + .setRampThrottle(Duration.ZERO) + .build(); + public static final ResourceBasedSlotOptions DEFAULT_ACTIVITY_SLOT_OPTIONS = + ResourceBasedSlotOptions.newBuilder() + .setMinimumSlots(1) + .setMaximumSlots(1000) + .setRampThrottle(Duration.ofMillis(50)) + .build(); + + private final ResourceBasedController controller; + private final ResourceBasedSlotOptions workflowSlotOptions; + private final ResourceBasedSlotOptions activitySlotOptions; + private final ResourceBasedSlotOptions localActivitySlotOptions; + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private ResourceBasedControllerOptions controllerOptions; + private @Nonnull ResourceBasedSlotOptions workflowSlotOptions = DEFAULT_WORKFLOW_SLOT_OPTIONS; + private @Nonnull ResourceBasedSlotOptions activitySlotOptions = DEFAULT_ACTIVITY_SLOT_OPTIONS; + private @Nonnull ResourceBasedSlotOptions localActivitySlotOptions = + DEFAULT_ACTIVITY_SLOT_OPTIONS; + + private Builder() {} + + public Builder setControllerOptions(ResourceBasedControllerOptions controllerOptions) { + this.controllerOptions = controllerOptions; + return this; + } + + /** + * Set the slot options for workflow tasks. Has no effect after the worker using this tuner + * starts. + * + *

Defaults to minimum 5 slots, maximum 500 slots, and no ramp throttle. + */ + public Builder setWorkflowSlotOptions(@Nonnull ResourceBasedSlotOptions workflowSlotOptions) { + this.workflowSlotOptions = workflowSlotOptions; + return this; + } + + /** + * Set the slot options for activity tasks. Has no effect after the worker using this tuner + * starts. + * + *

Defaults to minimum 1 slot, maximum 1000 slots, and 50ms ramp throttle. + */ + public Builder setActivitySlotOptions(@Nonnull ResourceBasedSlotOptions activitySlotOptions) { + this.activitySlotOptions = activitySlotOptions; + return this; + } + + /** + * Set the slot options for local activity tasks. Has no effect after the worker using this + * tuner starts. + * + *

Defaults to minimum 1 slot, maximum 1000 slots, and 50ms ramp throttle. + */ + public Builder setLocalActivitySlotOptions( + @Nonnull ResourceBasedSlotOptions localActivitySlotOptions) { + this.localActivitySlotOptions = localActivitySlotOptions; + return this; + } + + public ResourceBasedTuner build() { + return new ResourceBasedTuner( + controllerOptions, workflowSlotOptions, activitySlotOptions, localActivitySlotOptions); + } + } + + /** + * @param controllerOptions options for the {@link ResourceBasedController} used by this tuner + */ + public ResourceBasedTuner( + ResourceBasedControllerOptions controllerOptions, + ResourceBasedSlotOptions workflowSlotOptions, + ResourceBasedSlotOptions activitySlotOptions, + ResourceBasedSlotOptions localActivitySlotOptions) { + this.controller = ResourceBasedController.newSystemInfoController(controllerOptions); + this.workflowSlotOptions = workflowSlotOptions; + this.activitySlotOptions = activitySlotOptions; + this.localActivitySlotOptions = localActivitySlotOptions; + } + + @Nonnull + @Override + public SlotSupplier getWorkflowTaskSlotSupplier() { + return ResourceBasedSlotSupplier.createForWorkflow(controller, workflowSlotOptions); + } + + @Nonnull + @Override + public SlotSupplier getActivityTaskSlotSupplier() { + return ResourceBasedSlotSupplier.createForActivity(controller, activitySlotOptions); + } + + @Nonnull + @Override + public SlotSupplier getLocalActivitySlotSupplier() { + return ResourceBasedSlotSupplier.createForLocalActivity(controller, localActivitySlotOptions); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReserveContext.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReserveContext.java index 810c2a3b8..57007d57d 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReserveContext.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SlotReserveContext.java @@ -46,4 +46,9 @@ public interface SlotReserveContext { * @return The worker's build ID that is associated with this reservation request. */ String getWorkerBuildId(); + + /** + * @return The number of currently outstanding slot permits of this type, whether used or not. + */ + int getNumIssuedSlots(); } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/SystemResourceInfo.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SystemResourceInfo.java new file mode 100644 index 000000000..ff5cf6a07 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/SystemResourceInfo.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; + +/** Implementors determine how resource usage is measured. */ +@Experimental +public interface SystemResourceInfo { + /** + * @return System-wide CPU usage as a percentage [0.0, 1.0] + */ + double getCPUUsagePercent(); + + /** + * @return Memory usage as a percentage [0.0, 1.0]. Memory usage should reflect either system-wide + * usage or JVM-specific usage, whichever is higher, to avoid running out of memory in either + * way. + */ + double getMemoryUsagePercent(); +} diff --git a/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkerTuner.java b/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkerTuner.java new file mode 100644 index 000000000..a25099569 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/worker/tuning/WorkerTuner.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker.tuning; + +import io.temporal.common.Experimental; +import javax.annotation.Nonnull; + +/** WorkerTuners allow for the dynamic customization of some aspects of worker configuration. */ +@Experimental +public interface WorkerTuner { + /** + * @return A {@link SlotSupplier} for workflow tasks. + */ + @Nonnull + SlotSupplier getWorkflowTaskSlotSupplier(); + + /** + * @return A {@link SlotSupplier} for activity tasks. + */ + @Nonnull + SlotSupplier getActivityTaskSlotSupplier(); + + /** + * @return A {@link SlotSupplier} for local activities. + */ + @Nonnull + SlotSupplier getLocalActivitySlotSupplier(); +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/IndependentResourceBasedTests.java b/temporal-sdk/src/test/java/io/temporal/worker/IndependentResourceBasedTests.java new file mode 100644 index 000000000..c78f943a1 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/IndependentResourceBasedTests.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +interface IndependentResourceBasedTests {} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/ResourceBasedTunerTests.java b/temporal-sdk/src/test/java/io/temporal/worker/ResourceBasedTunerTests.java new file mode 100644 index 000000000..58974e28a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/ResourceBasedTunerTests.java @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.worker; + +import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; + +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.util.ImmutableMap; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityOptions; +import io.temporal.common.reporter.TestStatsReporter; +import io.temporal.serviceclient.MetricsTag; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.tuning.*; +import io.temporal.workflow.*; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +public class ResourceBasedTunerTests { + + private final TestStatsReporter reporter = new TestStatsReporter(); + private static final Map TAGS_NAMESPACE = + new ImmutableMap.Builder().putAll(MetricsTag.defaultTags(NAMESPACE)).build(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerOptions( + WorkerOptions.newBuilder() + .setWorkerTuner( + ResourceBasedTuner.newBuilder() + .setControllerOptions( + ResourceBasedControllerOptions.newBuilder(0.7, 0.7).build()) + .build()) + .build()) + .setActivityImplementations(new ActivitiesImpl()) + .setWorkflowTypes(ResourceTunerWorkflowImpl.class) + .setMetricsScope( + new RootScopeBuilder() + .reporter(reporter) + .reportEvery(com.uber.m3.util.Duration.ofMillis(10))) + .build(); + + @Test + public void canRunWithResourceBasedTuner() { + ResourceTunerWorkflow workflow = testWorkflowRule.newWorkflowStub(ResourceTunerWorkflow.class); + workflow.execute(10, 1000); + Map nsAndTaskQueue = + new ImmutableMap.Builder() + .putAll(TAGS_NAMESPACE) + .put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue()) + .build(); + reporter.assertGauge(MetricsType.RESOURCE_MEM_USAGE, nsAndTaskQueue, (val) -> val > 0); + reporter.assertGauge( + MetricsType.RESOURCE_CPU_USAGE, + nsAndTaskQueue, + (val) -> { + // CPU use can be so low as to be 0, so can't really make any assertion here. + return true; + }); + reporter.assertGauge(MetricsType.RESOURCE_MEM_PID, nsAndTaskQueue, (val) -> true); + reporter.assertGauge(MetricsType.RESOURCE_CPU_PID, nsAndTaskQueue, (val) -> true); + } + + @Category(IndependentResourceBasedTests.class) + @Test(timeout = 300 * 1000) + public void canRunHeavyMemoryWithResourceBasedTuner() { + ResourceTunerWorkflow workflow = testWorkflowRule.newWorkflowStub(ResourceTunerWorkflow.class); + workflow.execute(100, 30000000); + } + + @WorkflowInterface + public interface ResourceTunerWorkflow { + @WorkflowMethod + String execute(int numActivities, int memCeiling); + } + + public static class ResourceTunerWorkflowImpl implements ResourceTunerWorkflow { + @Override + public String execute(int numActivities, int memCeiling) { + SleepActivity activity = + Workflow.newActivityStub( + SleepActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .setHeartbeatTimeout(Duration.ofSeconds(20)) + .build()); + + List> promises = new ArrayList<>(); + for (int j = 0; j < numActivities; j++) { + Promise promise = Async.procedure(activity::useResources, memCeiling); + promises.add(promise); + } + + for (Promise promise : promises) { + promise.get(); + } + + return "I'm done"; + } + } + + @ActivityInterface + public interface SleepActivity { + void useResources(int memCeiling); + } + + public static class ActivitiesImpl implements SleepActivity { + @Override + public void useResources(int memCeiling) { + try { + int randNumBytes = (int) (Math.random() * memCeiling); + @SuppressWarnings("unused") + byte[] bytes = new byte[randNumBytes]; + // Need to wait at least a second to give metrics a chance to be reported + // (and also simulate some actual work in the activity) + Thread.sleep(1100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java index 934e014cd..7b051de07 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java @@ -21,7 +21,9 @@ package io.temporal.worker; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import io.temporal.worker.tuning.*; import org.junit.Test; public class WorkerOptionsTest { @@ -47,4 +49,49 @@ public void verifyWorkerOptionsEquality() { WorkerOptions w2 = WorkerOptions.newBuilder().build(); assertEquals(w1, w2); } + + @Test + public void canBuildMixedSlotSupplierTuner() { + ResourceBasedController resourceController = + ResourceBasedController.newSystemInfoController( + ResourceBasedControllerOptions.newBuilder(0.5, 0.5).build()); + + SlotSupplier workflowTaskSlotSupplier = new FixedSizeSlotSupplier<>(10); + SlotSupplier activityTaskSlotSupplier = + ResourceBasedSlotSupplier.createForActivity( + resourceController, ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS); + SlotSupplier localActivitySlotSupplier = + ResourceBasedSlotSupplier.createForLocalActivity( + resourceController, ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS); + + WorkerOptions.newBuilder() + .setWorkerTuner( + new CompositeTuner( + workflowTaskSlotSupplier, activityTaskSlotSupplier, localActivitySlotSupplier)) + .build(); + } + + @Test + public void throwsIfResourceControllerIsNotSame() { + ResourceBasedController resourceController1 = + ResourceBasedController.newSystemInfoController( + ResourceBasedControllerOptions.newBuilder(0.5, 0.5).build()); + ResourceBasedController resourceController2 = + ResourceBasedController.newSystemInfoController( + ResourceBasedControllerOptions.newBuilder(0.2, 0.3).build()); + + SlotSupplier workflowTaskSlotSupplier = new FixedSizeSlotSupplier<>(10); + SlotSupplier activityTaskSlotSupplier = + ResourceBasedSlotSupplier.createForActivity( + resourceController1, ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS); + SlotSupplier localActivitySlotSupplier = + ResourceBasedSlotSupplier.createForLocalActivity( + resourceController2, ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS); + + assertThrows( + IllegalArgumentException.class, + () -> + new CompositeTuner( + workflowTaskSlotSupplier, activityTaskSlotSupplier, localActivitySlotSupplier)); + } }