
Background tasks #44

Merged
2 changes: 1 addition & 1 deletion .github/workflows/integration_test.yaml
@@ -14,7 +14,7 @@ jobs:
extra-arguments: -x --localstack-address 172.17.0.1
pre-run-script: localstack-installation.sh
charmcraft-channel: latest/edge
modules: '["test_charm.py", "test_cos.py", "test_database.py", "test_db_migration.py", "test_django.py", "test_django_integrations.py", "test_fastapi.py", "test_go.py", "test_integrations.py", "test_proxy.py"]'
modules: '["test_charm.py", "test_cos.py", "test_database.py", "test_db_migration.py", "test_django.py", "test_django_integrations.py", "test_fastapi.py", "test_go.py", "test_integrations.py", "test_proxy.py", "test_workers.py"]'
rockcraft-channel: latest/edge
juju-channel: ${{ matrix.juju-version }}
channel: 1.29-strict/stable
109 changes: 106 additions & 3 deletions examples/flask/test_rock/app.py
@@ -1,7 +1,9 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import logging
import os
import socket
import time
import urllib.parse
from urllib.parse import urlparse
@@ -16,11 +18,86 @@
import pymysql
import pymysql.cursors
import redis
from celery import Celery, Task
from flask import Flask, g, jsonify, request


def hostname():
"""Get the hostname of the current machine."""
return socket.gethostbyname(socket.gethostname())


def celery_init_app(app: Flask, broker_url: str) -> Celery:
"""Initialise celery using the redis connection string.

See https://flask.palletsprojects.com/en/3.0.x/patterns/celery/#integrate-celery-with-flask.
"""

class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.set_default()
app.extensions["celery"] = celery_app
app.config.from_mapping(
CELERY=dict(
broker_url=broker_url,
result_backend=broker_url,
task_ignore_result=True,
),
)
celery_app.config_from_object(app.config["CELERY"])
return celery_app
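
A minimal usage sketch of this factory (hypothetical task name and Redis URL, not part of this diff): tasks defined on the returned Celery app run inside the Flask application context via FlaskTask.

from flask import Flask

sketch_app = Flask("sketch")
sketch_celery = celery_init_app(sketch_app, "redis://localhost:6379/0")

@sketch_celery.task
def add(x: int, y: int) -> int:
    # Runs inside sketch_app's application context thanks to FlaskTask.
    return x + y

# Enqueue from request-handling code; a worker started with
# `celery -A app:celery_app worker` executes it.
add.delay(2, 3)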


app = Flask(__name__)
app.config.from_prefixed_env()

broker_url = os.environ.get("REDIS_DB_CONNECT_STRING")
# Configure Celery only if Redis is configured
if broker_url:
celery_app = celery_init_app(app, broker_url)
redis_client = redis.Redis.from_url(broker_url)

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
"""Set up periodic tasks in the scheduler."""
try:
# This will only have an effect in the beat scheduler.
sender.add_periodic_task(0.5, scheduled_task.s(hostname()), name="every 0.5s")
except NameError:
logging.exception("Failed to configure the periodic task")

@celery_app.task
def scheduled_task(scheduler_hostname):
"""Function to run a schedule task in a worker.

The worker that will run this task will add the scheduler hostname argument
to the "schedulers" set in Redis, and the worker's hostname to the "workers"
set in Redis.
"""
worker_hostname = hostname()
logging.info(
"scheduler host received %s in worker host %s", scheduler_hostname, worker_hostname
)
redis_client.sadd("schedulers", scheduler_hostname)
redis_client.sadd("workers", worker_hostname)
logging.info("schedulers: %s", redis_client.smembers("schedulers"))
logging.info("workers: %s", redis_client.smembers("workers"))
# The goal is to keep every worker process busy.
# That may require exhausting all workers, without letting the backlog of
# pending tasks grow so large that the schedulers cannot run their own
# scheduled tasks.
# Celery prefetches tasks; those that cannot run yet are held as "reserved".
# If every process already has reserved tasks, this task finishes immediately
# so the queues do not grow any longer.
inspect_obj = celery_app.control.inspect()
reserved_sizes = [len(tasks) for tasks in inspect_obj.reserved().values()]
logging.info("number of reserved tasks %s", reserved_sizes)
delay = 0 if min(reserved_sizes) > 0 else 5
time.sleep(delay)


def get_mysql_database():
"""Get the mysql db connection."""
@@ -213,16 +290,42 @@ def mongodb_status():

@app.route("/redis/status")
def redis_status():
"""Mongodb status endpoint."""
"""Redis status endpoint."""
if database := get_redis_database():
try:
database.set("foo", "bar")
return "SUCCESS"
except pymongo.errors.PyMongoError:
pass
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL"


@app.route("/redis/clear_celery_stats")
def redis_celery_clear_stats():
"""Reset Redis statistics about workers and schedulers."""
if database := get_redis_database():
try:
database.delete("workers")
database.delete("schedulers")
return "SUCCESS"
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL", 500


@app.route("/redis/celery_stats")
def redis_celery_stats():
"""Read Redis statistics about workers and schedulers."""
if database := get_redis_database():
try:
worker_set = [str(host) for host in database.smembers("workers")]
beat_set = [str(host) for host in database.smembers("schedulers")]
return jsonify({"workers": worker_set, "schedulers": beat_set})
except redis.exceptions.RedisError:
logging.exception("Error querying redis")
return "FAIL", 500


@app.route("/rabbitmq/send")
def rabbitmq_send():
"""Send a message to "charm" queue."""
1 change: 1 addition & 0 deletions examples/flask/test_rock/requirements.txt
@@ -7,3 +7,4 @@ pymongo
redis[hiredis]
boto3
pika
celery
16 changes: 16 additions & 0 deletions examples/flask/test_rock/rockcraft.yaml
@@ -11,3 +11,19 @@ platforms:

extensions:
- flask-framework

services:
celery-worker:
override: replace
# Redis is not mandatory in the charm. Sleep first so the service does not fail immediately while Redis is not yet configured.
command: bash -c "sleep 5; celery -A app:celery_app worker -c 2 --loglevel DEBUG"
startup: enabled
user: _daemon_
working-dir: /flask/app
celery-beat-scheduler:
override: replace
# Redis is not mandatory in the charm. Sleep first so the service does not fail immediately while Redis is not yet configured.
command: bash -c "sleep 5; celery -A app:celery_app beat --loglevel DEBUG -s /tmp/celerybeat-schedule"
startup: enabled
user: _daemon_
working-dir: /flask/app
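
Note the service names: the charm logic added in paas_app_charmer/app.py below dispatches on the "-worker" and "-scheduler" suffixes, so these Pebble services are managed automatically. An illustrative check (assumed names only):

WORKER_SUFFIX = "-worker"
SCHEDULER_SUFFIX = "-scheduler"
for name in ("celery-worker", "celery-beat-scheduler"):
    if name.endswith(WORKER_SUFFIX):
        kind = "worker"
    elif name.endswith(SCHEDULER_SUFFIX):
        kind = "scheduler"
    else:
        kind = "other"
    print(name, "->", kind)  # celery-worker -> worker, celery-beat-scheduler -> scheduler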
4 changes: 3 additions & 1 deletion paas_app_charmer/_gunicorn/charm.py
@@ -19,7 +19,9 @@ class GunicornBase(PaasCharm):
@property
def _workload_config(self) -> WorkloadConfig:
"""Return a WorkloadConfig instance."""
return create_workload_config(self._framework_name)
return create_workload_config(
framework_name=self._framework_name, unit_name=self.unit.name
)

def _create_app(self) -> App:
"""Build an App instance for the Gunicorn based charm.
4 changes: 3 additions & 1 deletion paas_app_charmer/_gunicorn/workload_config.py
@@ -12,11 +12,12 @@
APPLICATION_ERROR_LOG_FILE_FMT = "/var/log/{framework}/error.log"


def create_workload_config(framework_name: str) -> WorkloadConfig:
def create_workload_config(framework_name: str, unit_name: str) -> WorkloadConfig:
"""Create an WorkloadConfig for Gunicorn.

Args:
framework_name: framework name.
unit_name: name of the app unit.

Returns:
new WorkloadConfig
@@ -35,4 +36,5 @@ def create_workload_config(framework_name: str) -> WorkloadConfig:
pathlib.Path(str.format(APPLICATION_ERROR_LOG_FILE_FMT, framework=framework_name)),
],
metrics_target="*:9102",
unit_name=unit_name,
)
1 change: 1 addition & 0 deletions paas_app_charmer/_gunicorn/wsgi_app.py
@@ -20,6 +20,7 @@ class WsgiApp(App):

def __init__( # pylint: disable=too-many-arguments
self,
*,
container: ops.Container,
charm_state: CharmState,
workload_config: WorkloadConfig,
28 changes: 28 additions & 0 deletions paas_app_charmer/app.py
@@ -17,6 +17,9 @@

logger = logging.getLogger(__name__)

WORKER_SUFFIX = "-worker"
SCHEDULER_SUFFIX = "-scheduler"


@dataclass(kw_only=True)
class WorkloadConfig: # pylint: disable=too-many-instance-attributes
@@ -37,6 +40,7 @@ class WorkloadConfig: # pylint: disable=too-many-instance-attributes
log_files: list of files to monitor.
metrics_target: target to scrape for metrics.
metrics_path: path to scrape for metrics.
unit_name: name of the unit; used to decide whether schedulers run on this unit.
"""

framework: str
@@ -51,13 +55,24 @@ class WorkloadConfig: # pylint: disable=too-many-instance-attributes
log_files: List[pathlib.Path]
metrics_target: str | None = None
metrics_path: str | None = "/metrics"
unit_name: str

def should_run_scheduler(self) -> bool:
"""Return if the unit should run scheduler processes.

Return:
True if the unit should run scheduler processes, False otherwise.
"""
unit_id = self.unit_name.split("/")[1]
return unit_id == "0"
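
An illustrative sketch of the Juju unit-name convention this relies on (unit names have the form "<application>/<unit-id>"):

# Only unit 0 of an application runs scheduler services.
for unit_name in ("flask-k8s/0", "flask-k8s/1"):
    unit_id = unit_name.split("/")[1]
    print(unit_name, "runs scheduler:", unit_id == "0")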


class App:
"""Base class for the application manager."""

def __init__( # pylint: disable=too-many-arguments
self,
*,
container: ops.Container,
charm_state: CharmState,
workload_config: WorkloadConfig,
@@ -192,6 +207,19 @@ def _app_layer(self) -> ops.pebble.LayerDict:
services[self._workload_config.service_name]["override"] = "replace"
services[self._workload_config.service_name]["environment"] = self.gen_environment()

for service_name, service in services.items():
normalised_service_name = service_name.lower()
# Add environment variables to all worker processes.
if normalised_service_name.endswith(WORKER_SUFFIX):
service["environment"] = self.gen_environment()
# For scheduler processes, add environment variables if the scheduler
# should run on this unit; otherwise disable the service.
if normalised_service_name.endswith(SCHEDULER_SUFFIX):
if self._workload_config.should_run_scheduler():
service["environment"] = self.gen_environment()
else:
service["startup"] = "disabled"

return ops.pebble.LayerDict(services=services)
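
As a sketch of the result (service names from the rockcraft.yaml example above; the environment contents are stand-ins), the merged services on a unit other than unit 0 would look roughly like:

env = {"FLASK_SECRET_KEY": "..."}  # stand-in for gen_environment()
services = {
    "flask": {"override": "replace", "environment": env},
    # "-worker" suffix: receives the charm environment on every unit.
    "celery-worker": {"environment": env},
    # "-scheduler" suffix: disabled everywhere except unit 0.
    "celery-beat-scheduler": {"startup": "disabled"},
}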


2 changes: 1 addition & 1 deletion paas_app_charmer/charm.py
@@ -128,7 +128,7 @@ def __init__(self, framework: ops.Framework, framework_name: str) -> None:
)

self._observability = Observability(
self,
charm=self,
log_files=self._workload_config.log_files,
container_name=self._workload_config.container_name,
cos_dir=self.get_cos_dir(),
2 changes: 2 additions & 0 deletions paas_app_charmer/charm_state.py
@@ -80,6 +80,7 @@ def __init__( # pylint: disable=too-many-arguments
@classmethod
def from_charm( # pylint: disable=too-many-arguments
cls,
*,
charm: ops.CharmBase,
framework: str,
framework_config: BaseModel,
@@ -221,6 +222,7 @@ class IntegrationsState:
@classmethod
def build( # pylint: disable=too-many-arguments
cls,
*,
redis_uri: str | None,
database_requirers: dict[str, DatabaseRequires],
s3_connection_info: dict[str, str] | None,
1 change: 1 addition & 0 deletions paas_app_charmer/database_migration.py
@@ -75,6 +75,7 @@ def _set_status(self, status: DatabaseMigrationStatus) -> None:
# pylint: disable=too-many-arguments
def run(
self,
*,
command: list[str],
environment: dict[str, str],
working_dir: pathlib.Path,
1 change: 1 addition & 0 deletions paas_app_charmer/fastapi/charm.py
@@ -72,6 +72,7 @@ def _workload_config(self) -> WorkloadConfig:
log_files=[],
metrics_target=f"*:{framework_config.metrics_port}",
metrics_path=framework_config.metrics_path,
unit_name=self.unit.name,
)

def get_cos_dir(self) -> str:
1 change: 1 addition & 0 deletions paas_app_charmer/go/charm.py
@@ -64,6 +64,7 @@ def _workload_config(self) -> WorkloadConfig:
log_files=[],
metrics_target=f"*:{framework_config.metrics_port}",
metrics_path=framework_config.metrics_path,
unit_name=self.unit.name,
)

def get_cos_dir(self) -> str:
1 change: 1 addition & 0 deletions paas_app_charmer/observability.py
@@ -17,6 +17,7 @@ class Observability(ops.Object):

def __init__( # pylint: disable=too-many-arguments
self,
*,
charm: ops.CharmBase,
container_name: str,
cos_dir: str,
20 changes: 20 additions & 0 deletions tests/integration/conftest.py
@@ -153,6 +153,26 @@ async def deploy_postgres_fixture(ops_test: OpsTest, model: Model):
return await model.deploy("postgresql-k8s", channel="14/stable", revision=300, trust=True)


@pytest_asyncio.fixture(scope="module", name="redis_k8s_app")
async def deploy_redisk8s_fixture(ops_test: OpsTest, model: Model):
"""Deploy Redis k8s charm."""
redis_app = await model.deploy("redis-k8s", channel="edge")
await model.wait_for_idle(apps=[redis_app.name], status="active")
return redis_app


@pytest_asyncio.fixture(scope="function", name="integrate_redis_k8s_flask")
async def integrate_redis_k8s_flask_fixture(
ops_test: OpsTest, model: Model, flask_app: Application, redis_k8s_app: Application
):
"""Integrate redis_k8s with flask apps."""
relation = await model.integrate(flask_app.name, redis_k8s_app.name)
await model.wait_for_idle(apps=[redis_k8s_app.name], status="active")
yield relation
await flask_app.destroy_relation("redis", redis_k8s_app.name)
await model.wait_for_idle()
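
A hypothetical sketch of how a test (for instance the test_workers.py module enabled in the workflow above, not shown in this diff) could combine these fixtures with the /redis/celery_stats endpoint; the port and address lookup are assumptions:

import asyncio

import aiohttp
import pytest

@pytest.mark.asyncio
async def test_workers_sketch(model, flask_app, integrate_redis_k8s_flask):
    """Assert exactly one scheduler and at least one worker report in."""
    await model.wait_for_idle(apps=[flask_app.name], status="active")
    address = flask_app.units[0].public_address  # assumption: routable address
    async with aiohttp.ClientSession() as session:
        await session.get(f"http://{address}:8000/redis/clear_celery_stats")
        await asyncio.sleep(5)  # let the 0.5 s periodic task fire a few times
        response = await session.get(f"http://{address}:8000/redis/celery_stats")
        stats = await response.json()
    assert len(stats["schedulers"]) == 1  # scheduler enabled only on unit 0
    assert len(stats["workers"]) >= 1  # every unit runs worker processes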


@pytest_asyncio.fixture
def run_action(ops_test: OpsTest):
async def _run_action(application_name, action_name, **params):