fix: only increment the current_concurrency counter if the job has max_concurrency set #39

Merged
merged 1 commit on Mar 27, 2024
2 changes: 1 addition & 1 deletion spinach/brokers/redis_scripts/get_jobs_from_queue.lua
@@ -33,7 +33,7 @@ repeat
     -- track the running job
     redis.call('hset', running_jobs_key, job["id"], job_json)
     -- If tracking concurrency, bump the current value.
-    if max_concurrency ~= -1 then
+    if max_concurrency ~= nil and max_concurrency ~= -1 then
         redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
     end

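A minimal sketch of why the extra nil check matters, assuming that a task with no concurrency limit decodes to nil inside the script rather than to the -1 sentinel: in Lua, nil ~= -1 evaluates to true, so the old guard bumped the current_concurrency counter even for tasks that never set max_concurrency.

-- Standalone Lua sketch with hypothetical values, not the broker script itself.
local max_concurrency = nil  -- assumed value for a task without max_concurrency

-- Old guard: nil ~= -1 is true, so the counter would be incremented anyway.
if max_concurrency ~= -1 then
    print('old check: increment current_concurrency')
end

-- New guard: the explicit nil test skips tasks with no concurrency limit.
if max_concurrency ~= nil and max_concurrency ~= -1 then
    print('new check: increment current_concurrency')
end
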
15 changes: 15 additions & 0 deletions tests/test_redis_brokers.py
@@ -187,6 +187,21 @@ def test_cant_exceed_max_concurrency(broker):
     assert json.loads(queued.decode())['id'] == str(job2.id)


+def test_does_not_set_concurrency_key_when_no_max_concurrency(broker):
+    job = Job(
+        CONCURRENT_TASK_NAME, 'foo_queue', datetime.now(timezone.utc), 1,
+        # kwargs help with debugging but are not part of the test.
+        task_kwargs=dict(name='job'),
+    )
+    broker.enqueue_jobs([job])
+    returned_jobs = broker.get_jobs_from_queue('foo_queue', 2)
+    assert returned_jobs[0].task_kwargs == dict(name='job')
+    current = broker._r.hget(
+        broker._to_namespaced(CURRENT_CONCURRENCY_KEY), CONCURRENT_TASK_NAME
+    )
+    assert current is None
+
+
 def test_get_jobs_from_queue_returns_all_requested(broker):
     # If a job is not returned because it was over concurrency limits,
     # make sure the number of jobs requested is filled from other jobs