Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve #305 abstract models #314

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
22c7c76
[#305] Added abstract models.
jose-padin May 4, 2022
c49ad2e
[#305] `ChordCounter` moved from `abstract` module to `generic` module.
jose-padin May 4, 2022
25496f8
Issue 305: abstract models
jose-padin May 12, 2022
97661fd
Update django_celery_results/models/__init__.py
auvipy Jun 7, 2022
a5554d7
[#305]: Improving abstract models implementation.
diegocastrum Aug 10, 2022
dbeb9a1
[#305]: `extend_task_props_callback` relocated.
diegocastrum Aug 10, 2022
7565b36
[#305]: Added a default callable to `get_callback_function`
diegocastrum Aug 15, 2022
1dfa0fc
Added newline to the end of
diegocastrum Aug 16, 2022
14928be
[#305] Added a sanity check to `task_props_extension`
diegocastrum Aug 16, 2022
b70c44f
Fixed a NoneType error when the callback is not defined in project se…
diegocastrum Aug 16, 2022
cedccdf
[#305] Added documentation about this feature.
diegocastrum Oct 13, 2022
0fc0b48
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 17, 2022
242edab
Fixed a "wrong" description for the `ImproperlyConfigured` exception …
diegocastrum Aug 17, 2022
7ddefa8
[#305] Fixed some pre-commit failures
diegocastrum Oct 13, 2022
a5681a6
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
48f278a
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
fb5ea91
Update docs/extending_task_results.rst
diegocastrum Oct 17, 2022
10a5c3a
feat(models): add `AbstractChordCounter` and update `ChordCounter`
diegocastrum Oct 21, 2024
d6b0d19
fix: refactor helper functions to avoid circular dependencies
diegocastrum Oct 29, 2024
946ff07
fix: undefined name 'ChordCounter' and minor fixes
diegocastrum Oct 29, 2024
39081b3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 29, 2024
cbdb7a4
fix: 'TypeError' introduced in previous commit
diegocastrum Oct 31, 2024
29cc821
fix: include 'extra_fields' conditionally
diegocastrum Oct 31, 2024
c508179
fix: 'get_task_props_extensions()' missing 1 required argument
diegocastrum Nov 1, 2024
101a24f
fix: TypeError introducedn on prev commit on 'AbstractChordCounter'
diegocastrum Nov 5, 2024
af843ba
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 5, 2024
90310b8
fix: ImportError introduced in previous commit in 'abstract.py'
diegocastrum Nov 6, 2024
e2a2bc0
style: Reformat import statements for better readability in 'database…
diegocastrum Nov 8, 2024
af5a226
fix: Update configuration for isort and black to enforce line length …
diegocastrum Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ cover/
.cache/
htmlcov/
coverage.xml
.vscode
5 changes: 4 additions & 1 deletion django_celery_results/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
ALLOW_EDITS = False
pass

from .models import GroupResult, TaskResult
from .models.helpers import groupresult_model, taskresult_model

GroupResult = groupresult_model()
TaskResult = taskresult_model()


class TaskResultAdmin(admin.ModelAdmin):
Expand Down
37 changes: 25 additions & 12 deletions django_celery_results/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
from django.db.utils import InterfaceError
from kombu.exceptions import DecodeError

from ..models import ChordCounter
from ..models import GroupResult as GroupResultModel
from ..models import TaskResult
from ..models.helpers import (
chordcounter_model,
groupresult_model,
taskresult_model,
)
from ..settings import get_task_props_extension

EXCEPTIONS_TO_CATCH = (InterfaceError,)

Expand All @@ -30,8 +33,9 @@
class DatabaseBackend(BaseDictBackend):
"""The Django database backend, using models to store task state."""

TaskModel = TaskResult
GroupModel = GroupResultModel
TaskModel = taskresult_model()
GroupModel = groupresult_model()
ChordCounterModel = chordcounter_model()
subpolling_interval = 0.5

def exception_safe_to_retry(self, exc):
Expand Down Expand Up @@ -80,6 +84,14 @@ def _get_extended_properties(self, request, traceback):
# task protocol 1
task_kwargs = getattr(request, 'kwargs', None)

# TODO: We assume that task protocol 1 could be always in use. :/
extra_fields = get_task_props_extension(
request,
getattr(request, 'kwargs', None)
)
if extra_fields:
extended_props.update({"extra_fields": extra_fields})

# Encode input arguments
if task_args is not None:
_, _, task_args = self.encode_content(task_args)
Expand Down Expand Up @@ -141,9 +153,8 @@ def _store_result(
'using': using,
}

task_props.update(
self._get_extended_properties(request, traceback)
)
task_props.update(self._get_extended_properties(request, traceback))
task_props.update(get_task_props_extension(request, dict(task_props)))

if status == states.STARTED:
task_props['date_started'] = Now()
Expand Down Expand Up @@ -240,7 +251,7 @@ def apply_chord(self, header_result_args, body, **kwargs):
results = [r.as_tuple() for r in header_result]
chord_size = body.get("chord_size", None) or len(results)
data = json.dumps(results)
ChordCounter.objects.create(
self.ChordCounterModel.objects.create(
group_id=header_result.id, sub_tasks=data, count=chord_size
)

Expand All @@ -250,17 +261,19 @@ def on_chord_part_return(self, request, state, result, **kwargs):
if not gid or not tid:
return
call_callback = False
with transaction.atomic(using=router.db_for_write(ChordCounter)):
with transaction.atomic(
using=router.db_for_write(self.ChordCounterModel)
):
# We need to know if `count` hits 0.
# wrap the update in a transaction
# with a `select_for_update` lock to prevent race conditions.
# SELECT FOR UPDATE is not supported on all databases
try:
chord_counter = (
ChordCounter.objects.select_for_update()
self.ChordCounterModel.objects.select_for_update()
.get(group_id=gid)
)
except ChordCounter.DoesNotExist:
except self.ChordCounterModel.DoesNotExist:
logger.warning("Can't find ChordCounter for Group %s", gid)
return
chord_counter.count -= 1
Expand Down
9 changes: 7 additions & 2 deletions django_celery_results/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def store_result(self, content_type, content_encoding,
traceback=None, meta=None,
periodic_task_name=None,
task_name=None, task_args=None, task_kwargs=None,
worker=None, using=None, **kwargs):
worker=None, using=None, extra_fields=None, **kwargs):
"""Store the result and status of a task.

Arguments:
Expand All @@ -140,6 +140,7 @@ def store_result(self, content_type, content_encoding,
exception (only passed if the task failed).
meta (str): Serialized result meta data (this contains e.g.
children).
extra_fields (dict, optional): Extra (model) fields to store.

Keyword Arguments:
exception_retry_count (int): How many times to retry by
Expand All @@ -159,8 +160,12 @@ def store_result(self, content_type, content_encoding,
'task_name': task_name,
'task_args': task_args,
'task_kwargs': task_kwargs,
'worker': worker
'worker': worker,
}

if extra_fields is not None:
fields.update(extra_fields)

if 'date_started' in kwargs:
fields['date_started'] = kwargs['date_started']

Expand Down
3 changes: 3 additions & 0 deletions django_celery_results/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .generic import ChordCounter, GroupResult, TaskResult

__all__ = ["ChordCounter", "GroupResult", "TaskResult"]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Database models."""
"""Abstract models."""

import json

Expand All @@ -9,14 +9,14 @@
from django.db import models
from django.utils.translation import gettext_lazy as _

from . import managers
from .. import managers

ALL_STATES = sorted(states.ALL_STATES)
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))


class TaskResult(models.Model):
"""Task result/status."""
class AbstractTaskResult(models.Model):
"""Abstract Task result/status."""

task_id = models.CharField(
max_length=getattr(
Expand Down Expand Up @@ -98,8 +98,8 @@ class TaskResult(models.Model):
class Meta:
"""Table information."""

abstract = True
ordering = ['-date_done']

verbose_name = _('task result')
verbose_name_plural = _('task results')

Expand Down Expand Up @@ -135,14 +135,15 @@ def __str__(self):
return '<Task: {0.task_id} ({0.status})>'.format(self)


class ChordCounter(models.Model):
"""Chord synchronisation."""
class AbstractChordCounter(models.Model):
"""Abstract Chord synchronisation."""

group_id = models.CharField(
max_length=getattr(
settings,
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
255),
255
),
unique=True,
verbose_name=_("Group ID"),
help_text=_("Celery ID for the Chord header group"),
Expand All @@ -160,12 +161,17 @@ class ChordCounter(models.Model):
)
)

class Meta:
"""Table information."""

abstract = True

def group_result(self, app=None):
"""Return the :class:`celery.result.GroupResult` of self.
"""Return the GroupResult of self.

Arguments:
app (celery.app.base.Celery): app instance to create the
:class:`celery.result.GroupResult` with.
---------
app (Celery): app instance to create the GroupResult with.

"""
return CeleryGroupResult(
Expand All @@ -176,8 +182,8 @@ def group_result(self, app=None):
)


class GroupResult(models.Model):
"""Task Group result/status."""
class AbstractGroupResult(models.Model):
"""Abstract Task Group result/status."""

group_id = models.CharField(
max_length=getattr(
Expand Down Expand Up @@ -230,10 +236,10 @@ def __str__(self):
class Meta:
"""Table information."""

ordering = ['-date_done']

abstract = True
verbose_name = _('group result')
verbose_name_plural = _('group results')
ordering = ['-date_done']

# Explicit names to solve https://code.djangoproject.com/ticket/33483
indexes = [
Expand Down
37 changes: 37 additions & 0 deletions django_celery_results/models/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Database models."""

from django_celery_results.models.abstract import (
AbstractChordCounter,
AbstractGroupResult,
AbstractTaskResult,
)


class TaskResult(AbstractTaskResult):
"""Task result/status."""

class Meta(AbstractTaskResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"


class ChordCounter(AbstractChordCounter):
"""Chord synchronisation."""

class Meta(AbstractChordCounter.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"


class GroupResult(AbstractGroupResult):
"""Task Group result/status."""

class Meta(AbstractGroupResult.Meta):
"""Table information."""

abstract = False
app_label = "django_celery_results"
72 changes: 72 additions & 0 deletions django_celery_results/models/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from django.apps import apps
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from .generic import ChordCounter, GroupResult, TaskResult


def taskresult_model():
"""Return the TaskResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_TASKRESULT_MODEL'):
return TaskResult

try:
return apps.get_model(
settings.CELERY_RESULTS_TASKRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_TASKRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_TASKRESULT_MODEL}' that has not "
"been installed"
)


def chordcounter_model():
"""Return the ChordCounter model that is active in this project."""

if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'):
return ChordCounter

try:
return apps.get_model(
settings.CELERY_RESULTS_CHORDCOUNTER_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model "
f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not "
"been installed"
)


def groupresult_model():
"""Return the GroupResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):
return GroupResult

try:
return apps.get_model(
settings.CELERY_RESULTS_GROUPRESULT_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_GROUPRESULT_MODEL refers to model "
f"'{settings.CELERY_RESULTS_GROUPRESULT_MODEL}' that has not "
"been installed"
)
35 changes: 35 additions & 0 deletions django_celery_results/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from collections.abc import Mapping

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured


def get_callback_function(settings_name, default=None):
"""Return the callback function for the given settings name."""
callback = getattr(settings, settings_name, None)
if not callback:
return default

if not callable(callback):
raise ImproperlyConfigured(f"{settings_name} must be callable.")

return callback


extend_task_props_callback = get_callback_function(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding some sanity checks on the return value of the callback.
For example that it complies to the Mapping protocol.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, get_callback_function is quite generic, and could be used in the future for another purposes, returning different types of data. So far I can see, the sanity checks could be in _store_results() where extend_task_props_callback() is called. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm personally not a fan of this generic function, I'd err of the side of caution and make the callback handling as clean and explicit as possible so any errors we need to raise have a clear source.
But this is more of a code-style thing so maybe one of the maintainers (@auvipy) can comment.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point @AllexVeldman, what do you think @auvipy?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AllexVeldman & @auvipy I think I found a proper way to control the callback internally being able to check explicitly that return value.

I just created a new function called get_task_props_extension() into the settings module in charge to return an empty dict in case that the CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK is undefined and otherwise will check that the return value complies with the Mapping protocol.

Let me know what you think!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

"CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK"
)


def get_task_props_extension(request, task_props):
"""Extend the task properties with custom props to fill custom models."""
if not extend_task_props_callback:
return {}

task_props_extension = extend_task_props_callback(request, task_props) or {} # noqa E501
if not isinstance(task_props_extension, Mapping):
raise ImproperlyConfigured(
"CELERY_RESULTS_EXTEND_TASK_PROPS_CALLBACK must return a Mapping."
)

return task_props_extension
Loading
Loading