Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pagination to worker counts, preserves backwards compatibility #42

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ QlessAPI.heartbeat = function(now, jid, worker, data)
return Qless.job(jid):heartbeat(now, worker, data)
end

QlessAPI.workers = function(now, worker)
return cjson.encode(QlessWorker.counts(now, worker))
QlessAPI.workers = function(now, ...)
return cjson.encode(QlessWorker.counts(now, unpack(arg)))
end

QlessAPI.track = function(now, command, jid)
Expand Down
31 changes: 31 additions & 0 deletions test/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ def setUp(self):
# No grace period
self.lua('config.set', 0, 'grace-period', 0)

def test_malformed(self):
'''Enumerate all the ways in which the input can be malformed'''
self.assertMalformed(self.lua, [
('workers', 1, 0, 'arg2') # Count arg malformed
])

def test_basic(self):
'''Basic worker-level information'''
self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0)
Expand All @@ -24,6 +30,31 @@ def test_basic(self):
'stalled': 0
}])

def test_paginated(self):
'''Paginated worker-level information'''

self.lua('put', 0, 'worker-1', 'queue', 'jid-1', 'klass', {}, 0)
self.lua('pop', 1, 'queue', 'worker-1', 10)
self.lua('put', 2, 'worker-2', 'queue', 'jid-2', 'klass', {}, 0)
self.lua('pop', 3, 'queue', 'worker-2', 10)
self.lua('put', 2, 'worker-3', 'queue', 'jid-3', 'klass', {}, 0)
self.lua('pop', 3, 'queue', 'worker-3', 10)

worker_names = [w['name'] for w in self.lua('workers', 20)]
self.assertEqual(worker_names, ['worker-3', 'worker-2', 'worker-1'])

worker_names = [w['name'] for w in self.lua('workers', 20, 0, 1)]
self.assertEqual(worker_names, ['worker-3'])

worker_names = [w['name'] for w in self.lua('workers', 20, 1, 1)]
self.assertEqual(worker_names, ['worker-2'])

worker_names = [w['name'] for w in self.lua('workers', 20, 2, 1)]
self.assertEqual(worker_names, ['worker-1'])

worker_names = [w['name'] for w in self.lua('workers', 20, 1, 2)]
self.assertEqual(worker_names, ['worker-2', 'worker-1'])

def test_stalled(self):
'''We should be able to detect stalled jobs'''
self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0)
Expand Down
18 changes: 16 additions & 2 deletions worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ function QlessWorker.deregister(...)
redis.call('zrem', 'ql:workers', unpack(arg))
end

-- Counts(now, [offset, [count]])
-- Counts(now, worker)
-- Provide data about all the workers, or if a specific worker is provided,
-- then which jobs that worker is responsible for. If no worker is provided,
-- expect a response of the form:
Expand Down Expand Up @@ -31,7 +33,7 @@ end
-- ]
-- }
--
function QlessWorker.counts(now, worker)
function QlessWorker.counts(now, ...)
-- Clean up all the workers' job lists if they're too old. This is
-- determined by the `max-worker-age` configuration, defaulting to the
-- last day. Seems like a 'reasonable' default
Expand All @@ -45,14 +47,26 @@ function QlessWorker.counts(now, worker)
-- And now remove them from the list of known workers
redis.call('zremrangebyscore', 'ql:workers', 0, now - interval)

--- Preserve backwards compatibility for counts() which only
--- takes a worker and not offset/count
local worker
if not tonumber(arg[1]) then
worker = arg[1]
end

if worker then
return {
jobs = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now + 8640000, now),
stalled = redis.call('zrevrangebyscore', 'ql:w:' .. worker .. ':jobs', now, 0)
}
else
local offset = assert(tonumber(arg[1] or 0),
'Failed(): Arg "offset" is not a number: ' .. tostring(arg[1]))
local count = assert(tonumber(arg[2] or 0),
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If backwards compatibility is not an issue, the count should default to 25 like the rest of the paginated methods. When both offset and count are 0, it gets the entire data set

'Failed(): Arg "count" is not a number: ' .. tostring(arg[2]))

local response = {}
local workers = redis.call('zrevrange', 'ql:workers', 0, -1)
local workers = redis.call('zrevrange', 'ql:workers', offset, offset + count - 1)
for index, worker in ipairs(workers) do
table.insert(response, {
name = worker,
Expand Down