Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of xqueue/locks #17

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/*.xlog
**/*.snap
.DS_Store
59 changes: 59 additions & 0 deletions examples/example-locks/init.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
box.cfg{
-- listen = '127.0.0.1:4221'
}
-- box.once('access:v1', function()
-- box.schema.user.grant('guest', 'read,write,execute', 'universe')
-- end)

require 'strict'.on()

-- box.once('schema',function()
local format = {
{name = "id", type = "string"},
{name = "status", type = "string"},
{name = "lock", type = "number", nullable=true},

{name = "action", type = "string"},
{name = "attr", type = "*"},
{name = "runat", type = "number"},
}
box.schema.space.create('tasks', { if_not_exists = true, format = format })

box.space.tasks:create_index('primary', { unique = true, parts = {'id'}, if_not_exists = true})
box.space.tasks:create_index('xq', { unique = false, parts = { 'status', 'id' }, if_not_exists = true})
box.space.tasks:create_index('lock', { unique = false, parts = { 'lock', 'status', 'id' }, if_not_exists = true})
box.space.tasks:create_index('runat', { unique = false, parts = { 'runat', 'id' }, if_not_exists = true})
-- end)

if not package.path:match('%.%./%?%.lua;') then
package.path = '../?.lua;' .. package.path
end

require 'xqueue' ( box.space.tasks, {
debug = true;
fields = {
status = 'status';
lock = 'lock';
runat = 'runat';
};
features = {
id = 'uuid',
delayed = true,
ttl = true,
};
workers = 0;
worker = function(task)
print(require'fiber'.self().id(), "got task",task.id, require'yaml'.encode(box.space.tasks:select()))
require'fiber'.sleep(10)
end;
} )


print(require'yaml'.encode(box.space.tasks:select()))
-- for i = 1, 5 do
-- local t = box.space.tasks:put{ lock=1, action = "doit", attr = {} }
-- print('inserted', t.id, t.status)
-- end

require'console'.start()
os.exit()
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
20 changes: 20 additions & 0 deletions rockspecs/xqueue-scm-6.rockspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package = "xqueue"
version = "scm-6"
source = {
url = "git://github.com/ktsstudio/xqueue.git",
branch = "locks",
}
description = {
summary = "Package for loading external lua config",
homepage = "https://github.com/moonlibs/xqueue.git",
license = "BSD"
}
dependencies = {
"lua ~> 5.1"
}
build = {
type = "builtin",
modules = {
xqueue = "xqueue.lua"
}
}
146 changes: 133 additions & 13 deletions xqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ Status:

D - done - task was processed and ack'ed and permanently left in database
enabled when keep feature is set

L - locked - task is locked via `lock' index. Tasks will be processed only
if there are no older tasks by `lock` index

X - reserved for statistics

Expand All @@ -102,6 +105,7 @@ Interface:
status = 'status_field_name' | status_field_no,
runat = 'runat_field_name' | runat_field_no,
priority = 'priority_field_name' | priority_field_no,
lock = 'lock_field_name' | lock_field_no,
},
features = {
id = 'auto_increment' | 'uuid' | 'required' | function
Expand Down Expand Up @@ -243,6 +247,7 @@ function M.upgrade(space,opts,depth)
self.taken = space.xq.taken
self._stat = space.xq._stat
self.bysid = space.xq.bysid
self.sid_take_fids = space.xq.sid_take_fids
self._lock = space.xq._lock
self.take_wait = space.xq.take_wait
self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' })
Expand All @@ -252,6 +257,7 @@ function M.upgrade(space,opts,depth)
else
self.taken = {}
self.bysid = {}
self.sid_take_fids = {}
-- byfid = {};
self._lock = {}
self.put_wait = setmetatable({}, { __mode = 'v' })
Expand Down Expand Up @@ -300,7 +306,7 @@ function M.upgrade(space,opts,depth)
-- 1. fields check
local fields = {}
local fieldmap = {}
for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do
for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"},{"lock","*"}}) do
local fname,ftype = f[1], f[2]
local num = opts.fields[fname]
if num then
Expand All @@ -320,7 +326,7 @@ function M.upgrade(space,opts,depth)
error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth)
end
-- check type
if format[num] then
if format[num] and ftype ~= "*" then
if not typeeq(format[num].type, ftype) then
error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth)
end
Expand Down Expand Up @@ -457,6 +463,23 @@ function M.upgrade(space,opts,depth)
end
end
end

if fields.lock then
local lock_index
for _,index in pairs(space.index) do
if type(_) == 'number' and #index.parts >= 2 then
if index.parts[1].fieldno == fields.lock and index.parts[2].fieldno == fields.status then
-- print("found lock index", index.name)
lock_index = index
break
end
end
end
if not lock_index then
error(string.format("fields.lock requires tree index with at least fields (`lock', `status', ...)"), 2+depth)
end
self.lock_index = lock_index
end
end

local runat_index
Expand Down Expand Up @@ -626,6 +649,10 @@ function M.upgrade(space,opts,depth)
features.tube = true
end

if fields.lock then
features.lockable = true
end

self.gen_id = gen_id
self.features = features
self.space = space.id
Expand All @@ -649,6 +676,50 @@ function M.upgrade(space,opts,depth)
return t[pkf.no]
end

function self:register_take_fid()
local sid = box.session.id()
local fid = fiber.id()
if self.sid_take_fids[sid] == nil then
self.sid_take_fids[sid] = {}
end
self.sid_take_fids[sid][fid] = true
end

function self:deregister_take_fid()
local sid = box.session.id()
local fid = fiber.id()
if self.sid_take_fids[sid] == nil then
return
end

self.sid_take_fids[sid][fid] = nil
end

function self:wakeup_locked_task(t)
local locker_t = self.lock_index:select({ t[ self.fieldmap.lock ], 'L' }, {limit=1})[1]
if locker_t ~= nil then
locker_t = box.space[self.space]:update({ locker_t[ self.key.no ] }, {
{ '=', self.fields.status, 'R' }
})
log.info("Unlock: L->R {%s} (by %s) from %s/sid=%s/fid=%s",
self:getkey(locker_t), self:getkey(t), box.session.storage.peer, box.session.id(), fiber.id() )
self:wakeup(locker_t)
end
return locker_t
end

function self:find_locker_task(lock_value)
local locker_t
for _, status in ipairs({'L', 'T', 'R'}) do
locker_t = self.lock_index:select({ lock_value, status }, {limit=1})[1]
if locker_t ~= nil then
-- there is a task that should be processed first
return locker_t
end
end
return nil
end

-- Notify producer if it is still waiting us.
-- Producer waits only for successfully processed task
-- or for task which would never be processed.
Expand Down Expand Up @@ -709,7 +780,7 @@ function M.upgrade(space,opts,depth)
if xq.ready then xq.ready:get() end
log.info("I am worker %s",i)
if box.info.ro then
log.notice("Shutting down on ro instance")
log.info("Shutting down on ro instance")
return
end
while box.space[space.name] and space.xq == xq do
Expand Down Expand Up @@ -747,7 +818,7 @@ function M.upgrade(space,opts,depth)
local chan = xq.runat_chan
log.info("Runat started")
if box.info.ro then
log.notice("Shutting down on ro instance")
log.info("Shutting down on ro instance")
return
end
local maxrun = 1000
Expand All @@ -769,25 +840,35 @@ function M.upgrade(space,opts,depth)
if #collect >= maxrun then remaining = 0 break end
end

local status
for _,t in ipairs(collect) do
-- log.info("Runat: %s, %s", _, t)
if t[xq.fields.status] == 'W' then
log.info("Runat: W->R %s",xq:keyfield(t))
status = t[ xq.fields.status ]

if status == 'W' then
local target_status = 'R'
if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then
-- check if we need to set status L or R by looking up locker task
if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then
target_status = 'L'
end
end
log.info("Runat: W->%s %s",target_status,xq:keyfield(t))
-- TODO: default ttl?
local u = space:update({ xq:keyfield(t) },{
{ '=',xq.fields.status,'R' },
{ '=',xq.fields.status,target_status },
{ '=',xq.fields.runat, xq.NEVER }
})
xq:wakeup(u)
elseif t[xq.fields.status] == 'R' and xq.features.ttl then
elseif (status == 'R' or status == 'L') and xq.features.ttl then
local key = xq:keyfield(t)
log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ])
t = space:delete{key}
notify_producer(key, t)
elseif t[xq.fields.status] == 'Z' and xq.features.zombie then
elseif status == 'Z' and xq.features.zombie then
log.info("Runat: Kill Zombie %s",xq:keyfield(t))
space:delete{ xq:keyfield(t) }
elseif t[xq.fields.status] == 'T' and xq.features.ttr then
elseif status == 'T' and xq.features.ttr then
local key = xq:keypack(t)
local sid = xq.taken[ key ]
local peer = peers[sid] or sid
Expand All @@ -803,7 +884,7 @@ function M.upgrade(space,opts,depth)
end
xq:wakeup(u)
else
log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t))
log.error("Runat: unsupported status %s for %s",status, tostring(t))
space:update({ xq:keyfield(t) },{
{ '=',xq.fields.runat, xq.NEVER }
})
Expand Down Expand Up @@ -967,6 +1048,15 @@ function M.upgrade(space,opts,depth)
end
self.bysid[sid] = nil
end

if self.sid_take_fids[sid] then
for fid in pairs(self.sid_take_fids[sid]) do
log.info('Killing take fiber %d in sid %d', fid, sid)
fiber.kill(fid)
end

self.sid_take_fids[sid] = nil
end
end, self._on_dis)

rawset(space,'xq',self)
Expand Down Expand Up @@ -1104,6 +1194,14 @@ function methods:put(t, opts)
t[ xq.fieldmap.status ] = 'R'
end

-- check lock index
if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' and t[ xq.fieldmap.lock ] ~= nil then
-- check if we need to set status L or R by looking up tasks in L, R or T states
if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then
t[ xq.fieldmap.status ] = 'L'
end
end

local tuple = xq.tuple(t)
local key = tuple[ xq.key.no ]

Expand Down Expand Up @@ -1142,6 +1240,7 @@ local wait_for = {
R = true,
T = true,
W = true,
L = true,
}

function methods:wait(key, timeout)
Expand Down Expand Up @@ -1228,6 +1327,7 @@ function methods:take(timeout, opts)
start_with = {'R'}
end

local wait_chan = (tube_chan or xq.take_wait)
local now = fiber.time()
local key
local found
Expand All @@ -1244,8 +1344,14 @@ function methods:take(timeout, opts)
if not found then
local left = (now + timeout) - fiber.time()
if left <= 0 then goto finish end

(tube_chan or xq.take_wait):get(left)

self.xq:register_take_fid()
local ok, r = pcall(wait_chan.get, wait_chan, left)
self.xq:deregister_take_fid()
if not ok then
log.info('take finished abruptly: %s', r)
goto finish
end
if box.session.storage.destroyed then goto finish end
end
end
Expand Down Expand Up @@ -1415,6 +1521,11 @@ function methods:ack(key, attr)
log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old,
key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() )
end

if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then
-- find any task that is locked
xq:wakeup_locked_task(t)
end
end)

xq:putback(t) -- in real drop form taken key
Expand Down Expand Up @@ -1443,6 +1554,10 @@ function methods:bury(key, attr)
xq.runat_chan:put(true,0)
end
log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id())

if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then
xq:wakeup_locked_task(t)
end
end)

xq:putback(t)
Expand Down Expand Up @@ -1497,6 +1612,10 @@ function methods:kill(key)
end
xq.taken[key] = nil
xq._lock[key] = nil

if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then
xq:wakeup_locked_task(task)
end
end
end

Expand Down Expand Up @@ -1527,6 +1646,7 @@ local pretty_st = {
B = "Buried",
Z = "Zombie",
D = "Done",
L = "Locked",
}

function methods:stats(pretty)
Expand Down