
Background tasks #44

Merged
Changes from 7 commits
2 changes: 1 addition & 1 deletion .github/workflows/integration_test.yaml
@@ -14,7 +14,7 @@ jobs:
extra-arguments: -x --localstack-address 172.17.0.1
pre-run-script: localstack-installation.sh
charmcraft-channel: latest/edge
modules: '["test_charm.py", "test_cos.py", "test_database.py", "test_db_migration.py", "test_django.py", "test_django_integrations.py", "test_fastapi.py", "test_go.py", "test_integrations.py", "test_proxy.py"]'
modules: '["test_charm.py", "test_cos.py", "test_database.py", "test_db_migration.py", "test_django.py", "test_django_integrations.py", "test_fastapi.py", "test_go.py", "test_integrations.py", "test_proxy.py", "test_workers.py"]'
rockcraft-repository: javierdelapuente/rockcraft
rockcraft-ref: fastapi-extension
juju-channel: ${{ matrix.juju-version }}
109 changes: 106 additions & 3 deletions examples/flask/test_rock/app.py
@@ -1,7 +1,9 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import os
import socket
import time
import urllib.parse
from urllib.parse import urlparse
@@ -16,11 +18,86 @@
import pymysql
import pymysql.cursors
import redis
from celery import Celery, Task
from flask import Flask, g, jsonify, request


def hostname():
"""Get the hostname of the current machine."""
return socket.gethostbyname(socket.gethostname())


def celery_init_app(app: Flask, broker_url: str) -> Celery:
"""Initialise celery using the redis connection string.

See https://flask.palletsprojects.com/en/3.0.x/patterns/celery/#integrate-celery-with-flask.
"""

class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.set_default()
app.extensions["celery"] = celery_app
app.config.from_mapping(
CELERY=dict(
broker_url=broker_url,
result_backend=broker_url,
task_ignore_result=True,
),
)
celery_app.config_from_object(app.config["CELERY"])
return celery_app


app = Flask(__name__)
app.config.from_prefixed_env()

broker_url = os.environ.get("REDIS_DB_CONNECT_STRING")
# Configure Celery only if Redis is configured
if broker_url:
celery_app = celery_init_app(app, broker_url)
redis_client = redis.Redis.from_url(broker_url)

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
"""Set up periodic tasks in the scheduler."""
try:
# This will only have an effect in the beat scheduler.
sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
except NameError:
logging.exception("Failed to configure the periodic task")

@celery_app.task
def scheduled_task(scheduler_hostname):
"""Function to run a schedule task in a worker.

The worker that will run this task will add the scheduler hostname argument
to the "schedulers" set in Redis, and the worker's hostname to the "workers"
set in Redis.
"""
worker_hostname = hostname()
logging.info(
"scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
)
redis_client.sadd("schedulers", scheduler_hostname)
redis_client.sadd("workers", worker_hostname)
logging.info("schedulers: %s", redis_client.smembers("schedulers"))
logging.info("workers: %s", redis_client.smembers("workers"))
# The goal is to keep the workers in all processes busy.
# For that it may be necessary to exhaust all workers, but the backlog of pending tasks
# should not grow too large, so every scheduler can still run its scheduled tasks.
# Celery prefetches tasks, and tasks that cannot run yet are kept as reserved.
# If all processes have reserved tasks, this task finishes immediately so the queues
# do not grow any longer.
inspect_obj = celery_app.control.inspect()
reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
logging.info("number of reserved tasks %s", reserved_sizes)
delay = 0 if min(reserved_sizes) > 0 else 5
time.sleep(delay)


def get_mysql_database():
"""Get the mysql db connection."""
@@ -213,16 +290,42 @@ def mongodb_status():

@app.route("/redis/status")
def redis_status():
"""Mongodb status endpoint."""
"""Redis status endpoint."""
if database := get_redis_database():
try:
database.set("foo", "bar")
return "SUCCESS"
except pymongo.errors.PyMongoError:
pass
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL"


@app.route("/redis/clear_celery_stats")
def redis_celery_clear_stats():
"""Reset Redis statistics about workers and schedulers."""
if database := get_redis_database():
try:
database.delete("workers")
database.delete("schedulers")
return "SUCCESS"
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL", 500


@app.route("/redis/celery_stats")
def redis_celery_stats():
"""Read Redis statistics about workers and schedulers."""
if database := get_redis_database():
try:
worker_set = [str(host) for host in database.smembers("workers")]
beat_set = [str(host) for host in database.smembers("schedulers")]
return jsonify({"workers": worker_set, "schedulers": beat_set})
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL", 500


@app.route("/rabbitmq/send")
def rabbitmq_send():
"""Send a message to "charm" queue."""
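The Celery wiring above follows the Flask factory pattern, and the worker and beat processes load it as `app:celery_app`. A minimal sketch, assuming the requirements in test_rock/requirements.txt are installed, the module above is importable as `app`, and Redis is reachable at the placeholder URL below, of how the task can be exercised by hand:

# Sketch only: enqueue the scheduled task manually from a Python shell.
# The Redis URL is a placeholder; in the charm it is injected through the redis integration.
import os

os.environ["REDIS_DB_CONNECT_STRING"] = "redis://localhost:6379/0"  # assumption for a local run

from app import celery_app, scheduled_task  # the module shown above

# A `celery -A app:celery_app worker` process must be running for the task to execute
# and update the "workers"/"schedulers" sets in Redis.
result = scheduled_task.delay("manual-scheduler-host")
print(result.id)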
1 change: 1 addition & 0 deletions examples/flask/test_rock/requirements.txt
@@ -7,3 +7,4 @@ pymongo
redis[hiredis]
boto3
pika
celery
16 changes: 16 additions & 0 deletions examples/flask/test_rock/rockcraft.yaml
@@ -11,3 +11,19 @@ platforms:

extensions:
- flask-framework

services:
celery-worker:
override: replace
# Redis is not mandatory in the charm. We do not want the service to fail immediately when Redis is absent, hence the sleep.
command: bash -c "sleep 5; celery -A app:celery_app worker -c 2 --loglevel DEBUG"
startup: enabled
user: _daemon_
working-dir: /flask/app
celery-beat-scheduler:
override: replace
# Redis is not mandatory in the charm. We do not want the service to fail immediately when Redis is absent, hence the sleep.
command: bash -c "sleep 5; celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule"
startup: enabled
user: _daemon_
working-dir: /flask/app
4 changes: 3 additions & 1 deletion paas_app_charmer/_gunicorn/charm.py
@@ -19,7 +19,9 @@ class GunicornBase(PaasCharm):
@property
def _workload_config(self) -> WorkloadConfig:
"""Return a WorkloadConfig instance."""
return create_workload_config(self._framework_name)
return create_workload_config(
framework_name=self._framework_name, unit_name=self.unit.name
)

def _create_app(self) -> App:
"""Build an App instance for the Gunicorn based charm.
4 changes: 3 additions & 1 deletion paas_app_charmer/_gunicorn/workload_config.py
@@ -12,11 +12,12 @@
APPLICATION_ERROR_LOG_FILE_FMT = "/var/log/{framework}/error.log"


def create_workload_config(framework_name: str) -> WorkloadConfig:
def create_workload_config(framework_name: str, unit_name: str) -> WorkloadConfig:
"""Create an WorkloadConfig for Gunicorn.

Args:
framework_name: framework name.
unit_name: name of the app unit.

Returns:
new WorkloadConfig
@@ -35,4 +36,5 @@ def create_workload_config(framework_name: str) -> WorkloadConfig:
pathlib.Path(str.format(APPLICATION_ERROR_LOG_FILE_FMT, framework=framework_name)),
],
metrics_target="*:9102",
unit_name=unit_name,
)
26 changes: 26 additions & 0 deletions paas_app_charmer/app.py
@@ -17,6 +17,9 @@

logger = logging.getLogger(__name__)

WORKER_SUFFIX = "-worker"
SCHEDULER_SUFFIX = "-scheduler"


@dataclass(kw_only=True)
class WorkloadConfig: # pylint: disable=too-many-instance-attributes
@@ -37,6 +40,7 @@ class WorkloadConfig: # pylint: disable=too-many-instance-attributes
log_files: list of files to monitor.
metrics_target: target to scrape for metrics.
metrics_path: path to scrape for metrics.
unit_name: name of the unit; used to decide whether scheduler processes should run on it.
"""

framework: str
@@ -51,6 +55,16 @@ class WorkloadConfig: # pylint: disable=too-many-instance-attributes
log_files: List[pathlib.Path]
metrics_target: str | None = None
metrics_path: str | None = "/metrics"
unit_name: str

def should_run_scheduler(self) -> bool:
"""Return if the unit should run scheduler processes.

Returns:
True if the unit should run scheduler processes, False otherwise.
"""
unit_id = self.unit_name.split("/")[1]
return unit_id == "0"


class App:
@@ -192,6 +206,18 @@ def _app_layer(self) -> ops.pebble.LayerDict:
services[self._workload_config.service_name]["override"] = "replace"
services[self._workload_config.service_name]["environment"] = self.gen_environment()

for service_name, service in services.items():
# Add environment variables to all worker processes.
if service_name.endswith(WORKER_SUFFIX):
service["environment"] = self.gen_environment()
# For scheduler processes, add environment variables if
# the scheduler should run in the unit, disable it otherwise.
if service_name.endswith(SCHEDULER_SUFFIX):
if self._workload_config.should_run_scheduler():
service["environment"] = self.gen_environment()
else:
service["startup"] = "disabled"

return ops.pebble.LayerDict(services=services)


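In effect, the loop in _app_layer applies a naming convention: any Pebble service whose name ends in "-worker" receives the charm-generated environment on every unit, while services ending in "-scheduler" receive it only on unit 0 and are disabled everywhere else. A rough, illustrative sketch of the resulting services for the rock defined earlier (the environment contents are placeholders, not the real output of gen_environment()):

# Illustrative only: approximate shape of the services section produced for the
# "celery-worker" and "celery-beat-scheduler" services declared in rockcraft.yaml.
services_on_unit_0 = {
    "flask": {"override": "replace", "environment": {"REDIS_DB_CONNECT_STRING": "..."}},
    "celery-worker": {"environment": {"REDIS_DB_CONNECT_STRING": "..."}},
    # Only unit 0 keeps the beat scheduler enabled.
    "celery-beat-scheduler": {"environment": {"REDIS_DB_CONNECT_STRING": "..."}},
}
services_on_other_units = {
    "flask": {"override": "replace", "environment": {"REDIS_DB_CONNECT_STRING": "..."}},
    "celery-worker": {"environment": {"REDIS_DB_CONNECT_STRING": "..."}},
    # Every other unit gets its scheduler service disabled.
    "celery-beat-scheduler": {"startup": "disabled"},
}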
1 change: 1 addition & 0 deletions paas_app_charmer/fastapi/charm.py
@@ -72,6 +72,7 @@ def _workload_config(self) -> WorkloadConfig:
log_files=[],
metrics_target=f"*:{framework_config.metrics_port}",
metrics_path=framework_config.metrics_path,
unit_name=self.unit.name,
)

def get_cos_dir(self) -> str:
1 change: 1 addition & 0 deletions paas_app_charmer/go/charm.py
@@ -64,6 +64,7 @@ def _workload_config(self) -> WorkloadConfig:
log_files=[],
metrics_target=f"*:{framework_config.metrics_port}",
metrics_path=framework_config.metrics_path,
unit_name=self.unit.name,
)

def get_cos_dir(self) -> str:
20 changes: 20 additions & 0 deletions tests/integration/conftest.py
@@ -153,6 +153,26 @@ async def deploy_postgres_fixture(ops_test: OpsTest, model: Model):
return await model.deploy("postgresql-k8s", channel="14/stable", revision=300, trust=True)


@pytest_asyncio.fixture(scope="module", name="redis_k8s_app")
async def deploy_redisk8s_fixture(ops_test: OpsTest, model: Model):
"""Deploy Redis k8s charm."""
redis_app = await model.deploy("redis-k8s", channel="edge")
await model.wait_for_idle(apps=[redis_app.name], status="active")
return redis_app


@pytest_asyncio.fixture(scope="function", name="integrate_redis_k8s_flask")
async def integrate_redis_k8s_flask_fixture(
ops_test: OpsTest, model: Model, flask_app: Application, redis_k8s_app: Application
):
"""Integrate redis_k8s with flask apps."""
relation = await model.integrate(flask_app.name, redis_k8s_app.name)
await model.wait_for_idle(apps=[redis_k8s_app.name], status="active")
yield relation
await flask_app.destroy_relation("redis", f"{redis_k8s_app.name}")
await model.wait_for_idle()


@pytest_asyncio.fixture
def run_action(ops_test: OpsTest):
async def _run_action(application_name, action_name, **params):
70 changes: 70 additions & 0 deletions tests/integration/flask/test_workers.py
@@ -0,0 +1,70 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Integration tests for Flask workers and schedulers."""

import asyncio
import logging
import time

import pytest
import requests
from juju.application import Application
from juju.model import Model
from juju.utils import block_until
from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)


@pytest.mark.parametrize(
"num_units",
[1, 3],
)
@pytest.mark.usefixtures("integrate_redis_k8s_flask")
async def test_workers_and_scheduler_services(
ops_test: OpsTest, model: Model, flask_app: Application, get_unit_ips, num_units: int
):
"""
arrange: Flask and Redis deployed and integrated.
act: Scale the app to the desired number of units.
assert: There should be only one scheduler and as many workers as units. This is
checked because the scheduler constantly sends tasks with its hostname to the
workers, and the workers put their own hostname and the scheduler's hostname
into Redis sets. Those sets are read through the Flask app, which queries Redis.
"""
await flask_app.scale(num_units)
await model.wait_for_idle(apps=[flask_app.name], status="active")

# Any Flask unit will do; take the first one.
flask_unit_ip = (await get_unit_ips(flask_app.name))[0]

def check_correct_celery_stats(num_schedulers, num_workers):
"""Check that the expected number of workers and schedulers is right."""
response = requests.get(f"http://{flask_unit_ip}:8000/redis/celery_stats", timeout=5)
assert response.status_code == 200
data = response.json()
logger.info(
"check_correct_celery_stats. Expected schedulers: %d, expected workers %d. Result %s",
num_schedulers,
num_workers,
data,
)
return len(data["workers"]) == num_workers and len(data["schedulers"]) == num_schedulers

# clean the current celery stats
response = requests.get(f"http://{flask_unit_ip}:8000/redis/clear_celery_stats", timeout=5)
assert response.status_code == 200
assert "SUCCESS" == response.text

# enough time for all the schedulers to send messages
time.sleep(10)
try:
await block_until(
lambda: check_correct_celery_stats(num_schedulers=1, num_workers=num_units),
timeout=60,
wait_period=1,
)
except asyncio.TimeoutError:
assert False, f"Failed to get {num_units} workers and 1 scheduler"
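For manual debugging of a deployed model, a short sketch (the unit IP is a placeholder; look up the real one with `juju status`) that performs the same checks as the test above against the Flask endpoints added in this PR:

# Sketch only: query the Flask endpoints used by the test above.
import requests

base = "http://10.0.0.1:8000"  # placeholder unit IP

# Reset the worker/scheduler sets kept in Redis.
assert requests.get(f"{base}/redis/clear_celery_stats", timeout=5).text == "SUCCESS"

# After a few seconds the beat scheduler and the workers repopulate the sets.
stats = requests.get(f"{base}/redis/celery_stats", timeout=5).json()
print(stats["schedulers"], stats["workers"])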
2 changes: 1 addition & 1 deletion tests/unit/django/test_charm.py
@@ -63,7 +63,7 @@ def test_django_config(harness: Harness, config: dict, env: dict) -> None:
database_requirers={},
)
webserver_config = WebserverConfig.from_charm_config(harness.charm.config)
workload_config = create_workload_config(framework_name="django")
workload_config = create_workload_config(framework_name="django", unit_name="django/0")
webserver = GunicornWebserver(
webserver_config=webserver_config,
workload_config=workload_config,