feat: add keda autoscaling support (#10)
* feat: add keda autoscaling support

* refactor: use a hook to extend celery config

* docs: update documentation with new filter

* fix: use not required validation

* fix: use listLength as string

* docs: add pod-autoscaling notes

* fix: allow to scale default workers

* chore: quality fixes

(cherry picked from commit b529b0c)

* refactor: use enable_keda key per variant

(cherry picked from commit b52f142)

* fix: only apply overrides if default variant is set

* fix: add missing enable_keda to typed dict

* feat: removing CELERY_MULTIQUEUE_ENABLED

* docs: clarify key entries for workers config

(cherry picked from commit 70aae3b)

---------

Co-authored-by: jfavellar90 <[email protected]>
Ian2012 and jfavellar90 authored Sep 12, 2024
1 parent 317d8ae commit d2c2f84
Showing 9 changed files with 211 additions and 34 deletions.
92 changes: 74 additions & 18 deletions README.md
@@ -23,30 +23,49 @@ tutor plugins enable celery

### Celery queues

By default, in a standard Open edX installation with Tutor on Kubernetes, all LMS/CMS async tasks are executed
by a single Celery deployment. This plugin lets you distribute the async workload by configuring additional deployments
that execute the Celery tasks sent to specific queues. This can help to:

- Achieve better performance when processing a high volume of async tasks
- Configure different scaling parameters according to the nature of the tasks processed by a queue (I/O-bound tasks,
CPU-bound tasks, etc.)

To achieve this, the `CELERY_WORKERS_CONFIG` filter is implemented to add extra queues whose tasks need to be
processed by a separate deployment.

## Recommended multiqueue configuration

From the LMS and CMS codebases, the queues for each service are described below:

- **CMS**: default, high, low (taken from CMS settings [here](https://github.com/openedx/edx-platform/blob/open-release/redwood.master/cms/envs/common.py#L1578-L1582))
- **LMS**: default, high, high_mem (taken from LMS settings [here](https://github.com/openedx/edx-platform/blob/open-release/redwood.master/lms/envs/common.py#L2913-L2917))

> [!NOTE]
> We recommend using [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling)
> to set up resource requests and limits.
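Open edX namespaces its Celery queue names per service. Assuming the `edx.<service>.core.<variant>` convention used throughout this plugin's templates, the full queue names can be sketched as:

```python
# Queue variants per service (from the LMS/CMS settings linked above)
QUEUE_VARIANTS = {
    "lms": ["default", "high", "high_mem"],
    "cms": ["default", "high", "low"],
}

def full_queue_name(service: str, variant: str) -> str:
    # Open edX Celery queues are named edx.<service>.core.<variant>
    return f"edx.{service}.core.{variant}"

print([full_queue_name("lms", v) for v in QUEUE_VARIANTS["lms"]])
# ['edx.lms.core.default', 'edx.lms.core.high', 'edx.lms.core.high_mem']
```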
By default, Tutor implements a single deployment to process tasks on all queues in the LMS/CMS. The `CELERY_WORKERS_CONFIG` filter
can be used to add the extra queues from the LMS/CMS configuration:

```python
from tutorcelery.hooks import CELERY_WORKERS_CONFIG

@CELERY_WORKERS_CONFIG.add()
def _add_celery_workers_config(workers_config):
    # Adding LMS extra queues
    workers_config["lms"]["high"] = {}  # Make sure the key matches the queue name: edx.lms.core.high
    workers_config["lms"]["high_mem"] = {}

    # Adding CMS extra queues
    workers_config["cms"]["high"] = {}
    workers_config["cms"]["low"] = {}
    return workers_config
```
With this configuration, 4 new deployments will be created (one per new queue) to process tasks
separately according to the queue they are sent to. Additionally, the default Tutor LMS/CMS Celery deployments
are patched to ONLY process the tasks sent to the "default" queue.
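The per-queue deployment names follow the convention in the plugin's Kubernetes templates. A sketch of that naming (`worker_deployment_name` is a hypothetical helper, not part of the plugin):

```python
def worker_deployment_name(service: str, variant: str) -> str:
    # The default variant reuses Tutor's stock deployment name; other
    # variants get their own deployment, with underscores mapped to
    # hyphens to produce valid Kubernetes resource names.
    if variant == "default":
        return f"{service}-worker"
    return f"{service}-worker-{variant.replace('_', '-')}"

print(worker_deployment_name("lms", "high_mem"))  # lms-worker-high-mem
print(worker_deployment_name("cms", "default"))   # cms-worker
```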

This is the recommended configuration for a multiqueue approach with the LMS and CMS, given the queues each
service defines in its settings files by default. However, the `CELERY_WORKERS_CONFIG` filter
can be adapted to different configuration scenarios.

This plugin also provides a setting to route LMS/CMS tasks directly to a specific queue. It extends/overrides
the default `EXPLICIT_QUEUES` setting:
@@ -60,6 +79,42 @@ CELERY_CMS_EXPLICIT_QUEUES:
queue: edx.cms.core.high
```
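Under the hood, the rendered LMS/CMS production settings merge this value into the platform's `EXPLICIT_QUEUES` dict, creating it when the platform does not define one (the pattern below mirrors the settings patches in this commit; the task name is only an illustrative example):

```python
# Rendered value of CELERY_LMS_EXPLICIT_QUEUES (hypothetical routing example)
CELERY_LMS_EXPLICIT_QUEUES = {
    "lms.djangoapps.grades.tasks.compute_all_grades_for_course": {
        "queue": "edx.lms.core.high"
    },
}

try:
    EXPLICIT_QUEUES.update(CELERY_LMS_EXPLICIT_QUEUES)  # extend an existing mapping
except NameError:
    EXPLICIT_QUEUES = CELERY_LMS_EXPLICIT_QUEUES  # or define it from scratch
```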
### Autoscaling

As an alternative to the CPU/memory-based autoscaling offered by the [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling) plugin,
this plugin supports Celery worker autoscaling based on the size of the Celery queue of a given worker. We use
[KEDA](https://keda.sh/docs) for this purpose; check the KEDA documentation to find out more.

To enable autoscaling, enable the `enable_keda` key for every queue variant. The default parameters are the following:

```python
{
    "min_replicas": 0,
    "max_replicas": 30,
    "list_length": 40,
    "enable_keda": False,
}
```
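The `keda.yml` template falls back to these defaults for any key a variant omits; the lookup can be sketched as follows (plain dicts standing in for the real config):

```python
# Defaults mirrored from the per-variant scaling parameters above
KEDA_DEFAULTS = {"min_replicas": 0, "max_replicas": 30, "list_length": 40, "enable_keda": False}

def variant_scaling(overrides: dict) -> dict:
    # Per-variant overrides win; anything unspecified keeps the default.
    return {**KEDA_DEFAULTS, **overrides}

config = variant_scaling({"enable_keda": True, "max_replicas": 20})
print(config)
# {'min_replicas': 0, 'max_replicas': 20, 'list_length': 40, 'enable_keda': True}
```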

> [!NOTE]
> You can use the filter `CELERY_WORKERS_CONFIG` as shown above to modify the scaling parameters for every queue.

If you are using [tutor-contrib-pod-autoscaling](https://github.com/eduNEXT/tutor-contrib-pod-autoscaling) and want to setup Keda autoscaling, make sure to disable HPA for the `lms-worker` and the `cms-worker` as **using both autoscalers at the same time is not recommended**.

```python
from tutorpod_autoscaling.hooks import AUTOSCALING_CONFIG

@AUTOSCALING_CONFIG.add()
def _add_my_autoscaling(autoscaling_config):
    autoscaling_config["lms-worker"].update({
        "enable_hpa": False,
    })
    autoscaling_config["cms-worker"].update({
        "enable_hpa": False,
    })
    return autoscaling_config
```

### Enable flower

For troubleshooting purposes, you can enable a flower deployment to monitor the Celery queues in real time
@@ -91,6 +146,7 @@ CELERY_FLOWER_SERVICE_MONITOR: true
```

License
*******

---

This software is licensed under the terms of the AGPLv3.
27 changes: 27 additions & 0 deletions tutorcelery/hooks.py
@@ -0,0 +1,27 @@
"""
These hooks are stored in a separate module. If they were included in plugin.py, then
the celery worker hooks would be created in the context of some other plugin that imports
them.
"""

from __future__ import annotations
import sys

if sys.version_info < (3, 11):
    from typing_extensions import TypedDict, NotRequired
else:
    from typing import TypedDict, NotRequired

from tutor.core.hooks import Filter


class CELERY_WORKERS_ATTRS_TYPE(TypedDict):
    min_replicas: NotRequired[int]
    max_replicas: NotRequired[int]
    list_length: NotRequired[int]
    enable_keda: bool


CELERY_WORKERS_CONFIG: Filter[dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]], []] = (
    Filter()
)
6 changes: 2 additions & 4 deletions tutorcelery/patches/k8s-deployments
@@ -1,6 +1,5 @@
{% for service, variants in iter_celery_workers_config().items() %}
{% for variant, config in variants.items() if variant != 'default' %}
{% set deployment = service + "-" + "worker" + "-" + variant.replace("_", "-") %}
---
apiVersion: apps/v1
@@ -53,7 +52,6 @@ spec:
name: openedx-config
{% endfor %}
{% endfor %}
{% if CELERY_FLOWER -%}
---
apiVersion: apps/v1
8 changes: 6 additions & 2 deletions tutorcelery/patches/k8s-override
@@ -1,5 +1,6 @@
{% for service in ["lms", "cms"] %}
{% set exclude = "lms" if service == "cms" else "cms" %}
{% set service_variants = iter_celery_workers_config().get(service) %}
---
apiVersion: apps/v1
kind: Deployment
@@ -17,6 +18,9 @@ spec:
- "--loglevel=info"
- "--hostname=edx.{{service}}.core.default.%%h"
- "--max-tasks-per-child=100"
{% if is_celery_multiqueue(service) -%}
- "--queues=edx.{{service}}.core.default"
{% else -%}
- "--exclude-queues=edx.{{exclude}}.core.default"
{% endif -%}
{% endfor %}
1 change: 1 addition & 0 deletions tutorcelery/patches/kustomization-resources
@@ -0,0 +1 @@
- plugins/celery/k8s/keda.yml
2 changes: 0 additions & 2 deletions tutorcelery/patches/openedx-cms-production-settings
@@ -1,8 +1,6 @@
try:
    EXPLICIT_QUEUES.update({{CELERY_CMS_EXPLICIT_QUEUES}})
except NameError:
    EXPLICIT_QUEUES = {{CELERY_CMS_EXPLICIT_QUEUES}}
# Prevents losing tasks when workers are shutdown
CELERY_ACKS_LATE = True
3 changes: 0 additions & 3 deletions tutorcelery/patches/openedx-lms-production-settings
@@ -1,9 +1,6 @@
try:
    EXPLICIT_QUEUES.update({{CELERY_LMS_EXPLICIT_QUEUES}})
except NameError:
    EXPLICIT_QUEUES = {{CELERY_LMS_EXPLICIT_QUEUES}}

# Prevents losing tasks when workers are shutdown
CELERY_ACKS_LATE = True
80 changes: 75 additions & 5 deletions tutorcelery/plugin.py
@@ -5,31 +5,93 @@

import click
import importlib_resources
import tutor
from tutor import hooks

from .__about__ import __version__
from .hooks import CELERY_WORKERS_CONFIG, CELERY_WORKERS_ATTRS_TYPE

########################################
# CONFIGURATION
########################################

CORE_CELERY_WORKER_CONFIG: dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]] = {
    "lms": {
        "default": {
            "min_replicas": 0,
            "max_replicas": 10,
            "list_length": 40,
            "enable_keda": False,
        },
    },
    "cms": {
        "default": {
            "min_replicas": 0,
            "max_replicas": 10,
            "list_length": 40,
            "enable_keda": False,
        },
    },
}


# The core autoscaling configs are added with a high priority, such that other users can override or
# remove them.
@CELERY_WORKERS_CONFIG.add(priority=hooks.priorities.HIGH)
def _add_core_autoscaling_config(
    scaling_config: dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]
) -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    scaling_config.update(CORE_CELERY_WORKER_CONFIG)
    return scaling_config


@tutor.hooks.lru_cache
def get_celery_workers_config() -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    """
    This function is cached for performance.
    """
    return CELERY_WORKERS_CONFIG.apply({})


def iter_celery_workers_config() -> dict[str, dict[str, CELERY_WORKERS_ATTRS_TYPE]]:
    """
    Return the celery workers config as a dict of (service name, variants) pairs.
    """
    return {name: config for name, config in get_celery_workers_config().items()}


def is_celery_multiqueue(service: str) -> bool:
    """
    Validate whether celery is configured in multiqueue mode for a given service.
    """
    service_celery_config = iter_celery_workers_config().get(service, {})
    service_queue_len = len(service_celery_config.keys())

    # If no queue variants are configured, multiqueue is disabled
    if not service_queue_len:
        return False

    # Multiqueue is not enabled if only the default variant is available
    if service_queue_len == 1 and "default" in service_celery_config:
        return False

    return True


hooks.Filters.CONFIG_DEFAULTS.add_items(
    [
        # Add your new settings that have default values here.
        # Each new setting is a pair: (setting_name, default_value).
        # Prefix your setting names with 'CELERY_'.
        ("CELERY_VERSION", __version__),
        ("CELERY_LMS_EXPLICIT_QUEUES", {}),
        ("CELERY_CMS_EXPLICIT_QUEUES", {}),
        ("CELERY_FLOWER", False),
        ("CELERY_FLOWER_EXPOSE_SERVICE", False),
        ("CELERY_FLOWER_HOST", "flower.{{LMS_HOST}}"),
        ("CELERY_FLOWER_DOCKER_IMAGE", "docker.io/mher/flower:2.0.1"),
        ("CELERY_FLOWER_SERVICE_MONITOR", False),
    ]
)
@@ -158,10 +220,18 @@
    [
        ("celery/build", "plugins"),
        ("celery/apps", "plugins"),
        ("celery/k8s", "plugins"),
    ],
)


# Make the celery worker hook functions available within templates
hooks.Filters.ENV_TEMPLATE_VARIABLES.add_items(
    [
        ("iter_celery_workers_config", iter_celery_workers_config),
        ("is_celery_multiqueue", is_celery_multiqueue),
    ]
)
########################################
# PATCH LOADING
# (It is safe & recommended to leave
26 changes: 26 additions & 0 deletions tutorcelery/templates/celery/k8s/keda.yml
@@ -0,0 +1,26 @@
{% for service, variants in iter_celery_workers_config().items() %}
{% for variant, config in variants.items() if config.get('enable_keda') %}
{% set deployment = service + "-" + "worker" + "-" + variant.replace("_", "-") %}
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: {% if variant != 'default' %}{{ deployment }}{% else %}{{service}}-worker{% endif %}-scaledobject
spec:
  minReplicaCount: {{ config.get("min_replicas", 0) }}
  maxReplicaCount: {{ config.get("max_replicas", 30) }}
  scaleTargetRef:
    kind: Deployment
    name: {% if variant != 'default' %}{{ deployment }}{% else %}{{service}}-worker{% endif %}
  triggers:
    - metadata:
        {% if REDIS_HOST == 'redis' -%}
        address: redis.{{K8S_NAMESPACE}}:{{REDIS_PORT}}
        {% else -%}
        address: {{REDIS_HOST}}:{{REDIS_PORT}}
        {% endif -%}
        listLength: "{{ config.get('list_length', 40) }}"
        listName: edx.{{service}}.core.{{variant}}
        type: redis
{% endfor %}
{% endfor %}
