Skip to content

Enable dispatcherd under feature flag 1st iteration - AAP-46009 #1308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
af14e40
wip: added dependency and initial conf (#1281)
Alex-Izquierdo May 6, 2025
ba958a2
wip: fix initial conf (#1282)
Alex-Izquierdo May 6, 2025
4c74f3c
wip: move worker settings to settings file (#1285)
Alex-Izquierdo May 7, 2025
94411c3
wip: upd poetry, add dispatcher to analitics and orchestrator (#1286)
Alex-Izquierdo May 8, 2025
99a5a8e
wip: initial implemention finished, pending of real tests (#1288)
Alex-Izquierdo May 8, 2025
b1cd6bb
wip projects, moved hazmat (#1289)
Alex-Izquierdo May 8, 2025
36f54a6
wip: project import fixed, pending project sync. DAB upgraded (#1290)
Alex-Izquierdo May 8, 2025
14f9648
wip: cache and simplify getting feature flag (#1291)
bzwei May 9, 2025
fb6b14e
chore: rebase from main, solving conflicts with new feature flags
Alex-Izquierdo May 13, 2025
69b0003
wip: upgrade dispatcher, projects and activations working. (#1296)
Alex-Izquierdo May 13, 2025
73f2b88
rebase from main
Alex-Izquierdo May 15, 2025
e39469c
fix: test_orchestrator (#1303)
bzwei May 16, 2025
22f2332
fix test_tasking (#1302)
bzwei May 16, 2025
bc90e58
fix: wsapi test_consumer (#1301)
bzwei May 16, 2025
ebf3277
Fix integration tests (#1300)
bzwei May 16, 2025
41246e7
test: fix job uniqueness test (#1299)
Alex-Izquierdo May 16, 2025
a72e7b9
rebase from main
Alex-Izquierdo May 16, 2025
11ceefc
ci: fix isort (#1305)
Alex-Izquierdo May 16, 2025
40d7956
test: fix test_manager tests (#1304)
Alex-Izquierdo May 16, 2025
e82c7cf
fix: test_orchestrator (#1309)
bzwei May 16, 2025
b1e2309
fix: linting (#1310)
bzwei May 16, 2025
a29e480
chore: remove metrics extra and rename hazmat (#1312)
Alex-Izquierdo May 19, 2025
887e5d9
rebase from main
Alex-Izquierdo May 19, 2025
076e35e
chore: followup hazmat renaming, missing ref (#1313)
Alex-Izquierdo May 19, 2025
9e7caee
fix command wrapper tests (#1314)
Alex-Izquierdo May 19, 2025
d5ab1ea
remove unnecessary decorator for rq analytics (#1317)
Alex-Izquierdo May 21, 2025
accfe71
rebase from main
Alex-Izquierdo May 21, 2025
ba58e7c
upgrade dispatcherd and poetry
Alex-Izquierdo May 22, 2025
be870c4
rebase from main
Alex-Izquierdo May 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ Taskfile.yaml
venv/
.coverage*
coverage.xml
test-results.xml
test-results.xml

# Dispatcherd development
dispatcherd/
176 changes: 104 additions & 72 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ cryptography = ">=42,<43"
kubernetes = "26.1.*"
podman = "5.4.*"
rq-scheduler = "^0.10"
django-ansible-base = { git = "https://github.com/ansible/django-ansible-base.git", tag = "2025.3.7", extras = [
django-ansible-base = { git = "https://github.com/ansible/django-ansible-base.git", tag = "2025.5.8", extras = [
"channel-auth",
"rbac",
"redis-client",
Expand All @@ -70,6 +70,8 @@ validators = "^0.34.0"
django-flags = "^5.0.13"
insights-analytics-collector = "^0.3.2"
distro = "^1.9.0"
dispatcherd = { version = "v2025.05.19", extras = ["pg_notify"] }


[tool.poetry.group.test.dependencies]
pytest = "*"
Expand Down
4 changes: 2 additions & 2 deletions src/aap_eda/api/views/eda_credential.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def partial_update(self, request, pk):
eda_credential.refresh_from_db()
new_interval = get_analytics_interval_if_exist(eda_credential)
if new_interval != old_interval:
reschedule_gather_analytics.delay()
reschedule_gather_analytics()
return Response(
serializers.EdaCredentialSerializer(eda_credential).data,
)
Expand Down Expand Up @@ -307,7 +307,7 @@ def _create_eda_credential(self, request, data):
)

if get_analytics_interval_if_exist(response) > 0:
reschedule_gather_analytics.delay()
reschedule_gather_analytics()

return Response(
serializers.EdaCredentialSerializer(response).data,
Expand Down
4 changes: 4 additions & 0 deletions src/aap_eda/api/views/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from aap_eda.api import exceptions as api_exc
from aap_eda.core import tasking
from aap_eda.settings import features


# TODO: need revisit from cuwater
Expand Down Expand Up @@ -147,5 +148,8 @@ def redis_is_available(
self,
message: Optional[str] = "Redis is required but unavailable.",
):
# dispatcherd does not need redis
if features.DISPATCHERD:
return True
if tasking.is_redis_failed():
raise api_exc.Conflict(message)
19 changes: 12 additions & 7 deletions src/aap_eda/api/views/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,14 @@ def create(self, request):
request.user, serializer.instance
)

job = tasks.import_project.delay(project_id=project.id)
job_id = tasks.import_project(project.id)
except redis.ConnectionError:
return RedisDependencyMixin.redis_unavailable_response()

# Atomically update `import_task_id` field only.
models.Project.objects.filter(pk=project.id).update(
import_task_id=job.id
import_task_id=job_id
)
project.import_task_id = job.id
project.import_task_id = job_id
serializer = self.get_serializer(project)
headers = self.get_success_headers(serializer.data)
logger.info(
Expand Down Expand Up @@ -271,16 +270,22 @@ def sync(self, request, pk):
detail="Project import or sync is already running."
)

# check if redis is available
self.redis_is_available()

try:
job = tasks.sync_project.delay(project_id=project.id)
job_id = tasks.sync_project(project.id)
except redis.ConnectionError:
return RedisDependencyMixin.redis_unavailable_response()

project.import_state = models.Project.ImportState.PENDING
project.import_task_id = job.id
project.import_error = None

# job_id can be none if there is already a task running.
# this is unlikely since we check the state above
# but safety first
if job_id:
project.import_task_id = job_id

project.save()

logger.info(
Expand Down
7 changes: 6 additions & 1 deletion src/aap_eda/core/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import logging
import sys

from dispatcherd.config import setup as dispatcher_setup
from django.apps import AppConfig
from django.conf import settings

logger = logging.getLogger(__name__)

Expand All @@ -29,8 +31,11 @@ def ready(self):
from aap_eda.api.views import dab_decorate # noqa: F401

# Run the startup logging for rq worker

# WARNING: rqworker can run rq workers or dispatcherd workers
if "rqworker" in sys.argv:
from aap_eda.utils.logging import startup_logging

startup_logging(logger)

# Enable default dispatcher config. Workers may override this
dispatcher_setup(settings.DISPATCHERD_DEFAULT_SETTINGS)
34 changes: 34 additions & 0 deletions src/aap_eda/core/dispatcherd_pre_fork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2025 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import django
from django.core.cache import cache
from django.db import connection

"""This module is an optimization for dispatcherd workers

This sets up Django pre-fork, which must be implemented as a module to run
on-import for compatibility with multiprocessing forkserver.
This should never be imported by other modules, which is why it is called
hazmat.
"""


django.setup()

# connections may or may not be open, but
# before forking, all connections should be closed

cache.close()
connection.close()
35 changes: 29 additions & 6 deletions src/aap_eda/core/management/commands/rqworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@

"""Wrapper for rqworker command."""

import logging

from dispatcherd import run_service as run_dispatcherd_service
from dispatcherd.config import setup as dispatcherd_setup
from django.conf import settings
from django.core.management.base import BaseCommand, CommandParser
from django_rq.management.commands import rqworker

from aap_eda.settings import features

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Wrapper for rqworker command.
Expand All @@ -35,12 +41,29 @@ def add_arguments(self, parser: CommandParser) -> None:

def handle(self, *args, **options) -> None:
if features.DISPATCHERD:
self.stderr.write(
if "worker_class" not in options:
Copy link
Contributor

@mkanoor mkanoor May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Alex-Izquierdo Can this code block be moved into something like handle_dispatcherd

self.style.ERROR("Missing required argument: --worker-class")
raise SystemExit(1)

# Use rqworker expected args to determine worker type
if "ActivationWorker" in options["worker_class"]:
dispatcherd_setup(
settings.DISPATCHERD_ACTIVATION_WORKER_SETTINGS,
)

elif "DefaultWorker" in options["worker_class"]:
dispatcherd_setup(settings.DISPATCHERD_DEFAULT_WORKER_SETTINGS)
else:
self.style.ERROR(
"DISPATCHERD feature not implemented yet. "
f"Please disable {settings.DISPATCHERD_FEATURE_FLAG_NAME} "
"in your settings.",
"Invalid worker class. "
"Please use either ActivationWorker or DefaultWorker."
)
)
raise SystemExit(1)
raise SystemExit(1)

logger.info("Starting worker with dispatcherd.")
run_dispatcherd_service()
return None

# run rqworker command if dispatcherd is not enabled
logger.info("Starting worker with rqworker.")
return rqworker.Command.handle(self, *args, **options)
Loading