Skip to content

Commit

Permalink
create Django transport and channel
Browse files Browse the repository at this point in the history
  • Loading branch information
thuibr committed Jan 20, 2025
1 parent a0175b0 commit e77e14a
Show file tree
Hide file tree
Showing 11 changed files with 457 additions and 1 deletion.
3 changes: 2 additions & 1 deletion docs/includes/introduction.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ Transport Comparison
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *Pyro* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *Django* | Virtual | Yes | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+


.. [#f1] Declarations only kept in memory, so exchanges/queues
Expand Down Expand Up @@ -264,4 +266,3 @@ There are some concepts you should be familiar with before starting:
zero or more words. For example `"*.stock.#"` matches the
routing keys `"usd.stock"` and `"eur.stock.db"` but not
`"stock.nasdaq"`.

3 changes: 3 additions & 0 deletions docs/userguide/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ All of these are valid URLs:
# Using Pyro with name server running on 'localhost'
pyro://localhost/kombu.broker
# Using Django
django:///
The query part of the URL can also be used to set options, e.g.:

Expand Down
1 change: 1 addition & 0 deletions kombu/transport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None:
'azureservicebus': 'kombu.transport.azureservicebus:Transport',
'pyro': 'kombu.transport.pyro:Transport',
'gcpubsub': 'kombu.transport.gcpubsub:Transport',
'django': 'kombu.transport.django_kombu.transport.Transport',
}

_transport_cache = {}
Expand Down
11 changes: 11 additions & 0 deletions kombu/transport/django_kombu/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from __future__ import annotations

from django.apps import AppConfig


class KombuConfig(AppConfig):
"""Django app config."""

default_auto_field = "django.db.models.BigAutoField"
name = "django_kombu"
label = "kombu"
109 changes: 109 additions & 0 deletions kombu/transport/django_kombu/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Generated by Django 5.1.5 on 2025-01-20 13:39

from __future__ import annotations

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
"The initial migration."

initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="Exchange",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("name", models.CharField(max_length=200, unique=True)),
],
),
migrations.CreateModel(
name="Queue",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("name", models.CharField(max_length=200, unique=True)),
],
),
migrations.CreateModel(
name="Message",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("visible", models.BooleanField(db_index=True, default=True)),
(
"sent_at",
models.DateTimeField(auto_now_add=True, db_index=True, null=True),
),
("message", models.TextField()),
("version", models.PositiveIntegerField(default=1)),
("priority", models.PositiveIntegerField(default=0)),
("ttl", models.IntegerField(blank=True, null=True)),
(
"queue",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="messages",
to="django_kombu.queue",
),
),
],
),
migrations.CreateModel(
name="Binding",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("routing_key", models.CharField(max_length=255, null=True)),
(
"exchange",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="bindings",
to="django_kombu.exchange",
),
),
(
"queue",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="bindings",
to="django_kombu.queue",
),
),
],
),
]
Empty file.
60 changes: 60 additions & 0 deletions kombu/transport/django_kombu/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

from datetime import timedelta

from django.db import models
from django.utils import timezone


class Queue(models.Model):
"""The queue."""

name = models.CharField(max_length=200, unique=True)

def __str__(self):
return self.name


class Message(models.Model):
"""The message."""

visible = models.BooleanField(default=True, db_index=True)
sent_at = models.DateTimeField(null=True, db_index=True, auto_now_add=True)
message = models.TextField()
version = models.PositiveIntegerField(default=1)
priority = models.PositiveIntegerField(default=0)
ttl = models.IntegerField(
null=True, blank=True
) # TTL in seconds (null means no TTL)
queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="messages")

def __str__(self):
return f"{self.sent_at} {self.message} {self.queue_id}"

def is_expired(self):
if self.ttl is None:
return False # No TTL set, so not expired
expiration_time = self.sent_at + timedelta(seconds=self.ttl)
return expiration_time < timezone.now()


class Exchange(models.Model):
"""The exchange."""

name = models.CharField(max_length=200, unique=True)

def __str__(self):
return f"{self.name}"

Check warning on line 47 in kombu/transport/django_kombu/models.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/models.py#L47

Added line #L47 was not covered by tests


class Binding(models.Model):
"""The binding."""

queue = models.ForeignKey(Queue, on_delete=models.CASCADE, related_name="bindings")
exchange = models.ForeignKey(
Exchange, on_delete=models.CASCADE, related_name="bindings"
)
routing_key = models.CharField(max_length=255, null=True)

def __str__(self):
return f"Binding: {self.queue.name} -> {self.exchange.name} with routing_key {self.routing_key}"

Check warning on line 60 in kombu/transport/django_kombu/models.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/models.py#L60

Added line #L60 was not covered by tests
137 changes: 137 additions & 0 deletions kombu/transport/django_kombu/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
"""Django Transport module for kombu.
Kombu transport using Django ORM as the message store.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes
Connection String
=================
.. code-block::
django:///
"""

from __future__ import annotations

import json
import logging
from queue import Empty

from django.db import transaction

from kombu.transport import virtual
from kombu.transport.django_kombu.models import (Binding, Exchange, Message,
Queue)

VERSION = (0, 0, 1)
__version__ = ".".join(map(str, VERSION))

logger = logging.getLogger(__name__)


class Channel(virtual.Channel):
"""The channel class."""

supports_fanout = True

def _open(self):
pass

Check warning on line 46 in kombu/transport/django_kombu/transport.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/transport.py#L46

Added line #L46 was not covered by tests

def _put(self, queue, message, priority=0, ttl=None, **kwargs):
queue_instance, _ = Queue.objects.get_or_create(name=queue)
queue_instance.messages.create(
message=json.dumps(message), priority=priority, ttl=ttl
)

def _get(self, queue, timeout=None):
with transaction.atomic():
try:
queue_instance = Queue.objects.get(name=queue)
except Queue.DoesNotExist:
raise Empty()
message_instance = (
Message.objects.select_for_update(skip_locked=True)
.filter(visible=True, queue=queue_instance)
.order_by("priority", "sent_at", "id")
.first()
)
if message_instance is not None:
if message_instance.is_expired():
message_instance.visible = False
message_instance.save(update_fields=["visible"])
logger.debug(
f"Message with ID {message_instance.id} has expired and is discarded."
)
return self._get(queue, timeout=timeout)

message_instance.visible = False
message_instance.save(update_fields=["visible"])
msg = message_instance.message
return json.loads(msg)
raise Empty()

def _purge(self, queue):
try:
queue_instance = Queue.objects.get(name=queue)
except Queue.DoesNotExist:
return
queue_instance.messages.all().delete()

def _queue_bind(self, exchange, routing_key, pattern, queue):
queue_instance, _ = Queue.objects.get_or_create(name=queue)
exchange_instance, _ = Exchange.objects.get_or_create(name=exchange)
binding, created = Binding.objects.get_or_create(

Check warning on line 91 in kombu/transport/django_kombu/transport.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/transport.py#L89-L91

Added lines #L89 - L91 were not covered by tests
queue=queue_instance,
exchange=exchange_instance,
routing_key=routing_key,
)
if created:
logger.debug(f"Binding created: {binding}")

Check warning on line 97 in kombu/transport/django_kombu/transport.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/transport.py#L97

Added line #L97 was not covered by tests
else:
logger.debug(f"Binding already exists: {binding}")

Check warning on line 99 in kombu/transport/django_kombu/transport.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/django_kombu/transport.py#L99

Added line #L99 was not covered by tests

def _put_fanout(self, exchange, message, routing_key, priority=0, **kwargs):
try:
exchange_instance = Exchange.objects.get(name=exchange)
except Exchange.DoesNotExist:
return
queues = Queue.objects.filter(
bindings__exchange=exchange_instance, bindings__routing_key=routing_key
)
logger.debug(
f"Found {len(queues)} queues bound to fanout exchange {exchange_instance.name}"
)
for queue in queues:
# Publish the message to each bound queue
logger.debug(f"Publishing message to fanout queue: {queue.name}")
self._put(queue.name, message, priority=priority)

def get_table(self, exchange):
try:
exchange_instance = Exchange.objects.get(name=exchange)
except Exchange.DoesNotExist:
return []
bindings = exchange_instance.bindings.all()
return [(binding.routing_key, "", binding.queue.name) for binding in bindings]


class Transport(virtual.Transport):
"""The transport class."""

Channel = Channel

can_parse_url = True
driver_type = "django"
driver_name = "django"

implements = virtual.Transport.implements.extend(
exchange_type=frozenset(["direct", "topic", "fanout"])
)
1 change: 1 addition & 0 deletions requirements/extras/django.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
django>=4.2.18
1 change: 1 addition & 0 deletions requirements/test-ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ urllib3>=1.26.16; sys_platform != 'win32'
-r extras/sqlalchemy.txt
-r extras/etcd.txt
-r extras/gcpubsub.txt
-r extras/django.txt
Loading

0 comments on commit e77e14a

Please sign in to comment.