Sending a message through RabbitMQ #12400
cristianleonie-geos asked this question in Q&A
Also asked on Celery (celery/celery#9124)
PROBLEM
I would like to send an email at time X (possibly a day later), provided that a model precondition in Django is satisfied. To do this I would like to use Celery. I have seen that I can do it with apply_async, but that is not recommended for delays longer than a few minutes (especially with RabbitMQ). The other option would be to run a Celery beat process continuously, but I don't like that one either, as it doesn't really suit my use case.
I could also use a task scheduler or a cron job, but since the system is already set up to work with Celery and RabbitMQ, I would like to use them.
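For context, this is roughly the apply_async/eta variant I am trying to avoid (assuming the module path matches the task name; the task itself is shown further down):

from datetime import datetime, timedelta, timezone

from service.tasks import check_model_state_and_send_email

# Schedule roughly a day ahead. The worker receives the message right away
# and holds it unacknowledged until the ETA, which is why long delays are
# discouraged with RabbitMQ.
check_model_state_and_send_email.apply_async(
    args=[12],
    eta=datetime.now(timezone.utc) + timedelta(days=1),
)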
PROPOSED SOLUTION
As a solution, I would like to trigger the task with a message published directly to RabbitMQ, since that way I can schedule message delivery arbitrarily far in the future.
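For the delay itself I was thinking of something along the lines of per-message TTL plus a dead-letter exchange; a sketch, assuming a local broker and a hypothetical service.delayed holding queue, with the expired message re-routed to the exchange/routing key that Celery consumes:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Holding queue: expired messages are dead-lettered to the "default" exchange
# with routing key "service", i.e. the queue the Celery worker listens on.
channel.queue_declare(
    queue="service.delayed",
    durable=True,
    arguments={
        "x-dead-letter-exchange": "default",
        "x-dead-letter-routing-key": "service",
    },
)

# Per-message TTL of 24 hours (milliseconds, as a string). Caveat: expired
# messages are only dead-lettered once they reach the head of the queue.
channel.basic_publish(
    exchange="",  # the default exchange routes by queue name
    routing_key="service.delayed",
    body=b"...",  # the task-triggering message would go here
    properties=pika.BasicProperties(expiration=str(24 * 60 * 60 * 1000)),
)
connection.close()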
I have Celery set up with GeoNode; I've created a task in my Django project and I'm able to call it programmatically through Celery.
from celery import shared_task

@shared_task(
    name="service.tasks.check_model_state_and_send_email",
    queue="service",
    ignore_result=False,
)
def check_model_state_and_send_email(model_id):
    ...
I can call it like this and it works:
check_model_state_and_send_email.delay(12)
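I can also trigger it purely by name with send_task, which is the same addressing a hand-built message would have to use (the import path of the Celery app instance is illustrative for my GeoNode setup):

from geonode.celery_app import app

# Equivalent to .delay(12), but the task is resolved only by its name.
app.send_task(
    "service.tasks.check_model_state_and_send_email",
    args=[12],
    queue="service",
)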
Celery is using RabbitMQ for the queues.
I am able to read from the queues and send messages to them; the body of the message generated by calling the delay function looks like this:
[ [ 12 ], {}, { "callbacks": null, "errbacks": null, "chain": null, "chord": null } ]
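(This is roughly how I peek at the queue to compare messages, assuming a local broker:)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Fetch a single message without permanently consuming it, just to inspect
# what Celery itself produced when .delay() was called.
method, properties, body = channel.basic_get(queue="service")
if method is not None:
    print(properties.headers)  # headers Celery attaches (task name, id, ...)
    print(body)                # the [[12], {}, {...}] payload shown above
    channel.basic_nack(method.delivery_tag, requeue=True)  # put it back
connection.close()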
SETUP
from kombu import Exchange, Queue

GEONODE_EXCHANGE = Exchange("default", type="topic", durable=True)
queue = Queue("service", GEONODE_EXCHANGE, routing_key="service", priority=0)
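(These get registered with Celery roughly like this in my settings; GeoNode uses the uppercase CELERY_*-prefixed names, and the exact setting name may differ between Celery versions:)

# Workers started with -Q service consume this queue; the task decorator
# above already pins queue="service".
CELERY_TASK_QUEUES = (queue,)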
This is my triggering-message sender:
import json

import pika

connection = pika.BlockingConnection(host)  # host: connection parameters for the broker
message = ...
channel = connection.channel()
# passive=True only checks that the queue exists, it does not create it
channel.queue_declare(queue='service.tasks.check_model_state_and_send_email', passive=True)
channel.basic_publish(exchange='default',
                      routing_key='service',
                      body=json.dumps(message))
PROBLEM
Although the message is consumed by Celery, it does not trigger the task. I have tried different formats for the message, from simply copying the content of "correct" messages to generating ones suggested online.
Not only does it fail to produce the task, it also doesn't generate any of the logs that I normally see when the task executes correctly or fails.
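For illustration, based on my reading of the Celery message protocol (version 2) docs, a hand-built message would look roughly like the sketch below, with the task name and id carried as AMQP headers and a JSON content type; I am not certain this header/property set is complete:

import json
import uuid

import pika

task_id = str(uuid.uuid4())
body = json.dumps(
    [[12], {}, {"callbacks": None, "errbacks": None, "chain": None, "chord": None}]
)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.basic_publish(
    exchange="default",
    routing_key="service",
    body=body,
    properties=pika.BasicProperties(
        content_type="application/json",
        content_encoding="utf-8",
        correlation_id=task_id,
        headers={
            # headers the worker appears to use to resolve the task (protocol v2)
            "task": "service.tasks.check_model_state_and_send_email",
            "id": task_id,
            "root_id": task_id,
            "lang": "py",
        },
    ),
)
connection.close()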