Skip to content

Commit

Permalink
Merge pull request #160 from cloudblue/f/LITE-31232
Browse files Browse the repository at this point in the history
LITE-31232 Support for `bulk_relate_cqrs_serialization`
  • Loading branch information
maxipavlovic authored Oct 22, 2024
2 parents 1d122bf + 9610845 commit 6b977d7
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 41 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ Unit testing

Run tests with various RDBMS:
- `cd integration_tests`
- `DB=postgres docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=mysql docker-compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=postgres docker compose -f docker-compose.yml -f rdbms.yml run app_test`
- `DB=mysql docker compose -f docker-compose.yml -f rdbms.yml run app_test`

Check code style: `flake8`
Run tests: `pytest`
Expand All @@ -244,6 +244,6 @@ To generate HTML coverage reports use:

Integrational testing
------
1. docker-compose
1. docker compose
2. `cd integration_tests`
3. `docker-compose run master`
3. `docker compose run master`
14 changes: 11 additions & 3 deletions dj_cqrs/mixins.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging

Expand All @@ -20,6 +20,7 @@
from dj_cqrs.managers import MasterManager, ReplicaManager
from dj_cqrs.metas import MasterMeta, ReplicaMeta
from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update
from dj_cqrs.state import cqrs_state


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -292,9 +293,16 @@ def _class_serialization(self, using, sync=False):
if sync:
instance = self
else:
instance = None
db = using if using is not None else self._state.db
qs = self.__class__._default_manager.using(db)
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)

bulk_relate_cm = cqrs_state.bulk_relate_cm
if bulk_relate_cm:
instance = bulk_relate_cm.get_cached_instance(self, db)

if not instance:
qs = self.__class__._default_manager.using(db)
instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)

data = self._cqrs_serializer_cls(instance).data
data['cqrs_revision'] = instance.cqrs_revision
Expand Down
7 changes: 6 additions & 1 deletion dj_cqrs/signals.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging

Expand All @@ -9,6 +9,7 @@
from dj_cqrs.constants import SignalType
from dj_cqrs.controller import producer
from dj_cqrs.dataclasses import TransportPayload
from dj_cqrs.state import cqrs_state
from dj_cqrs.utils import get_message_expiration_dt


Expand Down Expand Up @@ -64,6 +65,10 @@ def post_save(cls, sender, **kwargs):

using = kwargs['using']

bulk_relate_cm = cqrs_state.bulk_relate_cm
if bulk_relate_cm:
bulk_relate_cm.register(instance, using)

sync = kwargs.get('sync', False)
queue = kwargs.get('queue', None)

Expand Down
7 changes: 7 additions & 0 deletions dj_cqrs/state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import threading


cqrs_state = threading.local()
cqrs_state.bulk_relate_cm = None
62 changes: 61 additions & 1 deletion dj_cqrs/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

import logging
from collections import defaultdict
from contextlib import ContextDecorator
from datetime import date, datetime, timedelta
from uuid import UUID

Expand All @@ -10,6 +12,7 @@

from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
from dj_cqrs.logger import install_last_query_capturer
from dj_cqrs.state import cqrs_state


logger = logging.getLogger('django-cqrs')
Expand Down Expand Up @@ -80,3 +83,60 @@ def apply_query_timeouts(model_cls): # pragma: no cover
cursor.execute(statement, params=(query_timeout,))

install_last_query_capturer(model_cls)


class _BulkRelateCM(ContextDecorator):
def __init__(self, cqrs_id=None):
self._cqrs_id = cqrs_id
self._mapping = defaultdict(lambda: defaultdict(set))
self._cache = {}

def register(self, instance, using=None):
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id):
return

self._mapping[instance_cqrs_id][using].add(instance.pk)

def get_cached_instance(self, instance, using=None):
instance_cqrs_id = getattr(instance, 'CQRS_ID', None)
if (not instance_cqrs_id) or (self._cqrs_id and instance_cqrs_id != self._cqrs_id):
return

instance_pk = instance.pk
cached_instances = self._cache.get(instance_cqrs_id, {}).get(using, {})
if cached_instances:
return cached_instances.get(instance_pk)

cached_pks = self._mapping[instance_cqrs_id][using]
if not cached_pks:
return

qs = instance.__class__._default_manager.using(using)
instances_cache = {
instance.pk: instance
for instance in instance.__class__.relate_cqrs_serialization(qs)
.filter(
pk__in=cached_pks,
)
.order_by()
.all()
}
self._cache.update(
{
instance_cqrs_id: {
using: instances_cache,
},
}
)
return instances_cache.get(instance_pk)

def __enter__(self):
cqrs_state.bulk_relate_cm = self

def __exit__(self, exc_type, exc_val, exc_tb):
cqrs_state.bulk_relate_cm = None


def bulk_relate_cqrs_serialization(cqrs_id=None):
return _BulkRelateCM(cqrs_id=cqrs_id)
12 changes: 6 additions & 6 deletions examples/demo_project/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ It's a simple demo project contains 2 services:
## Start project:

```
docker-compose up -d db_pgsql db_mysql
docker-compose run master ./manage.py migrate
docker-compose run replica ./manage.py migrate
docker-compose up -d
docker-compose run master ./manage.py cqrs_sync --cqrs-id=user -f={}
docker-compose run master ./manage.py cqrs_sync --cqrs-id=product -f={}
docker compose up -d db_pgsql db_mysql
docker compose run master ./manage.py migrate
docker compose run replica ./manage.py migrate
docker compose up -d
docker compose run master ./manage.py cqrs_sync --cqrs-id=user -f={}
docker compose run master ./manage.py cqrs_sync --cqrs-id=product -f={}
```

It starts master WEB app on [http://127.0.0.1:8000](http://127.0.0.1:8000) and replica on [http://127.0.0.1:8001](http://127.0.0.1:8001)
Expand Down
22 changes: 11 additions & 11 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,40 @@
.DEFAULT_GOAL := pika

build:
docker-compose build
docker compose build

build_master_v1:
docker-compose -f docker-compose.yml -f masterV1.yml build
docker compose -f docker-compose.yml -f masterV1.yml build

build_replica_v1:
docker-compose -f docker-compose.yml -f replicaV1.yml build
docker compose -f docker-compose.yml -f replicaV1.yml build

pika: build
@echo "Run PIKA integration tests..."
docker-compose run master
docker compose run master
@echo "Stopping running containers..."
docker-compose down --remove-orphans
docker compose down --remove-orphans
@echo "Done!"

kombu: build
@echo "Run KOMBU integration tests..."
docker-compose -f docker-compose.yml -f kombu.yml run master
docker compose -f docker-compose.yml -f kombu.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f kombu.yml down --remove-orphans
docker compose -f docker-compose.yml -f kombu.yml down --remove-orphans
@echo "Done!"

master_v1: build_master_v1
@echo "Run regression tests Master v1.3.1..."
docker-compose -f docker-compose.yml -f masterV1.yml run master
docker compose -f docker-compose.yml -f masterV1.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
docker compose -f docker-compose.yml -f masterV1.yml down --remove-orphans
@echo "Done!"

replica_v1: build_replica_v1
@echo "Run regression tests Replica v1.3.1..."
docker-compose -f docker-compose.yml -f replicaV1.yml run master
docker compose -f docker-compose.yml -f replicaV1.yml run master
@echo "Stopping running containers..."
docker-compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
docker compose -f docker-compose.yml -f replicaV1.yml down --remove-orphans
@echo "Done!"

all: pika kombu master_v1 replica_v1
31 changes: 30 additions & 1 deletion tests/test_master/test_signals.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
# Copyright © 2024 Ingram Micro Inc. All rights reserved.

from datetime import datetime, timezone

Expand All @@ -8,6 +8,7 @@

from dj_cqrs.constants import SignalType
from dj_cqrs.signals import post_bulk_create, post_update
from dj_cqrs.utils import bulk_relate_cqrs_serialization
from tests.dj_master import models
from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args

Expand Down Expand Up @@ -127,6 +128,34 @@ def test_manual_post_bulk_create(mocker):
assert publisher_mock.call_count == 3


@pytest.mark.django_db(transaction=True)
@pytest.mark.parametrize('count', (1, 3, 5))
def test_bulk_relate_cqrs_serialization(
django_assert_num_queries,
django_v_trans_q_count_sup,
mocker,
count,
settings,
):
mocker.patch('dj_cqrs.controller.producer.produce')

if settings.DB_ENGINE == 'sqlite' and django_v_trans_q_count_sup == 0:
suppl = 1
else:
suppl = django_v_trans_q_count_sup

opt_query_count = count + 2 + suppl
with django_assert_num_queries(opt_query_count):
with bulk_relate_cqrs_serialization():
with transaction.atomic(savepoint=False):
[models.Author.objects.create(id=i) for i in range(count)]

not_opt_query_count = count + count * 2 + suppl
with django_assert_num_queries(not_opt_query_count):
with transaction.atomic(savepoint=False):
[models.Author.objects.create(id=10 + i) for i in range(count)]


@pytest.mark.django_db(transaction=True)
def test_automatic_post_bulk_create(mocker):
publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
Expand Down
Loading

0 comments on commit 6b977d7

Please sign in to comment.