Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support redis queue expiration #2251

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open

Conversation

ffje
Copy link

@ffje ffje commented Feb 26, 2025

Redis Queue Expiration Feature

Description

This PR adds support for automatically expiring Redis queues after a period of inactivity. This is particularly useful for managing ephemeral queues that should be automatically cleaned up when they're no longer being used.

The implementation adds the following capabilities:

  • Support for the x-expires queue argument for Redis transport, specifying milliseconds after which an inactive queue is deleted
  • Support for the simpler expires parameter on Queue objects (in seconds) which is translated to x-expires internally
  • Automatic TTL refresh on both queue read and write operations

Implementation

The implementation adds two key methods to the Redis transport Channel class:

  • _maybe_update_queues_expire: Updates TTL on queue keys when there is activity
  • _get_queue_expire: Extracts the expiration time from queue arguments

These methods ensure that Redis queue keys have TTL set appropriately based on the queue definition, and that the TTL gets refreshed on both get and put operations.

Testing

Added comprehensive integration tests that verify:

  • Expiration is correctly set on queue declaration
  • TTL is reset when publishing new messages
  • TTL is reset when consuming messages
  • Expiration works correctly with prioritized queues

Documentation

Added documentation of the x-expires option to the module docstring in the Transport Options section.

Usage Example

# Queue that expires after 60 seconds of inactivity
queue = kombu.Queue('temporary_queue', expires=60) 

@Nusnus Nusnus self-requested a review February 26, 2025 11:38
Copy link

codecov bot commented Feb 26, 2025

Codecov Report

Attention: Patch coverage is 68.18182% with 7 lines in your changes missing coverage. Please review.

Project coverage is 81.51%. Comparing base (19d9750) to head (570fecb).

Files with missing lines Patch % Lines
kombu/transport/redis.py 68.18% 6 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2251      +/-   ##
==========================================
- Coverage   81.54%   81.51%   -0.04%     
==========================================
  Files          77       77              
  Lines        9517     9539      +22     
  Branches     1154     1156       +2     
==========================================
+ Hits         7761     7776      +15     
- Misses       1564     1570       +6     
- Partials      192      193       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Member

@Nusnus Nusnus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can run tox -e lint locally to check the lint errors instead of waiting for the CI

@ffje
Copy link
Author

ffje commented Feb 26, 2025

@Nusnus would appreciate feedback and guideline on the further actions. Should I cover by unit tests the code additionally? Since it seems covered in integration tests but codecov is not aware of those

@Nusnus
Copy link
Member

Nusnus commented Feb 26, 2025

@ffje

@Nusnus would appreciate feedback and guideline on the further actions. Should I cover by unit tests the code additionally? Since it seems covered in integration tests but codecov is not aware of those

I will need to properly review this change which might take some time (days to week or two).

In the meantime, and until I'll add it to Kombu's CI later this year, you can check manually if the entire test suits of Celery are passing with a patched Kombu running this change.

Unit, integration and smoke tests.

You don't need to add any new test, just make sure nothing is broken due to the changes of this PR.

Let me know if you need any assistance running the CI locally or configuring tox for your patched Kombu.

TL;DR remove Kombu from all req.txt files in Celery and add to the deps section in tox.ini to install with -e relative/path/to/kombu & check python-package.yml for running the tests.

@auvipy auvipy self-requested a review February 27, 2025 05:30
@auvipy auvipy added this to the 5.6.0 milestone Feb 27, 2025
return None

def prepare_queue_arguments(self, arguments, **kwargs):
return to_rabbitmq_queue_arguments(arguments, **kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just return a rabbitmq function or think something else?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's rather canonical ampq view of queue_arguments, also mongodb uses exactly the same setup so I assumed it should be fine for redis as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants