Skip to content

Commit

Permalink
This PR builds on the work in the initial PR to move business rules t…
Browse files Browse the repository at this point in the history
…o celery along with info learned deploying this.

Avoid filling the task queue with orchestration tasks and starving the workers.
===============================================================================

In the previous system there were about 3 layers of tasks, that orchestrated other tasks,
by using the .replace() API in each task.

Unfortunately it was possible for celery workers to become full of orchestration tasks
leaving no room for the business rule tasks at the bottom of the to actually run.

This PR attempts two mitigations:

1. Use celery workflows instead of .replace()

This PR builds a celery workflow in the check_workbasket using celery constructs such as chain and group.
In theory, since most of the work is done ahead of time the system should have more awareness of the task structure avoiding the issue of starvation.

2. Cancel existing workbasket checks when a new check is requested.

When check_workbasket is started, it will attempt to revoke existing check_workbasket tasks for the same workbasket.

Treat intermediate data structures as ephemeral
===============================================

A celery task may execute at any time, right now - or when a system comes up tomorrow, based on this assumption models such as TrackedModelCheck (which stores the result of a business rule check on a TrackedModel) are no longer passed to celery tasks by ID, instead all the information needed to receate the data is passed to the celery task, this means the system will still work even if developers delete these while it is running.

Reduce layers in business rule checking
=======================================

BusinessRuleChecker and LinkedModelsBusinessRuleChecker are now the only checkers, these now take BusinessRule instances, instead of being subclassed for each business rule.
While more parameters are passed when rules are checked a conceptual layer has been removed and the simplification is reflected with around 20 lines of code being removed from checks.py

Celery flower is now very easier to read
========================================
Due to the changes above, the output in celery flower should correspond more closely to a users intentions - ids of models.

Content Checksums
=================

Result caching now validates using checksums of the content, which should reduce the amount of checking the system needs to do.

When a workbasket has been published, it's content could invalidate some content in other unpublished workbaskets, by associating business rule checks with checksums of a models content, any models that do not clash can be skipped.

Model checksums (generated by `.content_hash()`) are not currently stored in the database (though it may be desirable to store them on TrackedModels, as it would provide an mechanism to address any content in the system).
The checksuming scheme is a combination of the type and a sha256 of the fields in `.copyable_fields` (which should represent the fields a user can edit, but not fields such as pk).
Blake3 was tested, as it provides a fast hashing algorithm, in practice it didn't provide much of a speedup over sha256.

PK ranges
=========

Occasionally workbaskets with many items may need to be checker (the initial workbasket has 9 million items).
Based on the observations that the ID column of the contained TrackedModels is mostly continguous, the system allows passing sequences of contiguous TrackedModels specified by tuples of (first_pk, last_pk).
This is relatively compact, suitable for passing over the network with celery and readable in Celery flower.

This also enables chunking of tasks - further enabled by specifying a maximum amount of items in each tuple.

On TrackedModelQueryset `.as_pk_intervals` and `.from_pk_intervals` are provided to go to and from this format.

Greets
======

This PR adapts changes and builds on the hard work done in the initial work to check the business rules with celery, thanks to Simon Worthington and the hard work of the other devs on the project.
  • Loading branch information
stuaxo committed Aug 3, 2022
1 parent e59fc8a commit 2a77ba1
Show file tree
Hide file tree
Showing 33 changed files with 1,530 additions and 895 deletions.
317 changes: 147 additions & 170 deletions checks/checks.py
Original file line number Diff line number Diff line change
@@ -1,226 +1,203 @@
from functools import cached_property
from typing import Collection
from typing import Dict
from typing import Iterator
import logging
from collections import defaultdict
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar

from django.conf import settings

from checks.models import TrackedModelCheck
from checks.models import TransactionCheck
from common.business_rules import ALL_RULES
from common.business_rules import BusinessRule
from common.business_rules import BusinessRuleViolation
from common.models.trackedmodel import TrackedModel
from common.models import TrackedModel
from common.models import Transaction
from common.models.utils import get_current_transaction
from common.models.utils import override_current_transaction

CheckResult = Tuple[bool, Optional[str]]

logger = logging.getLogger(__name__)

Self = TypeVar("Self")
CheckResult = Tuple[bool, Optional[str]]


class Checker:
"""
A ``Checker`` is an object that knows how to perform a certain kind of check
against a model.
Checkers can be applied against a model. The logic of the checker will be
run and the result recorded as a ``TrackedModelCheck``.
"""

@cached_property
def name(self) -> str:
"""
The name string that on a per-model basis uniquely identifies the
checker.
The name should be deterministic (i.e. not rely on the current
environment, memory locations or random data) so that the system can
record the name in the database and later use it to work out whether
this check has been run. The name doesn't need to include any details
about the model.
By default this is the name of the class, but it can include any other
non-model data that is unique to the checker. For a more complex
example, see ``IndirectBusinessRuleChecker.name``.
"""
return type(self).__name__

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
def run_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
) -> CheckResult:
"""
Returns instances of this ``Checker`` that should apply to the model.
Run a single business rule on a single model.
What checks apply to a model is sometimes data-dependent, so it is the
responsibility of the ``Checker`` class to tell the system what
instances of itself it would expect to run against the model. For an
example, see ``IndirectBusinessRuleChecker.checkers_for``.
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]).
"""
raise NotImplementedError()
logger.debug(f"run_rule %s %s %s", model, rule, transaction.pk)
try:
rule(transaction).validate(model)
logger.debug(f"%s [tx:%s] %s [passed]", model, rule, transaction.pk)
return True, None
except BusinessRuleViolation as violation:
reason = violation.args[0]
logger.debug(f"%s [tx:%s] %s [failed]", model, rule, transaction.pk, reason)
return False, reason

def run(self, model: TrackedModel) -> CheckResult:
"""Runs Checker-dependent logic and returns an indication of success."""
raise NotImplementedError()
@classmethod
def apply_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
Applies the check to the model and records success.
def apply(self, model: TrackedModel, context: TransactionCheck):
"""Applies the check to the model and records success."""
:return: TrackedModelCheck instance containing the result of the check.
During debugging the developer can set settings.RAISE_BUSINESS_RULE_FAILURES
to True to raise business rule violations as exceptions.
"""
success, message = False, None
try:
with override_current_transaction(context.transaction):
success, message = self.run(model)
with override_current_transaction(transaction):
success, message = cls.run_rule(rule, transaction, model)
except Exception as e:
success, message = False, str(e)
if settings.RAISE_BUSINESS_RULE_FAILURES:
raise
finally:
return TrackedModelCheck.objects.create(
check, created = TrackedModelCheck.objects.get_or_create(
{
"successful": success,
"message": message,
"content_hash": model.content_hash().digest(),
},
model=model,
transaction_check=context,
check_name=self.name,
successful=success,
message=message,
check_name=rule.__name__,
)


class BusinessRuleChecker(Checker):
"""
A ``Checker`` that runs a ``BusinessRule`` against a model.
This class is expected to be sub-typed for a specific rule by a call to
``of()``.
Attributes:
checker_cache (dict): (class attribute) Cache of Business checkers created by ``of()``.
"""

rule: Type[BusinessRule]

_checker_cache: Dict[str, BusinessRule] = {}
if not created:
check.successful = success
check.message = message
check.content_hash = model.content_hash().digest()
check.save()
return check

@classmethod
def of(cls: Type, rule_type: Type[BusinessRule]) -> Type:
def apply_rule_cached(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
Return a subclass of a Checker, e.g. BusinessRuleChecker,
IndirectBusinessRuleChecker that runs the passed in business rule.
If a matching TrackedModelCheck instance exists, returns it, otherwise
check rule, and return the result as a TrackedModelCheck instance.
Example, creating a BusinessRuleChecker for ME32:
:return: TrackedModelCheck instance containing the result of the check.
"""
try:
check = TrackedModelCheck.objects.get(
model=model,
check_name=rule.__name__,
)
except TrackedModelCheck.DoesNotExist:
logger.debug(
"apply_rule_cached (no existing check) %s, %s apply rule",
rule.__name__,
transaction,
)
return cls.apply_rule(rule, transaction, model)

# Re-run the rule if the check is not successful.
check_hash = bytes(check.content_hash)
model_hash = model.content_hash().digest()
if check_hash == model_hash:
logger.debug(
"apply_rule_cached (matching content hash) %s, tx: %s, using cached result %s",
rule.__name__,
transaction.pk,
check,
)
return check

>>> BusinessRuleChecker.of(measures.business_rules.ME32)
<class 'checks.checks.BusinessRuleCheckerOf[measures.business_rules.ME32]'>
logger.debug(
"apply_rule_cached (check.content_hash != model.content_hash()) %s != %s %s, %s apply rule",
check_hash,
model_hash,
rule.__name__,
transaction,
)
check.delete()
return cls.apply_rule(rule, transaction, model)

This API is usually called by .applicable_to, however this docstring should
illustrate what it does.

Checkers are created once and then cached in _checker_cache.
class BusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels business_rules
attribute."""

As well as a small performance improvement, caching aids debugging by ensuring
the same checker instance is returned if the same cls is passed to ``of``.
@classmethod
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set[str]] = None):
"""
checker_name = f"{cls.__name__}Of[{rule_type.__module__}.{rule_type.__name__}]"

# If the checker class was already created, return it.
checker_class = cls._checker_cache.get(checker_name)
if checker_class is not None:
return checker_class
# No existing checker was found, so create it:
:param model: TrackedModel instance
:param rules: Optional list of rule names to filter by.
:return: Dict mapping models to a set of the BusinessRules that apply to them.
"""
model_rules = defaultdict(set)

class BusinessRuleCheckerOf(cls):
# Creating this class explicitly in code is more readable than using type(...)
# Once created the name will be mangled to include the rule to be checked.
for rule in model.business_rules:
if rules is not None and rule.__name__ not in rules:
continue

f"""Apply the following checks as specified in {rule_type.__name__}"""
rule = rule_type
model_rules[model].add(rule)

def __repr__(self):
return f"<{checker_name}>"
return model_rules

BusinessRuleCheckerOf.__name__ = checker_name

cls._checker_cache[checker_name] = BusinessRuleCheckerOf
return BusinessRuleCheckerOf
class LinkedModelsBusinessRuleChecker(Checker):
"""Apply BusinessRules specified in a TrackedModels indirect_business_rules
attribute to models returned by get_linked_models on those rules."""

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
"""If the rule attribute on this BusinessRuleChecker matches any in the
supplied TrackedModel instance's business_rules, return it in a list,
otherwise there are no matches so return an empty list."""
if cls.rule in model.business_rules:
return [cls()]
return []

def run(self, model: TrackedModel) -> CheckResult:
"""
:return CheckResult, a Tuple(rule_passed: str, violation_reason: Optional[str]).
def apply_rule(
cls,
rule: BusinessRule,
transaction: Transaction,
model: TrackedModel,
):
"""
transaction = get_current_transaction()
try:
self.rule(transaction).validate(model)
return True, None
except BusinessRuleViolation as violation:
return False, violation.args[0]


class IndirectBusinessRuleChecker(BusinessRuleChecker):
"""
A ``Checker`` that runs a ``BusinessRule`` against a model that is linked to
the model being checked, and for which a change in the checked model could
result in a business rule failure against the linked model.
LinkedModelsBusinessRuleChecker assumes that the linked models are still
the current.
This is a base class: subclasses for checking specific rules are created by
calling ``of()``.
"""
versions (TODO - ensure a business rule checks this),
rule: Type[BusinessRule]
linked_model: TrackedModel

def __init__(self, linked_model: TrackedModel) -> None:
self.linked_model = linked_model
super().__init__()

@cached_property
def name(self) -> str:
# Include the identity of the linked model in the checker name, so that
# each linked model needs to be checked for all checks to be complete.
return f"{super().name}[{self.linked_model.pk}]"
The transaction to check is set to that of the model, which enables
"""
return super().apply_rule(rule, model.transaction, model)

@classmethod
def checkers_for(cls: Type[Self], model: TrackedModel) -> Collection[Self]:
"""Return a set of IndirectBusinessRuleCheckers for every model found on
rule.get_linked_models."""
rules = set()
transaction = get_current_transaction()
if cls.rule in model.indirect_business_rules:
for linked_model in cls.rule.get_linked_models(model, transaction):
rules.add(cls(linked_model))
return rules

def run(self, model: TrackedModel) -> CheckResult:
def get_model_rules(cls, model: TrackedModel, rules: Optional[Set] = None):
"""
Return the result of running super.run, passing self.linked_model, and.
return it as a CheckResult - a Tuple(rule_passed: str, violation_reason: Optional[str])
:param model: TrackedModel instance
:param rules: Optional list of rule names to filter by.
:return: Dict mapping models to a set of the BusinessRules that apply to them.
"""
result, message = super().run(self.linked_model)
message = f"{self.linked_model}: " + message if message else None
return result, message
tx = get_current_transaction()

model_rules = defaultdict(set)

for rule in [*model.indirect_business_rules]:
for linked_model in rule.get_linked_models(model, tx):
if rules is not None and rule.__name__ not in rules:
continue

def checker_types() -> Iterator[Type[Checker]]:
"""
Return all registered Checker types.
model_rules[linked_model].add(rule)

See ``checks.checks.BusinessRuleChecker.of``.
"""
for rule in ALL_RULES:
yield BusinessRuleChecker.of(rule)
yield IndirectBusinessRuleChecker.of(rule)
return model_rules


def applicable_to(model: TrackedModel) -> Iterator[Checker]:
"""Return instances of any Checker classes applicable to the supplied
TrackedModel instance."""
for checker_type in checker_types():
yield from checker_type.checkers_for(model)
# Checkers in priority list order, checkers for linked models come first.
ALL_CHECKERS = {
"LinkedModelsBusinessRuleChecker": LinkedModelsBusinessRuleChecker,
"BusinessRuleChecker": BusinessRuleChecker,
}
Loading

0 comments on commit 2a77ba1

Please sign in to comment.