diff --git a/docs/includes/introduction.txt b/docs/includes/introduction.txt index c86c26231..a0ce4e30b 100644 --- a/docs/includes/introduction.txt +++ b/docs/includes/introduction.txt @@ -93,6 +93,8 @@ Transport Comparison +---------------+----------+------------+------------+---------------+--------------+-----------------------+ | *Pyro* | Virtual | Yes | Yes [#f1]_ | No | No | No | +---------------+----------+------------+------------+---------------+--------------+-----------------------+ +| *Django* | Virtual | Yes | Yes | Yes | Yes | Yes | ++---------------+----------+------------+------------+---------------+--------------+-----------------------+ .. [#f1] Declarations only kept in memory, so exchanges/queues @@ -264,4 +266,3 @@ There are some concepts you should be familiar with before starting: zero or more words. For example `"*.stock.#"` matches the routing keys `"usd.stock"` and `"eur.stock.db"` but not `"stock.nasdaq"`. - diff --git a/docs/userguide/connections.rst b/docs/userguide/connections.rst index c7b3af9c5..87dbff1f0 100644 --- a/docs/userguide/connections.rst +++ b/docs/userguide/connections.rst @@ -147,6 +147,9 @@ All of these are valid URLs: # Using Pyro with name server running on 'localhost' pyro://localhost/kombu.broker + # Using Django + django:/// + The query part of the URL can also be used to set options, e.g.: diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 180a27b4b..c31f392ad 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None: 'azureservicebus': 'kombu.transport.azureservicebus:Transport', 'pyro': 'kombu.transport.pyro:Transport', 'gcpubsub': 'kombu.transport.gcpubsub:Transport', + 'django': 'kombu.transport.django_kombu.transport.Transport', } _transport_cache = {} diff --git a/kombu/transport/django_kombu/app.py b/kombu/transport/django_kombu/app.py new file mode 100644 index 000000000..b5d957087 --- /dev/null +++ b/kombu/transport/django_kombu/app.py @@ -0,0 +1,11 @@ +from __future__ import annotations + +from django.apps import AppConfig + + +class KombuConfig(AppConfig): + """Django app config.""" + + default_auto_field = "django.db.models.BigAutoField" + name = "django_kombu" + label = "kombu" diff --git a/kombu/transport/django_kombu/migrations/0001_initial.py b/kombu/transport/django_kombu/migrations/0001_initial.py new file mode 100644 index 000000000..056f3ae28 --- /dev/null +++ b/kombu/transport/django_kombu/migrations/0001_initial.py @@ -0,0 +1,109 @@ +# Generated by Django 5.1.5 on 2025-01-20 13:39 + +from __future__ import annotations + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + "The initial migration." + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Exchange", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("name", models.CharField(max_length=200, unique=True)), + ], + ), + migrations.CreateModel( + name="Queue", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("name", models.CharField(max_length=200, unique=True)), + ], + ), + migrations.CreateModel( + name="Message", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("visible", models.BooleanField(db_index=True, default=True)), + ( + "sent_at", + models.DateTimeField(auto_now_add=True, db_index=True, null=True), + ), + ("message", models.TextField()), + ("version", models.PositiveIntegerField(default=1)), + ("priority", models.PositiveIntegerField(default=0)), + ("ttl", models.IntegerField(blank=True, null=True)), + ( + "queue", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="messages", + to="django_kombu.queue", + ), + ), + ], + ), + migrations.CreateModel( + name="Binding", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("routing_key", models.CharField(max_length=255, null=True)), + ( + "exchange", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="bindings", + to="django_kombu.exchange", + ), + ), + ( + "queue", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="bindings", + to="django_kombu.queue", + ), + ), + ], + ), + ] diff --git a/kombu/transport/django_kombu/migrations/__init__.py b/kombu/transport/django_kombu/migrations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kombu/transport/django_kombu/models.py b/kombu/transport/django_kombu/models.py new file mode 100644 index 000000000..7de2dd62f --- /dev/null +++ b/kombu/transport/django_kombu/models.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from datetime import timedelta + +from django.db import models +from django.utils import timezone + + +class Queue(models.Model): + """The queue.""" + + name = models.CharField(max_length=200, unique=True) + + def __str__(self): + return self.name + + +class Message(models.Model): + """The message.""" + + visible = models.BooleanField(default=True, db_index=True) + sent_at = models.DateTimeField(null=True, db_index=True, auto_now_add=True) + message = models.TextField() + version = models.PositiveIntegerField(default=1) + priority = models.PositiveIntegerField(default=0) + ttl = models.IntegerField( + null=True, blank=True + ) # TTL in seconds (null means no TTL) + queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="messages") + + def __str__(self): + return f"{self.sent_at} {self.message} {self.queue_id}" + + def is_expired(self): + if self.ttl is None: + return False # No TTL set, so not expired + expiration_time = self.sent_at + timedelta(seconds=self.ttl) + return expiration_time < timezone.now() + + +class Exchange(models.Model): + """The exchange.""" + + name = models.CharField(max_length=200, unique=True) + + def __str__(self): + return f"{self.name}" + + +class Binding(models.Model): + """The binding.""" + + queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="bindings") + exchange = models.ForeignKey( + Exchange, on_delete=models.CASCADE, related_name="bindings" + ) + routing_key = models.CharField(max_length=255, null=True) + + def __str__(self): + return f"Binding: {self.queue.name} -> {self.exchange.name} with routing_key {self.routing_key}" diff --git a/kombu/transport/django_kombu/transport.py b/kombu/transport/django_kombu/transport.py new file mode 100644 index 000000000..1759f228e --- /dev/null +++ b/kombu/transport/django_kombu/transport.py @@ -0,0 +1,137 @@ +"""Django Transport module for kombu. + +Kombu transport using Django ORM as the message store. + +Features +======== +* Type: Virtual +* Supports Direct: Yes +* Supports Topic: Yes +* Supports Fanout: Yes +* Supports Priority: Yes +* Supports TTL: Yes + +Connection String +================= + +.. code-block:: + + django:/// +""" + +from __future__ import annotations + +import json +import logging +from queue import Empty + +from django.db import transaction + +from kombu.transport import virtual +from kombu.transport.django_kombu.models import (Binding, Exchange, Message, + Queue) + +VERSION = (0, 0, 1) +__version__ = ".".join(map(str, VERSION)) + +logger = logging.getLogger(__name__) + + +class Channel(virtual.Channel): + """The channel class.""" + + supports_fanout = True + + def _open(self): + pass + + def _put(self, queue, message, priority=0, ttl=None, **kwargs): + queue_instance, _ = Queue.objects.get_or_create(name=queue) + queue_instance.messages.create( + message=json.dumps(message), priority=priority, ttl=ttl + ) + + def _get(self, queue, timeout=None): + with transaction.atomic(): + try: + queue_instance = Queue.objects.get(name=queue) + except Queue.DoesNotExist: + raise Empty() + message_instance = ( + Message.objects.select_for_update(skip_locked=True) + .filter(visible=True, queue=queue_instance) + .order_by("priority", "sent_at", "id") + .first() + ) + if message_instance is not None: + if message_instance.is_expired(): + message_instance.visible = False + message_instance.save(update_fields=["visible"]) + logger.debug( + f"Message with ID {message_instance.id} has expired and is discarded." + ) + return self._get(queue, timeout=timeout) + + message_instance.visible = False + message_instance.save(update_fields=["visible"]) + msg = message_instance.message + return json.loads(msg) + raise Empty() + + def _purge(self, queue): + try: + queue_instance = Queue.objects.get(name=queue) + except Queue.DoesNotExist: + return + queue_instance.messages.all().delete() + + def _queue_bind(self, exchange, routing_key, pattern, queue): + queue_instance, _ = Queue.objects.get_or_create(name=queue) + exchange_instance, _ = Exchange.objects.get_or_create(name=exchange) + binding, created = Binding.objects.get_or_create( + queue=queue_instance, + exchange=exchange_instance, + routing_key=routing_key, + ) + if created: + logger.debug(f"Binding created: {binding}") + else: + logger.debug(f"Binding already exists: {binding}") + + def _put_fanout(self, exchange, message, routing_key, priority=0, **kwargs): + try: + exchange_instance = Exchange.objects.get(name=exchange) + except Exchange.DoesNotExist: + return + queues = Queue.objects.filter( + bindings__exchange=exchange_instance, bindings__routing_key=routing_key + ) + logger.debug( + f"Found {len(queues)} queues bound to fanout exchange {exchange_instance.name}" + ) + for queue in queues: + # Publish the message to each bound queue + logger.debug(f"Publishing message to fanout queue: {queue.name}") + self._put(queue.name, message, priority=priority) + + def get_table(self, exchange): + try: + exchange_instance = Exchange.objects.get(name=exchange) + except Exchange.DoesNotExist: + return [] + bindings = exchange_instance.bindings.all() + return [(binding.routing_key, "", binding.queue.name) for binding in bindings] + + +class Transport(virtual.Transport): + """The transport class.""" + + Channel = Channel + + can_parse_url = True + driver_type = "django" + driver_name = "django" + + implements = virtual.Transport.implements.extend( + exchange_type=frozenset(["direct", "topic", "fanout"]) + ) diff --git a/requirements/extras/django.txt b/requirements/extras/django.txt new file mode 100644 index 000000000..f75d32b08 --- /dev/null +++ b/requirements/extras/django.txt @@ -0,0 +1 @@ +django>=4.2.18 diff --git a/requirements/test-ci.txt b/requirements/test-ci.txt index 7d67e7c26..6b3fed463 100644 --- a/requirements/test-ci.txt +++ b/requirements/test-ci.txt @@ -17,3 +17,4 @@ urllib3>=1.26.16; sys_platform != 'win32' -r extras/sqlalchemy.txt -r extras/etcd.txt -r extras/gcpubsub.txt +-r extras/django.txt diff --git a/t/unit/transport/test_django.py b/t/unit/transport/test_django.py new file mode 100644 index 000000000..4591da965 --- /dev/null +++ b/t/unit/transport/test_django.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from queue import Empty +from unittest.mock import patch + +import django +import pytest +from django.conf import settings +from django.core.management import call_command + +from kombu import Connection + +settings.configure( + INSTALLED_APPS=("kombu.transport.django_kombu",), + DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}, +) +django.setup() +call_command("migrate", database="default") + +# Need to import after setting up Django +from kombu.transport.django_kombu.models import Binding # noqa: E402 +from kombu.transport.django_kombu.models import Exchange # noqa: E402 +from kombu.transport.django_kombu.models import Message, Queue # noqa: E402 + + +@pytest.fixture +def channel(): + conn = Connection("django:///") + conn.connect() + channel = conn.channel() + + yield channel + + channel._purge("celery") + conn.release() + + +def test_url_parser(): + with patch("kombu.transport.django_kombu.transport.Channel._open"): + url = "django:///" + Connection(url).connect() + + +def test_simple_queueing(channel): + channel._put("celery", "DATA_SIMPLE_QUEUEING") + assert channel._get("celery") == "DATA_SIMPLE_QUEUEING" + + +def test_queueing_multiple(channel): + channel._put("celery", "DATA_SIMPLE_QUEUEING") + channel._put("celery", "DATA_SIMPLE_QUEUEING2") + + assert channel._get("celery") == "DATA_SIMPLE_QUEUEING" + assert channel._get("celery") == "DATA_SIMPLE_QUEUEING2" + + +def test__get_queue_that_does_not_exist(channel): + with pytest.raises(Empty): + channel._get("queue_that_does_not_exist") + + +def test__get_queue_when_empty(channel): + channel._put("celery", "MESSAGE") + channel._get("celery") + with pytest.raises(Empty): + channel._get("celery") + + +def test__purge_queue_that_does_not_exist(channel): + channel._purge("queue_that_does_not_exist") # does not raise an Exception + + +def test_queue_name(channel): + queue = Queue.objects.create(name="celery_queue") + assert "celery_queue" in str(queue) + + +def test_message_name(channel): + queue = Queue.objects.create(name="another_queue") + message = Message.objects.create(message="message", queue=queue) + assert "message" in str(message) + + +def test_get_table_exchange_does_not_exist(channel): + assert [] == channel.get_table("celery") + + +def test_get_table_empty(channel): + Exchange.objects.create(name="celery") + assert [] == channel.get_table("celery") + + +def test_get_table_one_binding(channel): + exchange = Exchange.objects.create(name="another_exchange") + queue = Queue.objects.create(name="another_queue2") + Binding.objects.create( + queue=queue, exchange=exchange, routing_key="routing_key" + ) + assert [("routing_key", "", "another_queue2")] == channel.get_table( + "another_exchange" + ) + + +def test__put_fanout_exchange_does_not_exist(channel): + channel._put_fanout( + "exchange_that_does_not_exist", "message", "routing_key" + ) # does not raise an exception + + +def test__put_fanout_calls__put(channel): + exchange = Exchange.objects.create(name="another_exchange2") + queue = Queue.objects.create(name="another_queue3") + Binding.objects.create( + queue=queue, exchange=exchange, routing_key="routing_key" + ) + with patch.object(channel, "_put") as m: + + channel._put_fanout("another_exchange2", "HELLO", "routing_key") + + m.assert_called_once_with("another_queue3", "HELLO", priority=0) + + +def test_priority(channel): + channel._put('priority', 'message1', priority=1) + channel._put('priority', 'message2', priority=0) + assert 'message2' == channel._get('priority') + + +def test_ttl(channel): + channel._put('ttl', 'message', ttl=0) + with pytest.raises(Empty): + channel._get('ttl')