Support dynamic job scaling rules #756

Open
wants to merge 23 commits into
base: master


Collaborator

@Andyz26 Andyz26 commented Mar 3, 2025

[Reviewer Guidance]

The following is a summary of critical classes for review.

Runtime

Introduces a new v2 job master service and refactors the current job scaler so that ScalerControllerActor can recreate or shut down a running scaler. Rule actors are supported under the coordinator actor.

  • WorkerExecutionOperationsNetworkStage
  • JobScalerContext
  • JobMasterServiceV2
  • CoordinatorActor
  • ScalerControllerActor
  • mantis-runtime-executor/src/main/java/io/mantisrx/server/worker/jobmaster/rules/* (rule actors)
  • CustomRuleTriggerHandler
  • JobAutoScaler

API

New control plane API to support CRUD on scaling rules.

  • JobScalingRule
  • JobDiscoveryRouteHandlerAkkaImpl
  • JobDiscoveryRoute
  • mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/JobClustersRoute

ControlPlane

Control plane implementation to support new APIs and the scaling changes stream API.

  • JobClustersManagerActor
  • JobClusterActor
  • JobActor

Persistency

  • JobClusterScalerRuleDataImplWritable
  • KeyValueBasedPersistenceProvider

E2E Test

  • TestContainerHelloWorld

Dynamic Job Scaling Rules Implementation

Problem Statement

In order to effectively manage job scaling in various scenarios, particularly during periods of bursty traffic, there is a need to dynamically update job auto scaler settings such as minimum, maximum, step-size, and burst-change. The goal is to consolidate the APIs required to modify a job's auto scaler and to allow changes to the job scaler without necessitating job resubmission.

Solution

This pull request introduces support for dynamic job scaling rules that override the default job scaler configuration. It allows multiple rules to coexist using different modes, providing flexibility in handling diverse scaling requirements.

Key Features:

  • Dynamic Scaling Rules: Introduce the ability to define and manage multiple scaling rules that can override the default job scaler settings.
  • Rule Types: Support for three types of rules:
    • Perpetual: Continuous scaling based on predefined conditions.
    • Schedule: Time-based scaling using cron expressions. The schedule can be a one-time trigger or a recurring event.
    • Custom: User-defined rules where users can implement and inject the JobScalingRuleCustomTrigger interface to customize the triggering conditions and actions.
  • Coexistence of Rules: Multiple rules can coexist, allowing for complex scaling strategies tailored to specific needs.

Internals

[Figure: Mantis Live Scaling 1]

  • Actor System: The Job Manager (JM) will now host an actor system to represent different rules and control the job scaler state.
  • Rule Priority: Rules are prioritized based on their ruleId, with newer rules taking precedence over older ones.
  • Subscription to Rule Changes: The JM subscribes to a new /jobScalerRules/ stream API to listen for rule changes, ensuring that the system is responsive to updates.
  • Rule Activation/Deactivation: Rules have the capability to activate or deactivate themselves based on the defined conditions.
  • Default Rule Handling: The default job configuration's schedulingInfo scaler config is treated as the default rule with an ID of "-1".
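
The priority and default-rule behavior described above can be pictured with a small sketch. It is not the PR's implementation: `RuleSketch` and `RulePrioritySketch` are hypothetical names, and the sketch assumes that rule IDs are numeric strings and that "newer rules take precedence" means the active rule with the highest ID wins, with "-1" as the fallback.

```java
import java.util.List;

// Hypothetical stand-in for a scaling rule; not the PR's JobScalingRule class.
final class RuleSketch {
    final String ruleId;
    final boolean active;

    RuleSketch(String ruleId, boolean active) {
        this.ruleId = ruleId;
        this.active = active;
    }
}

final class RulePrioritySketch {
    // The default scaler config is treated as a rule with ID "-1".
    static final String DEFAULT_RULE_ID = "-1";

    // Returns the ID of the active rule with the highest numeric ID,
    // or the default rule ID when no rule is active (assumed semantics).
    static String effectiveRuleId(List<RuleSketch> rules) {
        String best = DEFAULT_RULE_ID;
        for (RuleSketch rule : rules) {
            if (rule.active && Integer.parseInt(rule.ruleId) > Integer.parseInt(best)) {
                best = rule.ruleId;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<RuleSketch> rules = List.of(
            new RuleSketch("1", true),
            new RuleSketch("2", false),
            new RuleSketch("3", true));
        System.out.println(effectiveRuleId(rules)); // prints 3
    }
}
```

When every rule deactivates itself, this sketch falls back to "-1", which mirrors the idea that the job's own schedulingInfo scaler config acts as the default rule.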

JM v2

[Figure: Mantis Live Scaling 2 (excalidraw)]

This implementation provides a robust framework for managing job scaling dynamically, enhancing the system's ability to handle varying traffic patterns efficiently. The introduction of multiple rule types and the ability to customize scaling behavior offers significant flexibility and control to users.

API Reference for Job Cluster Scaler Rules

This document provides details on the APIs available under api/v1/jobClusters/{clusterName}/scalerRules. These APIs allow you to manage scaler rules for job clusters, including creating, retrieving, and deleting scaler rules.

Endpoints

1. Create Scaler Rule

  • Endpoint: POST /api/v1/jobClusters/{clusterName}/scalerRules
  • Description: Creates a new scaler rule for the specified job cluster.

Request Schema

  • jobClusterName (String): The name of the job cluster.
  • scalerConfig (ScalerConfig): Configuration for the scaler.
    • type (String): Type of scaling policy. Only supports "standard" at the moment.
    • scalingPolicies (List of StageScalingPolicy): List of scaling policies. See StageScalingPolicy reference for details.
    • stageDesireSize (Map<Integer, Integer>): Desired size for each stage when the rule gets activated.
  • triggerConfig (TriggerConfig): Configuration for the trigger.
    • triggerType (String): Type of trigger (e.g., "schedule", "perpetual", "custom").
    • scheduleCron (String): Cron expression for scheduling. It can be a recurring schedule or a one-time trigger.
    • scheduleDuration (String): [Optional] Duration for the schedule to be effective. Will trigger deactivation when the duration expires.
    • customTrigger (String): Custom trigger configuration.
  • metadata (Map<String, String>): Additional metadata for the rule.
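
As a rough illustration of the scheduleDuration field, the sketch below checks whether a schedule rule would still be in effect at a given instant. It is only a sketch under assumptions: `ScheduleWindowSketch` is a hypothetical name, the cron evaluation that produces the fire time is not shown, the duration is passed as a `java.time.Duration` rather than the "1h" string form, and a missing duration is assumed to mean no timed deactivation.

```java
import java.time.Duration;
import java.time.Instant;

final class ScheduleWindowSketch {
    // firedAt: the instant the cron expression fired (cron evaluation not shown).
    // duration: how long the rule stays effective; null models the optional field being absent.
    static boolean isActive(Instant firedAt, Duration duration, Instant now) {
        if (now.isBefore(firedAt)) {
            return false; // the schedule has not fired yet
        }
        if (duration == null) {
            return true; // assumption: without a duration there is no timed deactivation
        }
        return now.isBefore(firedAt.plus(duration)); // deactivate once the duration expires
    }

    public static void main(String[] args) {
        Instant firedAt = Instant.parse("2025-03-03T15:00:00Z");
        Duration oneHour = Duration.ofHours(1);
        System.out.println(isActive(firedAt, oneHour, firedAt.plusSeconds(1800))); // prints true
        System.out.println(isActive(firedAt, oneHour, firedAt.plusSeconds(3600))); // prints false
    }
}
```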

Sample Request Payload

{
  "jobClusterName": "exampleCluster",
  "scalerConfig": {
    "type": "standard",
    "scalingPolicies": [
      {
        "stage": 1,
        "min": 1,
        "max": 5,
        "increment": 1,
        "decrement": 1,
        "coolDownSecs": 300,
        "strategies": {
          "CPU": {
            "reason": "CPU",
            "scaleDownBelowPct": 20.0,
            "scaleUpAbovePct": 80.0,
            "rollingCount": {
              "count": 3,
              "of": 5
            }
          }
        },
        "allowAutoScaleManager": true
      }
    ],
    "stageDesireSize": {
      "1": 3
    }
  },
  "triggerConfig": {
    "triggerType": "schedule",
    "scheduleCron": "0 0 15 * * ?",
    "scheduleDuration": "1h",
    "customTrigger": ""
  },
  "metadata": {
    "createdBy": "admin"
  }
}
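
A payload like the one above could be submitted with the JDK's built-in HTTP client. The following is a minimal sketch, not a client shipped with this PR: `ScalerRuleRequestSketch` is a hypothetical helper, the base URL `http://localhost:8100` is an assumed address for the control plane, and the cluster name is assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ScalerRuleRequestSketch {
    // Builds a POST request for /api/v1/jobClusters/{clusterName}/scalerRules.
    // baseUrl is an assumption; point it at your own control plane.
    static HttpRequest createRuleRequest(String baseUrl, String clusterName, String jsonBody) {
        return HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/api/v1/jobClusters/" + clusterName + "/scalerRules"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    public static void main(String[] args) {
        // Placeholder body; in practice this would be the full sample payload.
        String body = "{\"jobClusterName\": \"exampleCluster\"}";
        HttpRequest request = createRuleRequest("http://localhost:8100", "exampleCluster", body);
        // prints: POST http://localhost:8100/api/v1/jobClusters/exampleCluster/scalerRules
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` and inspect the status code of the response.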


github-actions bot commented Mar 3, 2025

Test Results

645 tests  +23   635 ✅ +23   8m 36s ⏱️ +35s
149 suites + 7    10 💤 ± 0 
149 files   + 7     0 ❌ ± 0 

Results for commit 5cbfd7f. ± Comparison against base commit 1ffc3b9.

This pull request removes 2 and adds 25 tests. Note that renamed tests count towards both.
TestContainerHelloWorld ‑ testQuickSubmitJob
TestContainerHelloWorld ‑ testRegularSubmitJob
io.mantisrx.master.api.akka.route.v1.JobsRouteTest ‑ testIt
io.mantisrx.master.jobcluster.JobClusterAkkaTest ‑ testCreateAddAndDeleteRules
io.mantisrx.master.jobcluster.JobClusterAkkaTest ‑ testCreateScalerRuleFromEmpty
io.mantisrx.master.jobcluster.JobClusterAkkaTest ‑ testDeleteScalerRule
io.mantisrx.master.jobcluster.job.JobTestScalerRule ‑ testScalerRuleStreamSubject
io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritableTest ‑ testDeleteRemovesExistingRuleAndKeepsOthersUnchanged
io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritableTest ‑ testMergeAddsNewRuleAndIncrementsRuleId
io.mantisrx.master.jobcluster.scaler.JobClusterScalerRuleDataImplWritableTest ‑ testToProtoRulesConversion
io.mantisrx.runtime.descriptor.JobScalingRuleTest ‑ jobScalingRuleDeserialization
io.mantisrx.runtime.descriptor.JobScalingRuleTest ‑ jobScalingRuleDeserializationWithUnknownProperties
…

♻️ This comment has been updated with latest results.

@Andyz26 Andyz26 force-pushed the andyz/dynamicJobScaler1 branch from c8f9ba9 to c0744fe Compare March 3, 2025 20:42
@Andyz26 Andyz26 requested a deployment to Integrate Pull Request March 3, 2025 20:42 — with GitHub Actions Waiting
@Andyz26 Andyz26 temporarily deployed to Integrate Pull Request March 3, 2025 20:51 — with GitHub Actions Inactive

github-actions bot commented Mar 3, 2025

Uploaded Artifacts

To use these artifacts in your Gradle project, paste the following lines in your build.gradle.

resolutionStrategy {
    force "io.mantisrx:mantis-client:0.1.0-20250311.040213-569"
    force "io.mantisrx:mantis-common-akka:0.1.0-20250311.040213-4"
    force "io.mantisrx:mantis-common-serde:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-common:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-network:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-remote-observable:0.1.0-20250311.040213-569"
    force "io.mantisrx:mantis-runtime-loader:0.1.0-20250311.040213-569"
    force "io.mantisrx:mantis-rxcontrol:0.1.0-20250311.040213-42"
    force "io.mantisrx:mantis-discovery-proto:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-runtime-executor:0.1.0-20250311.040213-104"
    force "io.mantisrx:mantis-runtime:0.1.0-20250311.040213-569"
    force "io.mantisrx:mantis-connector-iceberg:0.1.0-20250311.040213-567"
    force "io.mantisrx:mantis-connector-job-source:0.1.0-20250311.040213-20"
    force "io.mantisrx:mantis-connector-kafka:0.1.0-20250311.040213-569"
    force "io.mantisrx:mantis-testcontainers:0.1.0-20250311.040213-238"
    force "io.mantisrx:mantis-control-plane-client:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-control-plane-core:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-control-plane-server:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-control-plane-dynamodb:0.1.0-20250311.040213-29"
    force "io.mantisrx:mantis-examples-core:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-examples-groupby-sample:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-shaded:0.1.0-20250311.040213-567"
    force "io.mantisrx:mantis-connector-publish:0.1.0-20250311.040213-568"
    force "io.mantisrx:mantis-examples-jobconnector-sample:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-examples-sine-function:0.1.0-20250311.040213-561"
    force "io.mantisrx:mantis-examples-mantis-publish-sample:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-examples-synthetic-sourcejob:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-publish-netty:0.1.0-20250311.040213-561"
    force "io.mantisrx:mantis-examples-twitter-sample:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-examples-wordcount:0.1.0-20250311.040213-561"
    force "io.mantisrx:mantis-publish-core:0.1.0-20250311.040213-561"
    force "io.mantisrx:mantis-server-agent:0.1.0-20250311.040213-561"
    force "io.mantisrx:mantis-server-worker-client:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-publish-netty-guice:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-source-job-publish:0.1.0-20250311.040213-562"
    force "io.mantisrx:mantis-source-job-kafka:0.1.0-20250311.040213-562"
}

@Andyz26 Andyz26 requested a deployment to Integrate Pull Request March 4, 2025 00:50 — with GitHub Actions Waiting
@Andyz26 Andyz26 force-pushed the andyz/dynamicJobScaler1 branch from bc4b4c7 to 4fd427a Compare March 4, 2025 19:15
@Andyz26 Andyz26 requested a deployment to Integrate Pull Request March 4, 2025 19:16 — with GitHub Actions Waiting
Collaborator

@fdc-ntflx fdc-ntflx left a comment

At a very high level, the implementation follows the idea in the drawing. I don't see any major flaws that would block you from integration testing. However, for a better review experience, I think there is an opportunity to split this PR into smaller chunks.

Comment on lines +19 to +21
public static final String TRIGGER_TYPE_SCHEDULE = "schedule";
public static final String TRIGGER_TYPE_PERPETUAL = "perpetual";
public static final String TRIGGER_TYPE_CUSTOM = "custom";
Collaborator

nit: maybe use enum?

Collaborator Author

tbh I found using an enum on the proto class and in tests to be quite verbose and error-prone compared to const strings.


@Builder
@Value
public class JobScalingRule implements Serializable {
Collaborator

I believe this model is quite flexible, but I'm concerned it could lead to confusion about which policies are currently playing a major role or influencing decisions. Some policies might even contradict each other, like "scaling up vs. scaling down," potentially causing the algorithm to oscillate.

Collaborator Author

I also added a new metric to report the currently active rule. I will leave the logic as is in beta and revisit it later based on user feedback.

import io.mantisrx.server.core.NamedJobInfo;
import io.mantisrx.server.core.PostJobStatusRequest;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.*;
Collaborator

nit: don't use wildcard imports.

Collaborator Author

I don't have a strong opinion on this, and I would like to just apply the current default profile.

@@ -745,6 +741,52 @@ public Observable<JobSchedulingInfo> schedulingChanges(final String jobId) {
;
}

public Observable<JobScalerRuleInfo> jobScalerRulesStream(final String jobId) {
Collaborator

nit: Consider adding docstrings here to provide a quick understanding of the purpose of this long function.

Collaborator

Side note: These functions have similar verbose code for subscribing and streaming events from master (or at least that's my understanding of this code). Consider extracting that into a separate auxiliary function.

import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.NamedJobInfo;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.*;
Collaborator

same as above.

Comment on lines +20 to +21
String jobId;
boolean jobCompleted;
Collaborator

Interesting. I didn't expect these to be part of this message. Can you explain?

Collaborator Author

This follows the same semantics as the other stream APIs: the mechanism is used to gracefully close the connection when the current job is terminated, which avoids a connection leak.
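
To make the role of the jobCompleted flag concrete, here is a minimal sketch of a consumer that stops reading once the flag is seen. It is an illustration only: `RuleInfoEvent` and `RuleStreamSketch` are hypothetical names, a plain `Iterable` stands in for the real stream, and the actual client code in the PR is Observable-based.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying the two fields under discussion.
final class RuleInfoEvent {
    final String jobId;
    final boolean jobCompleted;

    RuleInfoEvent(String jobId, boolean jobCompleted) {
        this.jobId = jobId;
        this.jobCompleted = jobCompleted;
    }
}

final class RuleStreamSketch {
    // Handles events in order and stops at the first one flagged jobCompleted.
    static List<RuleInfoEvent> consumeUntilCompleted(Iterable<RuleInfoEvent> events) {
        List<RuleInfoEvent> handled = new ArrayList<>();
        for (RuleInfoEvent event : events) {
            if (event.jobCompleted) {
                break; // a real client would close its connection here instead of leaking it
            }
            handled.add(event);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<RuleInfoEvent> events = List.of(
            new RuleInfoEvent("job-1", false),
            new RuleInfoEvent("job-1", false),
            new RuleInfoEvent("job-1", true),
            new RuleInfoEvent("job-1", false));
        System.out.println(consumeUntilCompleted(events).size()); // prints 2
    }
}
```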

@@ -308,6 +315,9 @@ private Receive getInitializingBehavior() {
.match(ScaleStageRequest.class, (x) -> getSender().tell(new ScaleStageResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state), 0), getSelf()))
.match(ResubmitWorkerRequest.class, (x) -> getSender().tell(new ResubmitWorkerResponse(x.requestId, CLIENT_ERROR, genUnexpectedMsg(x.toString(), state)), getSelf()))
.match(WorkerEvent.class, (x) -> logger.warn(genUnexpectedMsg(x.toString(), state)))
.match(JobClusterScalerRuleProto.CreateScalerRuleRequest.class, (x) -> getSender().tell(JobClusterScalerRuleProto.CreateScalerRuleResponse.builder().requestId(x.requestId).responseCode(CLIENT_ERROR_NOT_FOUND).message(genUnexpectedMsg(x.toString(), state)).build(), getSelf()))
.match(JobClusterScalerRuleProto.DeleteScalerRuleRequest.class, (x) -> getSender().tell(JobClusterScalerRuleProto.DeleteScalerRuleResponse.builder().requestId(x.requestId).responseCode(CLIENT_ERROR_NOT_FOUND).message(genUnexpectedMsg(x.toString(), state)).build(), getSelf()))
.match(JobClusterScalerRuleProto.GetScalerRulesRequest.class, (x) -> getSender().tell(JobClusterScalerRuleProto.GetScalerRulesResponse.builder().requestId(x.requestId).responseCode(CLIENT_ERROR_NOT_FOUND).message(genUnexpectedMsg(x.toString(), state)).build(), getSelf()))
Collaborator

Do you accept GetJobScalerRuleStreamRequest during initialization?

Collaborator Author

Good catch! updated.

@Andyz26 Andyz26 temporarily deployed to Integrate Pull Request March 8, 2025 00:01 — with GitHub Actions Inactive
@Andyz26 Andyz26 force-pushed the andyz/dynamicJobScaler1 branch from 6838080 to a216116 Compare March 10, 2025 22:38
@Andyz26 Andyz26 requested a deployment to Integrate Pull Request March 10, 2025 22:39 — with GitHub Actions Waiting
@Andyz26 Andyz26 temporarily deployed to Integrate Pull Request March 10, 2025 22:44 — with GitHub Actions Inactive
@Andyz26 Andyz26 deployed to Integrate Pull Request March 11, 2025 03:15 — with GitHub Actions Active
2 participants