From e62821b606168874a0de0ee4439ed2c5d65aedf9 Mon Sep 17 00:00:00 2001 From: igorcoding Date: Wed, 16 Dec 2020 20:18:31 +0300 Subject: [PATCH 1/7] cancel take fibers of clients that already been disconnected --- xqueue.lua | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/xqueue.lua b/xqueue.lua index 5911e61..1689d4a 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -243,6 +243,7 @@ function M.upgrade(space,opts,depth) self.taken = space.xq.taken self._stat = space.xq._stat self.bysid = space.xq.bysid + self.sid_take_fids = space.xq.sid_take_fids self._lock = space.xq._lock self.take_wait = space.xq.take_wait self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' }) @@ -252,6 +253,7 @@ function M.upgrade(space,opts,depth) else self.taken = {} self.bysid = {} + self.sid_take_fids = {} -- byfid = {}; self._lock = {} self.put_wait = setmetatable({}, { __mode = 'v' }) @@ -649,6 +651,25 @@ function M.upgrade(space,opts,depth) return t[pkf.no] end + function self:register_take_fid() + local sid = box.session.id() + local fid = fiber.id() + if self.sid_take_fids[sid] == nil then + self.sid_take_fids[sid] = {} + end + self.sid_take_fids[sid][fid] = true + end + + function self:deregister_take_fid() + local sid = box.session.id() + local fid = fiber.id() + if self.sid_take_fids[sid] == nil then + return + end + + self.sid_take_fids[sid][fid] = nil + end + -- Notify producer if it is still waiting us. -- Producer waits only for successfully processed task -- or for task which would never be processed. @@ -967,6 +988,15 @@ function M.upgrade(space,opts,depth) end self.bysid[sid] = nil end + + if self.sid_take_fids[sid] then + for fid in pairs(self.sid_take_fids[sid]) do + log.info('Killing take fiber %d in sid %d', fid, sid) + fiber.kill(fid) + end + + self.sid_take_fids[sid] = nil + end end, self._on_dis) rawset(space,'xq',self) @@ -1228,6 +1258,7 @@ function methods:take(timeout, opts) start_with = {'R'} end + local wait_chan = (tube_chan or xq.take_wait) local now = fiber.time() local key local found @@ -1244,8 +1275,14 @@ function methods:take(timeout, opts) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then goto finish end - - (tube_chan or xq.take_wait):get(left) + + self.xq:register_take_fid() + local ok, r = pcall(wait_chan.get, wait_chan, left) + self.xq:deregister_take_fid() + if not ok then + log.info('take finished abruptly: %s', r) + goto finish + end if box.session.storage.destroyed then goto finish end end end From 8b1d80c00e1eed8147907ddc53607cd071aa1ed0 Mon Sep 17 00:00:00 2001 From: igorcoding Date: Wed, 16 Dec 2020 20:42:38 +0300 Subject: [PATCH 2/7] rockspec --- rockspecs/xqueue-scm-6.rockspec | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 rockspecs/xqueue-scm-6.rockspec diff --git a/rockspecs/xqueue-scm-6.rockspec b/rockspecs/xqueue-scm-6.rockspec new file mode 100644 index 0000000..e1dda83 --- /dev/null +++ b/rockspecs/xqueue-scm-6.rockspec @@ -0,0 +1,20 @@ +package = "xqueue" +version = "scm-6" +source = { + url = "git://github.com/ktsstudio/xqueue.git", + branch = "v5-fixes", +} +description = { + summary = "Package for loading external lua config", + homepage = "https://github.com/moonlibs/xqueue.git", + license = "BSD" +} +dependencies = { + "lua ~> 5.1" +} +build = { + type = "builtin", + modules = { + xqueue = "xqueue.lua" + } +} From 7306b4fb07bcdf905d84ea2ea76414d6f514744a Mon Sep 17 00:00:00 2001 From: igorcoding Date: Sat, 29 May 2021 
19:52:42 +0300 Subject: [PATCH 3/7] locking mechanism This commit adds a new feature to xqueue: `lockable`. The user may specify a `lock` field in the fields map when calling xqueue.upgrade(). If it is specified, xqueue checks that there is an additional index over the fields (lock, status, ...). This index is used by the put(), ack(), bury() and kill() methods to determine whether the current task should be locked or unlocked. The feature enforces a strict processing order per lock value: workers cannot take a task while another task with the same lock field is already being processed. Notable additions: * put(): When putting a task, 1-3 additional queries are made to check whether any other task with the same lock value is in status L, T or R. If at least one such task exists, the new task's status is set to L rather than R. * ack(), kill(), bury(): After a successful status change of the task, these methods try to find a task in status L via the `lock` index and wake it up, transferring it to the R state. The `lockable` feature currently cannot be combined with the ttl or delayed features. --- example-locks/init.lua | 53 ++++++++++++++++++++++++++ xqueue.lua | 86 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 137 insertions(+), 2 deletions(-) create mode 100644 example-locks/init.lua diff --git a/example-locks/init.lua b/example-locks/init.lua new file mode 100644 index 0000000..d75efe2 --- /dev/null +++ b/example-locks/init.lua @@ -0,0 +1,53 @@ +box.cfg{ + -- listen = '127.0.0.1:4221' +} +-- box.once('access:v1', function() +-- box.schema.user.grant('guest', 'read,write,execute', 'universe') +-- end) + +require 'strict'.on() + +-- box.once('schema',function() + local format = { + {name = "id", type = "string"}, + {name = "status", type = "string"}, + {name = "lock", type = "number", nullable=true}, + + {name = "action", type = "string"}, + {name = "attr", type = "*"}, + } + box.schema.space.create('tasks', { if_not_exists = true, format = format }) + + box.space.tasks:create_index('primary', { unique = true, parts = {'id'}, if_not_exists = true}) + box.space.tasks:create_index('xq', { unique = false, parts = { 'status', 'id' }, if_not_exists = true}) + box.space.tasks:create_index('lock', { unique = false, parts = { 'lock', 'status', 'id' }, if_not_exists = true}) +-- end) + +if not package.path:match('%.%./%?%.lua;') then + package.path = '../?.lua;' .. package.path end + +require 'xqueue' ( box.space.tasks, { + debug = true; + fields = { + status = 'status'; + lock = 'lock'; + }; + features = { + id = 'uuid', + }; + workers = 1; + worker = function(task) + print(require'fiber'.self().id(), "got task",task.id, require'yaml'.encode(box.space.tasks:select())) + end; +} ) + + +print(require'yaml'.encode(box.space.tasks:select())) +for i = 1, 10 do + local t = box.space.tasks:put{ lock=1, action = "doit", attr = {} } + print('inserted', t.id, t.status) +end + +require'console'.start() +os.exit() diff --git a/xqueue.lua b/xqueue.lua index 5911e61..f559b84 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -84,6 +84,9 @@ Status: D - done - task was processed and ack'ed and permanently left in database enabled when keep feature is set + + L - locked - task is locked via `lock' index.
Tasks will be processed only + if there are no older tasks by `lock` index X - reserved for statistics @@ -102,6 +105,7 @@ Interface: status = 'status_field_name' | status_field_no, runat = 'runat_field_name' | runat_field_no, priority = 'priority_field_name' | priority_field_no, + lock = 'lock_field_name' | lock_field_no, }, features = { id = 'auto_increment' | 'uuid' | 'required' | function @@ -300,7 +304,7 @@ function M.upgrade(space,opts,depth) -- 1. fields check local fields = {} local fieldmap = {} - for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do + for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"},{"lock","*"}}) do local fname,ftype = f[1], f[2] local num = opts.fields[fname] if num then @@ -320,7 +324,7 @@ function M.upgrade(space,opts,depth) error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth) end -- check type - if format[num] then + if format[num] and ftype ~= "*" then if not typeeq(format[num].type, ftype) then error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth) end @@ -457,6 +461,23 @@ function M.upgrade(space,opts,depth) end end end + + if fields.lock then + local lock_index + for _,index in pairs(space.index) do + if type(_) == 'number' and #index.parts >= 2 then + if index.parts[1].fieldno == fields.lock and index.parts[2].fieldno == fields.status then + -- print("found lock index", index.name) + lock_index = index + break + end + end + end + if not lock_index then + error(string.format("fields.lock requires tree index with at least fields (`lock', `status', ...)"), 2+depth) + end + self.lock_index = lock_index + end end local runat_index @@ -626,6 +647,16 @@ function M.upgrade(space,opts,depth) features.tube = true end + if fields.lock then + features.lockable = true + end + + if features.lockable then + if features.ttl or features.delayed or features.ttl_default then + error(string.format("feature lockable cannot be combined with delayed or ttl"), 2+depth) + end + end + self.gen_id = gen_id self.features = features self.space = space.id @@ -649,6 +680,19 @@ function M.upgrade(space,opts,depth) return t[pkf.no] end + function self:wakeup_locked_task(t) + local locker_t = self.lock_index:select({ t[ self.fieldmap.lock ], 'L' }, {limit=1})[1] + if locker_t ~= nil then + locker_t = box.space[self.space]:update({ locker_t[ self.key.no ] }, { + { '=', self.fields.status, 'R' } + }) + log.info("Unlock: L->R {%s} (by %s) from %s/sid=%s/fid=%s", + self:getkey(locker_t), self:getkey(t), box.session.storage.peer, box.session.id(), fiber.id() ) + self:wakeup(locker_t) + end + return locker_t + end + -- Notify producer if it is still waiting us. -- Producer waits only for successfully processed task -- or for task which would never be processed. 
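The `wakeup_locked_task` helper added above, together with the matching check in put() later in this patch, is the core of the lockable feature. The following is a minimal standalone sketch of both sides of that hand-off, assuming the `tasks` space and `lock` index from example-locks/init.lua and a Tarantool version that accepts field names in update operations; it illustrates the idea and is not the xqueue code itself.

-- Both sides of the lockable hand-off, sketched on a plain Tarantool space.
-- Assumes the 'tasks' space and its 'lock' index over (lock, status, id).
local tasks = box.space.tasks

-- put() side: a task may start in 'R' only if no other task with the same
-- lock value is already Locked, Taken or Ready; otherwise it parks in 'L'.
local function initial_status(lock_value)
    for _, st in ipairs({'L', 'T', 'R'}) do
        if tasks.index.lock:select({lock_value, st}, {limit = 1})[1] then
            return 'L'
        end
    end
    return 'R'
end

-- ack()/bury()/kill() side: once the blocking task is finished, promote the
-- oldest 'L' task with the same lock value back to 'R' so it can be taken.
local function wakeup_next_locked(lock_value)
    local locked = tasks.index.lock:select({lock_value, 'L'}, {limit = 1})[1]
    if locked == nil then return nil end
    return tasks:update({locked.id}, {{'=', 'status', 'R'}})
end

The real code resolves field numbers through xq.fieldmap and uses the configured lock_index rather than hard-coded names, but the state transitions are the ones shown here.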
@@ -1082,6 +1126,9 @@ function methods:put(t, opts) if opts.ttl then error("Features ttl and delay are mutually exclusive",2) end + if xq.features.lockable then + error("Delay is not supported for lockable queues") + end t[ xq.fieldmap.status ] = 'W' t[ xq.fieldmap.runat ] = xq.timeoffset(opts.delay) @@ -1092,9 +1139,15 @@ function methods:put(t, opts) if not xq.features.ttl then error("Feature ttl is not enabled",2) end + if xq.features.lockable then + error("TTL is not supported for lockable queues") + end t[ xq.fieldmap.status ] = 'R' t[ xq.fieldmap.runat ] = xq.timeoffset(opts.ttl) elseif xq.features.ttl_default then + if xq.features.lockable then + error("Delay is not supported for lockable queues") + end t[ xq.fieldmap.status ] = 'R' t[ xq.fieldmap.runat ] = xq.timeoffset(xq.features.ttl_default) elseif xq.have_runat then @@ -1104,6 +1157,20 @@ function methods:put(t, opts) t[ xq.fieldmap.status ] = 'R' end + -- check lock index + if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' then + -- check if we need to set status L or R by looking up tasks in L, R or T states + local locker_t + for _, status in ipairs({'L', 'T', 'R'}) do + locker_t = xq.lock_index:select({ t[ xq.fieldmap.lock ], status }, {limit=1})[1] + if locker_t ~= nil then + -- there is a task that should be processed first + t[ xq.fieldmap.status ] = 'L' + break + end + end + end + local tuple = xq.tuple(t) local key = tuple[ xq.key.no ] @@ -1142,6 +1209,7 @@ local wait_for = { R = true, T = true, W = true, + L = true, } function methods:wait(key, timeout) @@ -1415,6 +1483,11 @@ function methods:ack(key, attr) log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old, key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() ) end + + if xq.features.lockable then + -- find any task that is locked + xq:wakeup_locked_task(t) + end end) xq:putback(t) -- in real drop form taken key @@ -1443,6 +1516,10 @@ function methods:bury(key, attr) xq.runat_chan:put(true,0) end log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id()) + + if features.lockable then + xq:wakeup_locked_task(t) + end end) xq:putback(t) @@ -1497,6 +1574,10 @@ function methods:kill(key) end xq.taken[key] = nil xq._lock[key] = nil + + if xq.feature.lockable then + xq:wakeup_locked_task(task) + end end end @@ -1527,6 +1608,7 @@ local pretty_st = { B = "Buried", Z = "Zombie", D = "Done", + L = "Locked", } function methods:stats(pretty) From b155a1344e0ac5c4d2f18c67c1a6b21db4a2fdfc Mon Sep 17 00:00:00 2001 From: igorcoding Date: Sat, 29 May 2021 20:51:55 +0300 Subject: [PATCH 4/7] check if lock field is not nil --- xqueue.lua | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/xqueue.lua b/xqueue.lua index f559b84..927bfd7 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -1158,7 +1158,7 @@ function methods:put(t, opts) end -- check lock index - if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' then + if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' and t[ xq.fieldmap.lock ] ~= nil then -- check if we need to set status L or R by looking up tasks in L, R or T states local locker_t for _, status in ipairs({'L', 'T', 'R'}) do @@ -1484,7 +1484,7 @@ function methods:ack(key, attr) key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() ) end - if xq.features.lockable then + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then -- find any task that is locked xq:wakeup_locked_task(t) end @@ -1517,7 +1517,7 @@ function methods:bury(key, 
attr) end log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id()) - if features.lockable then + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then xq:wakeup_locked_task(t) end end) @@ -1575,7 +1575,7 @@ function methods:kill(key) xq.taken[key] = nil xq._lock[key] = nil - if xq.feature.lockable then + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then xq:wakeup_locked_task(task) end end From 34c960084642b98188f24b10dd0858a777469580 Mon Sep 17 00:00:00 2001 From: igorcoding Date: Tue, 8 Jun 2021 00:04:37 +0300 Subject: [PATCH 5/7] allow locks with delay --- example-locks/init.lua | 16 +++++++---- xqueue.lua | 61 +++++++++++++++++++++--------------------- 2 files changed, 42 insertions(+), 35 deletions(-) diff --git a/example-locks/init.lua b/example-locks/init.lua index d75efe2..b378126 100644 --- a/example-locks/init.lua +++ b/example-locks/init.lua @@ -15,12 +15,14 @@ require 'strict'.on() {name = "action", type = "string"}, {name = "attr", type = "*"}, + {name = "runat", type = "number"}, } box.schema.space.create('tasks', { if_not_exists = true, format = format }) box.space.tasks:create_index('primary', { unique = true, parts = {'id'}, if_not_exists = true}) box.space.tasks:create_index('xq', { unique = false, parts = { 'status', 'id' }, if_not_exists = true}) box.space.tasks:create_index('lock', { unique = false, parts = { 'lock', 'status', 'id' }, if_not_exists = true}) + box.space.tasks:create_index('runat', { unique = false, parts = { 'runat', 'id' }, if_not_exists = true}) -- end) if not package.path:match('%.%./%?%.lua;') then @@ -32,22 +34,26 @@ require 'xqueue' ( box.space.tasks, { fields = { status = 'status'; lock = 'lock'; + runat = 'runat'; }; features = { id = 'uuid', + delayed = true, + ttl = true, }; - workers = 1; + workers = 0; worker = function(task) print(require'fiber'.self().id(), "got task",task.id, require'yaml'.encode(box.space.tasks:select())) + require'fiber'.sleep(10) end; } ) print(require'yaml'.encode(box.space.tasks:select())) -for i = 1, 10 do - local t = box.space.tasks:put{ lock=1, action = "doit", attr = {} } - print('inserted', t.id, t.status) -end +-- for i = 1, 5 do +-- local t = box.space.tasks:put{ lock=1, action = "doit", attr = {} } +-- print('inserted', t.id, t.status) +-- end require'console'.start() os.exit() diff --git a/xqueue.lua b/xqueue.lua index 927bfd7..042ebb1 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -651,12 +651,6 @@ function M.upgrade(space,opts,depth) features.lockable = true end - if features.lockable then - if features.ttl or features.delayed or features.ttl_default then - error(string.format("feature lockable cannot be combined with delayed or ttl"), 2+depth) - end - end - self.gen_id = gen_id self.features = features self.space = space.id @@ -693,6 +687,18 @@ function M.upgrade(space,opts,depth) return locker_t end + function self:find_locker_task(lock_value) + local locker_t + for _, status in ipairs({'L', 'T', 'R'}) do + locker_t = self.lock_index:select({ lock_value, status }, {limit=1})[1] + if locker_t ~= nil then + -- there is a task that should be processed first + return locker_t + end + end + return nil + end + -- Notify producer if it is still waiting us. -- Producer waits only for successfully processed task -- or for task which would never be processed. 
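To make the effect of `find_locker_task` above concrete, here is a small usage sketch against the schema from example-locks/init.lua. It assumes the space has already been upgraded by xqueue with fields.lock = 'lock'; the put/take/ack calls follow the xqueue interface, and the statuses in the comments are what the commit messages describe rather than captured output.

-- Usage sketch: strict ordering of tasks that share a lock value.
local tasks = box.space.tasks

-- Two tasks with the same lock value: the first one should be stored in 'R',
-- the second in 'L', because find_locker_task() sees the first task already
-- sitting in an L/T/R state for lock value 1.
local first  = tasks:put{ lock = 1, action = "charge", attr = {} }
local second = tasks:put{ lock = 1, action = "refund", attr = {} }

-- A different lock value is independent and should go straight to 'R'.
local other = tasks:put{ lock = 2, action = "notify", attr = {} }

print('statuses after put:', first.status, second.status, other.status) -- expected: R, L, R

-- A consumer never receives `second` while `first` is still R or T.
local task = tasks:take(1)
if task then
    -- ... process the task ...
    tasks:ack(task) -- if this was `first`, ack() calls wakeup_locked_task()
                    -- and `second` is promoted from L to R
end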
@@ -813,25 +819,35 @@ function M.upgrade(space,opts,depth) if #collect >= maxrun then remaining = 0 break end end + local status for _,t in ipairs(collect) do -- log.info("Runat: %s, %s", _, t) - if t[xq.fields.status] == 'W' then - log.info("Runat: W->R %s",xq:keyfield(t)) + status = t[ xq.fields.status ] + + if status == 'W' then + local target_status = 'R' + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then + -- check if we need to set status L or R by looking up locker task + if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then + target_status = 'L' + end + end + log.info("Runat: W->%s %s",target_status,xq:keyfield(t)) -- TODO: default ttl? local u = space:update({ xq:keyfield(t) },{ - { '=',xq.fields.status,'R' }, + { '=',xq.fields.status,target_status }, { '=',xq.fields.runat, xq.NEVER } }) xq:wakeup(u) - elseif t[xq.fields.status] == 'R' and xq.features.ttl then + elseif (status == 'R' or status == 'L') and xq.features.ttl then local key = xq:keyfield(t) log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ]) t = space:delete{key} notify_producer(key, t) - elseif t[xq.fields.status] == 'Z' and xq.features.zombie then + elseif status == 'Z' and xq.features.zombie then log.info("Runat: Kill Zombie %s",xq:keyfield(t)) space:delete{ xq:keyfield(t) } - elseif t[xq.fields.status] == 'T' and xq.features.ttr then + elseif status == 'T' and xq.features.ttr then local key = xq:keypack(t) local sid = xq.taken[ key ] local peer = peers[sid] or sid @@ -847,7 +863,7 @@ function M.upgrade(space,opts,depth) end xq:wakeup(u) else - log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t)) + log.error("Runat: unsupported status %s for %s",status, tostring(t)) space:update({ xq:keyfield(t) },{ { '=',xq.fields.runat, xq.NEVER } }) @@ -1126,9 +1142,6 @@ function methods:put(t, opts) if opts.ttl then error("Features ttl and delay are mutually exclusive",2) end - if xq.features.lockable then - error("Delay is not supported for lockable queues") - end t[ xq.fieldmap.status ] = 'W' t[ xq.fieldmap.runat ] = xq.timeoffset(opts.delay) @@ -1139,15 +1152,9 @@ function methods:put(t, opts) if not xq.features.ttl then error("Feature ttl is not enabled",2) end - if xq.features.lockable then - error("TTL is not supported for lockable queues") - end t[ xq.fieldmap.status ] = 'R' t[ xq.fieldmap.runat ] = xq.timeoffset(opts.ttl) elseif xq.features.ttl_default then - if xq.features.lockable then - error("Delay is not supported for lockable queues") - end t[ xq.fieldmap.status ] = 'R' t[ xq.fieldmap.runat ] = xq.timeoffset(xq.features.ttl_default) elseif xq.have_runat then @@ -1160,14 +1167,8 @@ function methods:put(t, opts) -- check lock index if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' and t[ xq.fieldmap.lock ] ~= nil then -- check if we need to set status L or R by looking up tasks in L, R or T states - local locker_t - for _, status in ipairs({'L', 'T', 'R'}) do - locker_t = xq.lock_index:select({ t[ xq.fieldmap.lock ], status }, {limit=1})[1] - if locker_t ~= nil then - -- there is a task that should be processed first - t[ xq.fieldmap.status ] = 'L' - break - end + if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then + t[ xq.fieldmap.status ] = 'L' end end From a45f62af7f628a4c2b1d07b10dd637941d0eba21 Mon Sep 17 00:00:00 2001 From: igorcoding Date: Fri, 16 Jul 2021 15:21:05 +0300 Subject: [PATCH 6/7] use log.info instead of log.notice --- xqueue.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/xqueue.lua b/xqueue.lua index 042ebb1..e9c26f5 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -759,7 +759,7 @@ function M.upgrade(space,opts,depth) if xq.ready then xq.ready:get() end log.info("I am worker %s",i) if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end while box.space[space.name] and space.xq == xq do @@ -797,7 +797,7 @@ function M.upgrade(space,opts,depth) local chan = xq.runat_chan log.info("Runat started") if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end local maxrun = 1000 From d926499a768084a6767e8f1ac749f135b528b860 Mon Sep 17 00:00:00 2001 From: igorcoding Date: Mon, 26 Jul 2021 16:26:43 +0300 Subject: [PATCH 7/7] move examples to one directory --- .DS_Store | Bin 6148 -> 0 bytes .gitignore | 1 + .../example-locks}/init.lua | 0 .../example-putwait}/init.lua | 0 .../example-runat}/init.lua | 0 {example-tube => examples/example-tube}/init.lua | 0 .../example-worker}/init.lua | 0 {example1 => examples/example1}/init.lua | 0 8 files changed, 1 insertion(+) delete mode 100644 .DS_Store rename {example-locks => examples/example-locks}/init.lua (100%) rename {example-putwait => examples/example-putwait}/init.lua (100%) rename {example-runat => examples/example-runat}/init.lua (100%) rename {example-tube => examples/example-tube}/init.lua (100%) rename {example-worker => examples/example-worker}/init.lua (100%) rename {example1 => examples/example1}/init.lua (100%) diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index 5008ddfcf53c02e82d7eee2e57c38e5672ef89f6..0000000000000000000000000000000000000000
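Stepping back to patch 1/7, the per-session bookkeeping it introduces is easier to follow outside the diff. The sketch below reproduces the idea in isolation rather than the actual xqueue code: a bare fiber channel stands in for xqueue's take_wait machinery, and the local registry mirrors the sid_take_fids table added by that patch.

-- Pattern from patch 1/7: remember which fibers of a session are blocked
-- inside take(), and kill them when that session disconnects.
local fiber = require 'fiber'
local log = require 'log'

local sid_take_fids = {}            -- session id -> { fiber id -> true }
local take_wait = fiber.channel(0)  -- consumers block here waiting for work

local function blocking_take(timeout)
    local sid, fid = box.session.id(), fiber.id()
    sid_take_fids[sid] = sid_take_fids[sid] or {}
    sid_take_fids[sid][fid] = true
    -- pcall() so that a fiber.kill() from the disconnect trigger surfaces
    -- here as an error instead of unwinding past the cleanup below.
    local ok, res = pcall(take_wait.get, take_wait, timeout)
    sid_take_fids[sid][fid] = nil
    if not ok then
        log.info('take finished abruptly: %s', res)
        return nil
    end
    -- ... look up and return a ready task here ...
end

box.session.on_disconnect(function()
    local sid = box.session.id()
    for fid in pairs(sid_take_fids[sid] or {}) do
        log.info('Killing take fiber %d in sid %d', fid, sid)
        pcall(fiber.kill, fid) -- the fiber may have exited already
    end
    sid_take_fids[sid] = nil
end)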