From 47731e2825f86c218b513a89ac1f29c52497ad2d Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Tue, 21 Mar 2023 23:46:10 +0300 Subject: [PATCH 1/9] Implements per tube statistics --- README.md | 2 + example-tube/init.lua | 3 +- xqueue.lua | 110 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 102 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index bbdbb5e..0979de4 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,8 @@ M.upgrade(space, { -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time -- if number, then with default ttl, otherwise only if set during take }, + -- Set tubes for which statistics collector will be enabled + tube_stats = { 'tube-1', 'tube-2' }, }) ``` diff --git a/example-tube/init.lua b/example-tube/init.lua index 8f1fd01..9c59986 100644 --- a/example-tube/init.lua +++ b/example-tube/init.lua @@ -44,8 +44,8 @@ do end require 'xqueue'.upgrade(box.space.utube, { + format = box.space.utube:format(), debug = true, - format = format, fields = { status = 'status', runat = 'runat', @@ -59,6 +59,7 @@ require 'xqueue'.upgrade(box.space.utube, { retval = 'tuple', }, + tube_stats = {'tube', 'dead'}, }) local fiber = require 'fiber' diff --git a/xqueue.lua b/xqueue.lua index d5679af..61905c9 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -428,16 +428,6 @@ function M.upgrade(space,opts,depth) self.fields = fields self.fieldmap = fieldmap - if not self._stat then - self._stat = { - counts = {}; - transition = {}; - } - for _, t in space:pairs(nil, { iterator = box.index.ALL }) do - local s = t[self.fields.status] - self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 - end - end function self:getkey(arg) local _type = type(arg) @@ -547,7 +537,43 @@ function M.upgrade(space,opts,depth) end self.have_runat = have_runat + ---@type table + local stat_tube = {} + if self.fields.tube and type(opts.tube_stats) == 'table' then + for _, tube_name in ipairs(opts.tube_stats) 
do + if type(tube_name) == 'string' then + stat_tube[tube_name] = { + counts = {}, + transition = {}, + } + end + end + end + if not self._stat then + self._stat = { + counts = {}; + transition = {}; + tube = stat_tube; + } + -- TODO: benchmark index:count() + if self.fields.tube then + for _, t in space:pairs(nil, { iterator = box.index.ALL }) do + local s = t[self.fields.status] + self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 + + local tube = t[self.fields.tube] + if stat_tube[tube] then + stat_tube[tube].counts[s] = (stat_tube[tube].counts[s] or 0LL) + 1 + end + end + else + for _, t in space:pairs(nil, { iterator = box.index.ALL }) do + local s = t[self.fields.status] + self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 + end + end + end -- 3. features check @@ -971,9 +997,13 @@ function M.upgrade(space,opts,depth) -- because raising error earlier leads to trigger inconsistency self._on_repl = space:on_replace(function(old, new) local old_st, new_st + local old_tube, new_tube + local old_tube_stat, new_tube_stat local counts = self._stat.counts + local tube_ss = self._stat.tube + if old then - old_st = old[self.fields.status] + old_st = old[self.fields.status] --[[@as string]] if counts[old_st] and counts[old_st] > 0 then counts[old_st] = counts[old_st] - 1 else @@ -982,22 +1012,71 @@ function M.upgrade(space,opts,depth) old_st, tostring(counts[old_st]) ) end + if self.fields.tube then + old_tube = old[self.fields.tube] + old_tube_stat = tube_ss[old_tube] + if type(old_tube_stat) == 'table' and type(old_tube_stat.counts) == 'table' then + if (tonumber(old_tube_stat.counts[old_st]) or 0) > 0 then + old_tube_stat.counts[old_st] = old_tube_stat.counts[old_st] - 1 + else + log.error( + "Have not valid statistic by tubs/status: tube %s with value: %s", + old_tube, old_st, tostring(tube_ss[old_tube].counts[old_st]) + ) + end + else + old_tube_stat = nil + end + end else old_st = 'X' end if new then - new_st = new[self.fields.status] + new_st = 
new[self.fields.status] --[[@as string]] counts[new_st] = (counts[new_st] or 0LL) + 1 if counts[new_st] < 0 then log.error("Statistic overflow by task type: %s", new_st) end + if self.fields.tube then + new_tube = new[self.fields.tube] + new_tube_stat = tube_ss[new_tube] + if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then + if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then + new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0ULL) + 1 + else + log.error( + "Have not valid statistic tube/status: tube %q with value: %s", + new_tube, new_st, tostring(new_tube_stat.counts[new_st]) + ) + end + else + new_tube_stat = nil + end + end else new_st = 'X' end local field = old_st.."-"..new_st self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1 + if old_tube_stat then + if not new_tube_stat or old_tube_stat == new_tube_stat then + -- no new tube or new and old tubes are the same + old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0ULL) + 1 + else + -- nil != old_tube != new_tube != nil + -- cross tube transition ? + -- Can this be backoff with tube change? 
+ local old_field = old_st.."-S" + local new_field = "S-"..new_st + old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0ULL) + 1 + new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0ULL) + 1 + end + elseif new_tube_stat then + -- old_tube_stat == nil + new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0ULL) + 1 + end end, self._on_repl) self._on_dis = box.session.on_disconnect(function() @@ -1600,8 +1679,15 @@ function methods:stats(pretty) if pretty then stats.counts[ps] = stats.counts[s] or 0LL stats.counts[s] = nil + for _, tube_stat in pairs(stats.tube) do + tube_stat.counts[ps] = tube_stat.counts[s] or 0ULL + tube_stat.counts[s] = nil + end else stats.counts[s] = stats.counts[s] or 0LL + for _, tube_stat in pairs(stats.tube) do + tube_stat.counts[s] = tube_stat.counts[s] or 0ULL + end end end return stats From 8ebb820ea0143b9296138d5f1448defc69b72f48 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 1 May 2023 19:28:26 +0300 Subject: [PATCH 2/9] test: adds tests via luatest --- .luacheckrc | 16 ++-- .vscode/settings.json | 3 +- test/basic_test.lua | 216 ++++++++++++++++++++++++++++++++++++++++++ xqueue.lua | 28 +++--- 4 files changed, 239 insertions(+), 24 deletions(-) create mode 100644 test/basic_test.lua diff --git a/.luacheckrc b/.luacheckrc index f5dfb8a..67ddb44 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -10,12 +10,14 @@ globals = { "package.reload", } +include_files = { + "xqueue.lua" +} + +max_line_length = 140 + ignore = { - "211", - "212", - "431", - "432", - "542", - "611", --- "631", + "431", -- shadowing upvalue self + "432", -- shadowing upvalue argument + "542", -- empty if branch } diff --git a/.vscode/settings.json b/.vscode/settings.json index 4afda37..ed8d5c0 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "Lua.runtime.version": "LuaJIT" + "Lua.runtime.version": "LuaJIT", + "Lua.workspace.checkThirdParty": false } \ No 
newline at end of file diff --git a/test/basic_test.lua b/test/basic_test.lua new file mode 100644 index 0000000..0eb6adb --- /dev/null +++ b/test/basic_test.lua @@ -0,0 +1,216 @@ +local fio = require 'fio' +local fiber = require 'fiber' +local xqueue = require 'xqueue' + +local t = require 'luatest' --[[@as luatest]] +local g = t.group('basic') + +t.before_suite(function() + local tmpdir = assert(fio.tempdir()) + box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir } +end) + +g.before_each(function() + if box.space.queue then + for i = #box.space.queue.index, 0, -1 do + local ind = box.space.queue.index[i] + ind:drop() + end + box.space.queue:drop() + end +end) + +function g.test_basic_queue() + box.schema.space.create('queue', { if_not_exists = true }) + box.space.queue:format({ + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'payload', type = 'any' }, + }) + + box.space.queue:create_index('primary', { parts = {'id'} }) + box.space.queue:create_index('status', { parts = {'status', 'id'} }) + + xqueue.upgrade(box.space.queue, { + debug = true, + fields = { + status = 'status', + }, + features = { + id = 'uuid', + }, + }) + + local task = box.space.queue:put({ payload = { task = 1 } }) + t.assert_equals(task.status, 'R', "newly inserted task must be Ready") + t.assert_items_include(task.payload, { task = 1 }, "payload of the task remains the same") + + local ft = fiber.time() + local taken = box.space.queue:take() + local elapsed = fiber.time()-ft + + t.assert(taken, "queue:take() must return task") + t.assert_le(elapsed, 0.1, "queue:take() must return task after single yield") + t.assert_equals(taken.status, 'T', "taken task must be in status T") + + local tuple = box.space.queue:get(taken.id) + t.assert_equals(tuple.status, 'T', "status of taken task in space must be T") + + ft = fiber.time() + local not_taken = box.space.queue:take(0.001) + elapsed = fiber.time()-ft + t.assert_not(not_taken, "second 
queue:take() must return nothing") + t.assert_ge(elapsed, 0.001, "second queue:take() must be returned ≥ 0.001 seconds") + t.assert_le(elapsed, 0.1, "second queue:take() must be returned in ≤ 0.1 seconds") + + box.space.queue:release(taken) + tuple = box.space.queue:get(taken.id) + + t.assert_equals(tuple.status, 'R', "queue:release() must return task in R status") + taken = box.space.queue:take(0.001) + t.assert_equals(taken.status, 'T', "queue:take() after queue:release() must return task back") + + taken = box.space.queue:ack(taken) + + local nothing = box.space.queue:get(taken.id) + t.assert_not(nothing, "queue:ack() for taken task must remove task from space") + + local taken_tasks = {} + local fin = fiber.channel() + + local function taker(id) + fiber.name('taker/'..id) + local taken = box.space.queue:take({ timeout = 1 }) + if not taken then fin:put({ id = id }) return end + + local taken_id = taken.id + t.assert_not(taken_tasks[taken_id], "task must be taken only once") + taken_tasks[taken_id] = true + + box.space.queue:bury({ id = taken_id }) + t.assert_is(box.space.queue:get({taken_id}).status, 'B', "queue:bury() must move tuple to B") + fin:put({id = id, task = taken_id}) + end + + fiber.create(taker, 1) + fiber.create(taker, 2) + + local published = box.space.queue:put({}) + + for _ = 1, 2 do + local v = fin:get() + if v.id == 1 then + t.assert_equals(v.task, published.id) + else + t.assert_not(v.task) + end + end + fin:close() + + t.assert_equals(box.space.queue:get(published.id).status, 'B', "task left in B status") + + nothing = box.space.queue:take({ timeout = 0.001 }) + t.assert_not(nothing, "task in B status is not takeable") + + box.space.queue:kick(published) + t.assert_equals(box.space.queue:get(published.id).status, 'R', "queue:kick() kicks B task to R") + + taken = box.space.queue:take(0.001) + t.assert_equals(taken.id, published.id, "published task can be retaken from R (after kick)") + + box.space.queue:ack({ id = taken.id }) + 
t.assert_not(box.space.queue:get(published.id), "queue:ack({id=id}) removes task from space") +end + +function g.test_delayed_queue() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueueSpace]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + debug = true, + fields = { + runat = 'runat', + status = 'status', + }, + features = { + id = 'time64', + delayed = true, + zombie = true, + }, + }) + + local task_put_delay_500ms = queue:put({ payload = { id = 2 } }, { + delay = 0.1, + }) + t.assert_equals(task_put_delay_500ms.status, 'W', 'queue:put(...,{delay=<>}) must insert task in W status') + t.assert_equals(queue:get({task_put_delay_500ms.id}).status, 'W', 'double check task status in space (must be W)') + + local task_put = queue:put({ payload = { id = 1 } }) + t.assert_equals(task_put.status, 'R', "queue:put() without delay in delayed queue must set task to R") + t.assert_equals(queue:get({task_put.id}).status, 'R', 'double check task status in space (must be R)') + + t.assert_lt(task_put_delay_500ms.id, task_put.id, "first task must have smaller id than second (fifo constraint)") + + local taken = queue:take({ timeout = 0.05 }) + t.assert_equals(taken.id, task_put.id, 'queue:take() must return ready task') + + queue:release(taken, { delay = 0.12 }) + t.assert_equals(queue:get({taken.id}).status, 'W', 'queue:release(..., {delay=<>}) must put task in W') + t.assert_le(queue:get({task_put_delay_500ms.id}).runat, queue:get({taken.id}).runat, "first task must wakeup earlier") + + local taken_delayed = queue:take({ timeout = 0.11 }) + t.assert(taken_delayed, 'delayed task must be taken after timeout') + + 
local taken_put = queue:take({ timeout = 0.1 }) + t.assert(taken_put, 'released task must be taken after timeout') + + t.assert_equals(taken_delayed.id, task_put_delay_500ms.id, "firstly delayed task must be taken") + t.assert_equals(taken_put.id, task_put.id, "secondly released task must be taken") + + queue:ack(taken_delayed) + queue:ack(taken_put) + + t.assert_is(queue:get(task_put.id).status, 'Z', "acknowledged task in zombied queue must left in Z status") + t.assert_is(queue:get(taken_delayed.id).status, 'Z', "acknowledged task in zombied queue must left in Z status") + + -- put /take / release / take / ack+delay + local put = queue:put({payload = { task = 2 }}) + local take = queue:take(0.001) + + t.assert_equals(put.id, take.id, "put/take returns same task on empty space") + + queue:release(take, { + update = { + { '=', 'payload', { new = 2 } } + } + }) + + t.assert_items_equals(queue:get(take.id).payload, { new = 2 }, ":release()/update must change tuple in space") + t.assert_is(queue:get(take.id).status, 'R', ':release()/update without delay must return task into R') + + take = queue:take(0.001) + + queue:ack(take, { + update = { + { '=', 'payload', { finished = 2 } }, + }, + delay = 0.5, + }) + + t.assert_is(queue:get(take.id).status, 'Z', ':ack()+delay return task into Z status') + t.assert_items_equals(queue:get(take.id).payload, { finished = 2 }, ':ack()/update must replace tuple content') + + local nothing = queue:take({ timeout = 0.75 }) + t.assert_not(nothing, "Z tasks must are not takable") + + t.assert_not(queue:get(take.id), "Z task must be collected after delay time out exhausted") +end diff --git a/xqueue.lua b/xqueue.lua index 61905c9..b7b2ac8 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -158,10 +158,6 @@ sp:kick(N | id, [attr]) -- put buried task id or N oldest buried tasks to [R]ead local json = require 'json' json.cfg{ encode_invalid_as_nil = true } -local yaml = require 'yaml' -local function dd(x) - print(yaml.encode(x)) -end local 
function typeeq(src, ref) @@ -447,7 +443,7 @@ function M.upgrade(space,opts,depth) end end - function self:packkey(key) + function self.packkey(_, key) if type(key) == 'cdata' then return tostring(ffi.cast("uint64_t", key)) else @@ -731,10 +727,10 @@ function M.upgrade(space,opts,depth) -- self.NEVER = -1ULL self.NEVER = 0 - function self:keyfield(t) + function self.keyfield(_,t) return t[pkf.no] end - function self:keypack(t) + function self.keypack(_,t) return t[pkf.no] end @@ -789,7 +785,7 @@ function M.upgrade(space,opts,depth) if opts.worker then local workers = opts.workers or 1 local worker = opts.worker - for i = 1,workers do + for id = 1,workers do fiber.create(function(space,xq,i) local fname = space.name .. '.xq.wrk' .. tostring(i) if package.reload then fname = fname .. '.' .. package.reload.count end @@ -821,7 +817,7 @@ function M.upgrade(space,opts,depth) fiber.yield() end log.info("worker %s ended", i) - end,space,self,i) + end,space,self,id) end end @@ -1059,23 +1055,23 @@ function M.upgrade(space,opts,depth) end local field = old_st.."-"..new_st - self._stat.transition[field] = (self._stat.transition[field] or 0ULL) + 1 + self._stat.transition[field] = (self._stat.transition[field] or 0LL) + 1 if old_tube_stat then if not new_tube_stat or old_tube_stat == new_tube_stat then -- no new tube or new and old tubes are the same - old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0ULL) + 1 + old_tube_stat.transition[field] = (old_tube_stat.transition[field] or 0LL) + 1 else -- nil != old_tube != new_tube != nil -- cross tube transition ? -- Can this be backoff with tube change? 
local old_field = old_st.."-S" local new_field = "S-"..new_st - old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0ULL) + 1 - new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0ULL) + 1 + old_tube_stat.transition[old_field] = (old_tube_stat.transition[old_field] or 0LL) + 1 + new_tube_stat.transition[new_field] = (new_tube_stat.transition[new_field] or 0LL) + 1 end elseif new_tube_stat then -- old_tube_stat == nil - new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0ULL) + 1 + new_tube_stat.transition[field] = (new_tube_stat.transition[field] or 0LL) + 1 end end, self._on_repl) @@ -1680,13 +1676,13 @@ function methods:stats(pretty) stats.counts[ps] = stats.counts[s] or 0LL stats.counts[s] = nil for _, tube_stat in pairs(stats.tube) do - tube_stat.counts[ps] = tube_stat.counts[s] or 0ULL + tube_stat.counts[ps] = tube_stat.counts[s] or 0LL tube_stat.counts[s] = nil end else stats.counts[s] = stats.counts[s] or 0LL for _, tube_stat in pairs(stats.tube) do - tube_stat.counts[s] = tube_stat.counts[s] or 0ULL + tube_stat.counts[s] = tube_stat.counts[s] or 0LL end end end From aa22738779df8d334ba5c0b8d4fec4b858455ca4 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 1 May 2023 19:28:56 +0300 Subject: [PATCH 3/9] ci: adds github actions --- .github/actions/lint.yml | 17 +++++++++++++++++ .github/actions/test.yml | 31 +++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 .github/actions/lint.yml create mode 100644 .github/actions/test.yml diff --git a/.github/actions/lint.yml b/.github/actions/lint.yml new file mode 100644 index 0000000..d4378d4 --- /dev/null +++ b/.github/actions/lint.yml @@ -0,0 +1,17 @@ +name: Linting with luacheck + +on: + - push + - pull_request + +jobs: + run-luacheck-linter: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: tarantool/setup-tarantool@v2 + with: + tarantool-version: '2.10.4' + + - name: install 
tarantool/luacheck and execute it + run: tarantoolctl rocks install luacheck && .rocks/bin/luacheck . diff --git a/.github/actions/test.yml b/.github/actions/test.yml new file mode 100644 index 0000000..c474dc1 --- /dev/null +++ b/.github/actions/test.yml @@ -0,0 +1,31 @@ +name: Testing with unit tests + +on: + - push + - pull_request + +jobs: + run-unit-tests: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: tarantool/setup-tarantool@v2 + with: + tarantool-version: '2.10.4' + - name: install luacov-console + run: tarantoolctl rocks install luacov-console + - name: install luacov-coveralls + run: tarantoolctl rocks install --server=http://luarocks.org luacov-coveralls + - name: install luatest + run: tarantoolctl rocks install luatest + - name: run tests + env: + LUACOV_ENABLE: true + run: | + .rocks/bin/luatest --coverage -b -c -v test/ + - name: print luacov-console report + run: .rocks/bin/luacov-console "$(pwd)" && .rocks/bin/luacov-console -s + - name: publish coveralls report + env: + COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + run: .rocks/bin/luacov-coveralls -v \ No newline at end of file From 0789534c4eb4fc12d16ddcce53763133b687dcd4 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Mon, 1 May 2023 19:31:44 +0300 Subject: [PATCH 4/9] ci: fix directory naming --- .github/{actions => workflows}/lint.yml | 0 .github/{actions => workflows}/test.yml | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename .github/{actions => workflows}/lint.yml (100%) rename .github/{actions => workflows}/test.yml (100%) diff --git a/.github/actions/lint.yml b/.github/workflows/lint.yml similarity index 100% rename from .github/actions/lint.yml rename to .github/workflows/lint.yml diff --git a/.github/actions/test.yml b/.github/workflows/test.yml similarity index 100% rename from .github/actions/test.yml rename to .github/workflows/test.yml From 128f396e5d2fd100632af0c080c5c71d2377f275 Mon Sep 17 00:00:00 2001 From: Vladislav 
Grubov Date: Mon, 1 May 2023 19:34:05 +0300 Subject: [PATCH 5/9] test: adds .luacov config --- .luacov | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .luacov diff --git a/.luacov b/.luacov new file mode 100644 index 0000000..a99d1eb --- /dev/null +++ b/.luacov @@ -0,0 +1,20 @@ +deletestats = false +runreport = false + +exclude = { + "test/", + "%.rocks/", + "builtin/", + "example", +} + +runreport = false +deletestats = false + +coveralls = { + root = "/", + debug = true, + pathcorrect = { + { "^/home/runner/work/xqueue/xqueue/", "" }, + }, +} From 4e91a5be87cf6ac45b927fe095f3e4d5877b3e58 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Fri, 19 Jul 2024 21:26:20 +0300 Subject: [PATCH 6/9] allow background fibers of xqueue survive rw-ro swap --- README.md | 2 +- xqueue.lua | 83 +++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index bbdbb5e..aa35f44 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ M.upgrade(space, { -- id is always taken from pk status = 'status_field_name' | status_field_no, runat = 'runat_field_name' | runat_field_no, - proirity = 'proirity_field_name' | proirity_field_no, + priority = 'priority_field_name' | priority_field_no, }, features = { id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function diff --git a/xqueue.lua b/xqueue.lua index 84a06bd..9774c2f 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -219,7 +219,16 @@ local function _table2tuple ( qformat ) return dostring(fun) end ----@type xqueueSpace +---@alias xqStatus +---| "R" Ready +---| "T" Taken +---| "W" Waiting +---| "B" Buried +---| "Z" Zombie +---| "D" Done + +---@class xqueueSpace: boxSpaceObject +---@field xq xq xqueue specific storage local methods = {} ---@class PrimaryKeyField:table @@ -276,9 +285,6 @@ local methods = {} ---@field _on_dis fun() ----@class xqueueSpace: boxSpaceObject ----@field xq xq xqueue specific storage - ---Upgrades 
given space to xqueue instance ---@param space xqueueSpace ---@param opts table @@ -761,11 +767,33 @@ function M.upgrade(space,opts,depth) self.ready = fiber.channel(0) + local function rw_fiber_f(func, ...) + local xq = self + repeat + if box.info.ro then + log.verbose("awaiting rw") + repeat + if box.ctl.wait_rw then + box.ctl.wait_rw(1) + else + fiber.sleep(0.001) + end + until not box.info.ro + end + + local ok, err = pcall(func, ...) + if not ok then + log.error("%s", err) + end + fiber.testcancel() + until (not box.space[space.name]) or space.xq ~= xq + end + if opts.worker then local workers = opts.workers or 1 local worker = opts.worker for i = 1,workers do - fiber.create(function(space,xq,i) + fiber.create(rw_fiber_f, function(space,xq) local fname = space.name .. '.xq.wrk' .. tostring(i) if package.reload then fname = fname .. '.' .. package.reload.count end fiber.name(string.sub(fname,1,32)) @@ -773,7 +801,7 @@ function M.upgrade(space,opts,depth) if xq.ready then xq.ready:get() end log.info("I am worker %s",i) if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end while box.space[space.name] and space.xq == xq do @@ -796,13 +824,13 @@ function M.upgrade(space,opts,depth) fiber.yield() end log.info("worker %s ended", i) - end,space,self,i) + end,space,self) end end if have_runat then self.runat_chan = fiber.channel(0) - self.runat = fiber.create(function(space,xq,runat_index) + self.runat = fiber.create(rw_fiber_f, function(space,xq,runat_index) local fname = space.name .. '.xq' if package.reload then fname = fname .. '.' .. 
package.reload.count end fiber.name(string.sub(fname,1,32)) @@ -811,7 +839,7 @@ function M.upgrade(space,opts,depth) local chan = xq.runat_chan log.info("Runat started") if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end local maxrun = 1000 @@ -1110,6 +1138,10 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 }) ``` ]] +---Puts new task into queue +---@param t table|box.tuple +---@param opts { delay: number?, ttl: number?, wait: number? } +---@return table|box.tuple, boolean is_processed? function methods:put(t, opts) local xq = self.xq opts = opts or {} @@ -1208,6 +1240,11 @@ local wait_for = { W = true, } +---Waits for task to be processed for given timeout +---@param key any +---@param timeout number +---@return table|box.tuple task +---@return boolean is_processed function methods:wait(key, timeout) local xq = self.xq key = xq:getkey(key) @@ -1250,6 +1287,11 @@ end - *TODO*: ttr must be there ]] +---Takes task from the queue (main consumer's method) +---This method prefered to be called as queue:take{ timeout = ..., tube = ... } +---@param timeout number|{timeout: number?, tube: string?, ttr: number?} +---@param opts? {timeout: number?, tube: string?, ttr: number?} +---@return table|box.tuple? task method returns task to be processed or void function methods:take(timeout, opts) local xq = self.xq timeout = timeout or 0 @@ -1371,7 +1413,12 @@ end * if set, task will become `W` instead of `R` for `delay` seconds ]] - +---Puts back into queue (consumer method). +--- +---Task will be trasfered to status R (if not delay was passed) or W (if delay was passed). +---@param key any task or key of the task (i.e ID) +---@param attr? { delay: number?, ttl: number?, update: update_operation[]? 
} attributes of release +---@return box.tuple function methods:release(key, attr) local xq = self.xq key = xq:getkey(key) @@ -1429,6 +1476,13 @@ function methods:release(key, attr) return t end +---Acknowledges task processing. +--- +---if delay is given task will be transfered to Zombie status for `delay` seconds (Zombie must be enabled). +---if keep is given task will be saved into space in status Done. +---@param key any task or key of the task (i.e ID) +---@param attr? { delay: number?, keep: boolean?, update: update_operation[]? } attributes of ack +---@return box.tuple function methods:ack(key, attr) -- features.zombie -- features.keep @@ -1488,6 +1542,12 @@ function methods:ack(key, attr) return t end +---Buries task. Consumer's method aimed to save failed tasks inside queue. +---Tasks from `Bury` status will not be given to consumer again. +--- +---It is a good practice to save reason of failure into tuple and timestamp of operation for future investigation. +---@param key any task or key of the task (i.e ID) +---@param attr { update: update_operation[]? } attributes of bury function methods:bury(key, attr) attr = attr or {} @@ -1595,6 +1655,9 @@ local pretty_st = { D = "Done", } +---Reports queue statistics +---@param pretty? 
any +---@return { counts: table, transition: table } function methods:stats(pretty) local stats = table.deepcopy(self.xq._stat) for s, ps in pairs(pretty_st) do From 2de0410f28ccd88d411bfc83cfdcc50efd0b2c92 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Sat, 20 Jul 2024 14:34:09 +0300 Subject: [PATCH 7/9] tests, annotations and fixes --- .github/workflows/lint.yml | 4 +- .github/workflows/test.yml | 4 +- .luacheckrc | 5 +- .luacov | 2 +- README.md | 7 +- example-internal-queue/queue.lua | 4 +- test/basic_test.lua | 35 +++++---- test/setup.lua | 7 ++ test/tube_test.lua | 126 +++++++++++++++++++++++++++++++ xqueue.lua | 89 +++++++++++++++++----- 10 files changed, 237 insertions(+), 46 deletions(-) create mode 100644 test/setup.lua create mode 100644 test/tube_test.lua diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d4378d4..d8439e6 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -8,8 +8,8 @@ jobs: run-luacheck-linter: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: tarantool/setup-tarantool@v2 + - uses: actions/checkout@v4 + - uses: tarantool/setup-tarantool@v3 with: tarantool-version: '2.10.4' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c474dc1..518f38d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,8 +8,8 @@ jobs: run-unit-tests: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: tarantool/setup-tarantool@v2 + - uses: actions/checkout@v4 + - uses: tarantool/setup-tarantool@v3 with: tarantool-version: '2.10.4' - name: install luacov-console diff --git a/.luacheckrc b/.luacheckrc index 67ddb44..e2dc460 100644 --- a/.luacheckrc +++ b/.luacheckrc @@ -1,8 +1,7 @@ -std = "luajit" +std = "tarantool" codes = true globals = { - -- Tarantool variable: - "box", + -- Tarantool variables: "table.deepcopy", "dostring", diff --git a/.luacov b/.luacov index a99d1eb..30fdd19 100644 --- a/.luacov +++ b/.luacov @@ -8,7 
+8,7 @@ exclude = { "example", } -runreport = false +runreport = true deletestats = false coveralls = { diff --git a/README.md b/README.md index 0979de4..6b1f85b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ ### TODO + * reload/upgrade and feature switch * composite primary keys * switch - turn non-taken task into any state @@ -35,7 +36,7 @@ # Interface -## Creator methods: +## Creator methods * `.upgrade(space, options)` @@ -83,7 +84,7 @@ M.upgrade(space, { }) ``` -## Producer methods: +## Producer methods * `space:put` - `task` - table or array or tuple @@ -146,7 +147,7 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 }) + `update` - table for update, like in space:update + `ttl` - timeout for time to live (?) -## Admin methods: +## Admin methods * `space:dig(id, [attr])` - dig out task from buried state - `id` - as in `:ack` diff --git a/example-internal-queue/queue.lua b/example-internal-queue/queue.lua index 2e96686..c822607 100644 --- a/example-internal-queue/queue.lua +++ b/example-internal-queue/queue.lua @@ -1,5 +1,5 @@ ---@class Queue ----@field q xqueueSpace space with Queue +---@field q xqueue.space space with Queue ---@field opts QueueOptions options of the queue local M = require 'obj'.class({}, 'queue') @@ -131,7 +131,7 @@ function M:_init(cfg) ---@type table self._task_map = {} - ---@cast space xqueueSpace + ---@cast space xqueue.space self.q = space self._on = {} diff --git a/test/basic_test.lua b/test/basic_test.lua index 0eb6adb..9515469 100644 --- a/test/basic_test.lua +++ b/test/basic_test.lua @@ -1,17 +1,15 @@ -local fio = require 'fio' +---@diagnostic disable: inject-field local fiber = require 'fiber' local xqueue = require 'xqueue' +require 'test.setup' + local t = require 'luatest' --[[@as luatest]] local g = t.group('basic') -t.before_suite(function() - local tmpdir = assert(fio.tempdir()) - box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir } -end) - g.before_each(function() if 
box.space.queue then + box.space.queue:truncate() for i = #box.space.queue.index, 0, -1 do local ind = box.space.queue.index[i] ind:drop() @@ -22,6 +20,10 @@ end) function g.test_basic_queue() box.schema.space.create('queue', { if_not_exists = true }) + ---@class test.xqueue.basic.tuple: box.tuple + ---@field id string + ---@field status string + ---@field payload any box.space.queue:format({ { name = 'id', type = 'string' }, { name = 'status', type = 'string' }, @@ -41,7 +43,7 @@ function g.test_basic_queue() }, }) - local task = box.space.queue:put({ payload = { task = 1 } }) + local task = box.space.queue:put({ payload = { task = 1 } }) --[[@as test.xqueue.basic.tuple]] t.assert_equals(task.status, 'R', "newly inserted task must be Ready") t.assert_items_include(task.payload, { task = 1 }, "payload of the task remains the same") @@ -53,7 +55,7 @@ function g.test_basic_queue() t.assert_le(elapsed, 0.1, "queue:take() must return task after single yield") t.assert_equals(taken.status, 'T', "taken task must be in status T") - local tuple = box.space.queue:get(taken.id) + local tuple = box.space.queue:get(taken.id) --[[@as test.xqueue.basic.tuple]] t.assert_equals(tuple.status, 'T', "status of taken task in space must be T") ft = fiber.time() @@ -64,7 +66,7 @@ function g.test_basic_queue() t.assert_le(elapsed, 0.1, "second queue:take() must be returned in ≤ 0.1 seconds") box.space.queue:release(taken) - tuple = box.space.queue:get(taken.id) + tuple = box.space.queue:get(taken.id) --[[@as test.xqueue.basic.tuple]] t.assert_equals(tuple.status, 'R', "queue:release() must return task in R status") taken = box.space.queue:take(0.001) @@ -123,7 +125,12 @@ function g.test_basic_queue() end function g.test_delayed_queue() - local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueueSpace]] + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + ---@class test.xqueue.delayed.tuple: box.tuple + ---@field 
id number + ---@field status string + ---@field runat number + ---@field payload any queue:format({ { name = 'id', type = 'unsigned' }, { name = 'status', type = 'string' }, @@ -150,17 +157,19 @@ function g.test_delayed_queue() local task_put_delay_500ms = queue:put({ payload = { id = 2 } }, { delay = 0.1, - }) + }) --[[@as test.xqueue.delayed.tuple]] t.assert_equals(task_put_delay_500ms.status, 'W', 'queue:put(...,{delay=<>}) must insert task in W status') t.assert_equals(queue:get({task_put_delay_500ms.id}).status, 'W', 'double check task status in space (must be W)') - local task_put = queue:put({ payload = { id = 1 } }) + local task_put = queue:put({ payload = { id = 1 } }) --[[@as test.xqueue.delayed.tuple]] t.assert_equals(task_put.status, 'R', "queue:put() without delay in delayed queue must set task to R") t.assert_equals(queue:get({task_put.id}).status, 'R', 'double check task status in space (must be R)') t.assert_lt(task_put_delay_500ms.id, task_put.id, "first task must have smaller id than second (fifo constraint)") local taken = queue:take({ timeout = 0.05 }) + t.assert(taken, "task must be taken") + ---@cast taken -nil t.assert_equals(taken.id, task_put.id, 'queue:take() must return ready task') queue:release(taken, { delay = 0.12 }) @@ -209,7 +218,7 @@ function g.test_delayed_queue() t.assert_is(queue:get(take.id).status, 'Z', ':ack()+delay return task into Z status') t.assert_items_equals(queue:get(take.id).payload, { finished = 2 }, ':ack()/update must replace tuple content') - local nothing = queue:take({ timeout = 0.75 }) + local nothing = queue:take({ timeout = 1.5 }) t.assert_not(nothing, "Z tasks must are not takable") t.assert_not(queue:get(take.id), "Z task must be collected after delay time out exhausted") diff --git a/test/setup.lua b/test/setup.lua new file mode 100644 index 0000000..5610d45 --- /dev/null +++ b/test/setup.lua @@ -0,0 +1,7 @@ +local fio = require 'fio' +local t = require 'luatest' --[[@as luatest]] + 
+t.before_suite(function() + local tmpdir = assert(fio.tempdir()) + box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir } +end) diff --git a/test/tube_test.lua b/test/tube_test.lua new file mode 100644 index 0000000..1d9a94d --- /dev/null +++ b/test/tube_test.lua @@ -0,0 +1,126 @@ +---@diagnostic disable: inject-field +local fiber = require 'fiber' +local xqueue = require 'xqueue' +require 'test.setup' + +local t = require 'luatest' --[[@as luatest]] +local g = t.group('tube') + +g.before_each(function() + if box.space.queue then + box.space.queue:truncate() + for i = #box.space.queue.index, 0, -1 do + local ind = box.space.queue.index[i] + ind:drop() + end + box.space.queue:drop() + end +end) + +function g.test_tube_queue() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'tube', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('tube_status', { parts = {'tube', 'status', 'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + }, + fields = { + status = 'status', + runat = 'runat', + tube = 'tube', + }, + }) + + local trans = setmetatable({ __cache = {}, },{__index=function(self, tx) + local s,e = unpack(tostring(tx):split("_")) + local n = s:upper().."-"..e:upper() + self[n] = rawget(self, n) or 0 + local func = self.__cache[n] or function(inc) self[n] = self[n] + inc end + self.__cache[n] = func + return func + end}) + + for i = 1, 10 do + local tube_name = 'tube-'..i + local task = queue:put({ tube = tube_name, payload = { id = i } }) + t.assert_equals(task.payload.id, i, "task:put("..i..")") + 
t.assert_items_include(task.payload, { id = i }, "payload of the task remains the same") + end + trans.x_r(10) + + for i = 10, 1, -1 do + local tube_name = 'tube-'..i + local task = queue:take({ tube = tube_name }) + trans.r_t(1) + t.assert_equals(task.status, 'T', "task has been taken from "..tube_name) + t.assert_equals(task.tube, tube_name, "task was returned from the tube we requested") + + local notask = queue:take({ + tube = tube_name, + timeout = 0.005, + }) + + t.assert_equals(notask, nil, "no task should be returned from the empty tube") + + if i % 2 == 0 then + local ret = queue:ack(task) + trans.t_x(1) + t.assert_is_not(ret, nil, ":ack() returns task back") + t.assert_equals(task.id, ret.id, ":ack() returned same task") + else + local ret = queue:release(task) + trans.t_r(1) + t.assert_is_not(ret, nil, ":release() returns task back") + t.assert_equals(task.id, ret.id, ":release() returned same task") + t.assert_equals(ret.status, "R", ":release() returns task into Ready status") + end + end + + -- now test generic worker + for i = 1, queue:len() do + local task = queue:take(0) -- no-yield take + t.assert_is_not(task, nil, ":take() returned task") + ---@cast task -nil + + trans.r_t(1) + + -- FIFO order of the tubes. 
+ local expected_tube_name = 'tube-'..(2*i-1) + t.assert_equals(task.tube, expected_tube_name, ":take() must follow FIFO and return task from correct tube") + + local ret = queue:release(task, { delay = 1 }) + trans.t_w(1) + t.assert_equals(ret.status, 'W', ":release(+delay) returns task into W state") + t.assert_equals(ret.id, task.id, ":release() returned the same task we sent to release") + end + + local stats = queue:stats() + + trans.__cache = nil + setmetatable(trans, nil) + t.assert_covers(stats, { + counts = { + B = 0, + D = 0, + R = 0, + T = 0, + W = 5, + Z = 0, + }, + transition = trans, + tube = {}, + }, "queue:stats() with tubes") +end diff --git a/xqueue.lua b/xqueue.lua index b7b2ac8..d22a87e 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -215,7 +215,7 @@ local function _table2tuple ( qformat ) return dostring(fun) end ----@type xqueueSpace +---@class xqueue.space local methods = {} ---@class PrimaryKeyField:table @@ -272,12 +272,41 @@ local methods = {} ---@field _on_dis fun() ----@class xqueueSpace: boxSpaceObject +---@class xqueue.space: boxSpaceObject ---@field xq xq xqueue specific storage +---@class xqueue.fields +---@field status? string|number Xqueue name for the Status field +---@field runat? string|number Xqueue name for the RunAt field +---@field priority? string|number Xqueue name for Task priority field +---@field tube? string|number Xqueue name for Tube field + +---@class xqueue.features +---@field id 'uuid' | 'auto_increment' | 'required' | 'time64' | (fun(): number|string) Mandatory Field for TaskID generation +---@field retval? 'table'|'tuple' Type of return Value of the task in xqueue methods. +---(Default: table when space with format, tuple when space is without format) +---@field buried? boolean should xqueue allow buring of the tasks (by default: true) +---@field keep? boolean should xqueue keep :ack'ed tasks in the Space or not +---@field delayed? 
boolean should xqueue allow delay tasks (requires runat field and index) (defauled: false) +---@field zombie? boolean|number should xqueue temporarily keep :ack'ed tasks in the Space (default: false). Mutually exclusive with keep +---when zombie is configured with number, then this value is treated as zombie_delay (requires runat to be present) +---@field ttl? boolean|number should xqueue allow Time-To-Live on tasks. When specified with number, this value used for ttl_default. +---Requires runat field and index. +---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default. +---Requires runat field and index. + +---@class xqueue.upgrade.options +---@field format? boxSpaceFormat +---@field fields xqueue.fields +---@field debug? boolean +---@field tube_stats? string[] List of tube names for per-tube statistics +---@field features xqueue.features +---@field worker? fun(task: box.tuple|table) simple ad-hoc worker callback +---@field workers? number (number of workers to spawn) + ---Upgrades given space to xqueue instance ----@param space xqueueSpace ----@param opts table +---@param space xqueue.space +---@param opts xqueue.upgrade.options ---@param depth? number function M.upgrade(space,opts,depth) depth = depth or 0 @@ -320,9 +349,9 @@ function M.upgrade(space,opts,depth) __serialize='map', __newindex = function(t, key, val) if type(val) == 'table' then - return rawset(t, key, setmetatable(val, taken_mt)) + rawset(t, key, setmetatable(val, taken_mt)) else - return rawset(t, key, val) + rawset(t, key, val) end end }) @@ -788,6 +817,7 @@ function M.upgrade(space,opts,depth) for id = 1,workers do fiber.create(function(space,xq,i) local fname = space.name .. '.xq.wrk' .. tostring(i) + ---@diagnostic disable-next-line: undefined-field if package.reload then fname = fname .. '.' .. 
package.reload.count end fiber.name(string.sub(fname,1,32)) repeat fiber.sleep(0.001) until space.xq @@ -1039,7 +1069,7 @@ function M.upgrade(space,opts,depth) new_tube_stat = tube_ss[new_tube] if type(new_tube_stat) == 'table' and type(new_tube_stat.counts) == 'table' then if (tonumber(new_tube_stat.counts[new_st]) or 0) >= 0 then - new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0ULL) + 1 + new_tube_stat.counts[new_st] = (new_tube_stat.counts[new_st] or 0LL) + 1 else log.error( "Have not valid statistic tube/status: tube %q with value: %s", @@ -1184,6 +1214,9 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 }) ``` ]] +---@param t any[]|box.tuple +---@param opts? { delay: number?, ttl: number?, wait: number? } +---@return table|box.tuple, boolean? has_been_processed function methods:put(t, opts) local xq = self.xq opts = opts or {} @@ -1282,6 +1315,9 @@ local wait_for = { W = true, } +---@param key table|scalar|box.tuple +---@param timeout number +---@return table|box.tuple, boolean? has_been_processed function methods:wait(key, timeout) local xq = self.xq key = xq:getkey(key) @@ -1324,6 +1360,9 @@ end - *TODO*: ttr must be there ]] +---@param timeout? number|{ timeout: number?, ttr: number?, tube: string? } +---@param opts? { timeout: number?, ttr: number?, tube: string? } +---@return table|box.tuple? function methods:take(timeout, opts) local xq = self.xq timeout = timeout or 0 @@ -1365,6 +1404,7 @@ function methods:take(timeout, opts) index = xq.index start_with = {'R'} end + ---@cast index -nil local now = fiber.time() local key @@ -1445,7 +1485,9 @@ end * if set, task will become `W` instead of `R` for `delay` seconds ]] - +---@param key table|scalar|box.tuple +---@param attr? 
{delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] } +---@return table|box.tuple function methods:release(key, attr) local xq = self.xq key = xq:getkey(key) @@ -1503,6 +1545,9 @@ function methods:release(key, attr) return t end +---@param key table|scalar|box.tuple +---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] } +---@return table|box.tuple function methods:ack(key, attr) -- features.zombie -- features.keep @@ -1562,6 +1607,8 @@ function methods:ack(key, attr) return t end +---@param key table|scalar|box.tuple +---@param attr? {delay: number?, ttl: number?, update: { [1]: update_operation, [2]: number|string, [3]: tuple_type }[] } function methods:bury(key, attr) attr = attr or {} @@ -1618,26 +1665,27 @@ function methods:kick(nr_tasks_or_task, attr) end end +---@param key table|scalar|box.tuple +---@return box.tuple|table function methods:kill(key) local xq = self.xq - key = xq:getkey(key) + key = xq:getkey(key) local task = self:get(key) - local status = task[xq.fields.status] + if not task then + error(("Task by %s not found to kill"):format(key)) + end local peer = box.session.storage.peer if xq.debug then log.info("Kill {%s} by %s, sid=%s, fid=%s", key, peer, box.session.id(), fiber.id()) end - self:delete(key) - - if status == 'T' then - for sid in pairs(xq.bysid) do - xq.bysid[sid][key] = nil - end - xq.taken[key] = nil - xq._lock[key] = nil - end + task = self:delete(key) + ---@cast task -nil + ---Kill is treated as a synonim of Bury + task = task:update({{ '=', xq.fields.status, 'B' }}) + xq:putback(task) + return xq.retwrap(task) end -- special remap of truncate for deliting stats and saving methods @@ -1669,6 +1717,7 @@ local pretty_st = { D = "Done", } +---@param pretty? 
boolean function methods:stats(pretty) local stats = table.deepcopy(self.xq._stat) for s, ps in pairs(pretty_st) do @@ -1690,7 +1739,7 @@ function methods:stats(pretty) end setmetatable(M,{ - __call = function(M, space, opts) + __call = function(_, space, opts) M.upgrade(space,opts,1) end }) From 6ae38de85c9021d393fabc0471637c9f6eb74899 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Sat, 20 Jul 2024 14:57:56 +0300 Subject: [PATCH 8/9] network tests --- test/network_test.lua | 87 +++++++++++++++++++++++++++++++++++++++++++ test/setup.lua | 3 +- test/tube_test.lua | 1 - 3 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 test/network_test.lua diff --git a/test/network_test.lua b/test/network_test.lua new file mode 100644 index 0000000..d8f829d --- /dev/null +++ b/test/network_test.lua @@ -0,0 +1,87 @@ +---@diagnostic disable: inject-field +local xqueue = require 'xqueue' +require 'test.setup' +local clock = require 'clock' +local fiber = require 'fiber' + +local t = require 'luatest' --[[@as luatest]] +local g = t.group('network') + +local netbox = require 'net.box' + +g.before_each(function() + if box.space.queue then + box.space.queue:truncate() + for i = #box.space.queue.index, 0, -1 do + local ind = box.space.queue.index[i] + ind:drop() + end + box.space.queue:drop() + end +end) + +function g.test_untake() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + }, + fields = { + status = 'status', + runat = 'runat', + }, + }) + + local tt = netbox.connect('127.0.0.1:3301', { 
wait_connected = true }) + t.assert(tt:ping(), "connected to self") + + local task = queue:put({ payload = { time = clock.time() } }) + t.assert(task, ":put() has been inserted task") + + local awaiter_fin = fiber.channel() + + fiber.create(function() + local ret, is_processed = queue:wait(task, 3) + t.assert_equals(ret.id, task.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network") + t.assert_equals(task.id, taken.id, "retutned the same task") + + -- untake: + tt:close() + fiber.sleep(1) + + t.assert_equals(queue:get({ task.id }).status, 'R', "task was returned to R status") + + tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + + taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network (2nd)") + t.assert_equals(task.id, taken.id, "retutned the same task (2nd)") + + local processed_at = clock.time() + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', 'payload', { processed_at = processed_at }}} }}) + t.assert_equals(acked.id, taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked.id, "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + + tt:close() +end diff --git a/test/setup.lua b/test/setup.lua index 5610d45..2286885 100644 --- a/test/setup.lua +++ b/test/setup.lua @@ -3,5 +3,6 @@ local t = require 'luatest' --[[@as luatest]] t.before_suite(function() local tmpdir = assert(fio.tempdir()) - box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir } + box.cfg{ memtx_dir = tmpdir, wal_dir = tmpdir, vinyl_dir = tmpdir, listen = '127.0.0.1:3301' } + 
box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true}) end) diff --git a/test/tube_test.lua b/test/tube_test.lua index 1d9a94d..c819338 100644 --- a/test/tube_test.lua +++ b/test/tube_test.lua @@ -1,5 +1,4 @@ ---@diagnostic disable: inject-field -local fiber = require 'fiber' local xqueue = require 'xqueue' require 'test.setup' From 0d11e9e3a95d952f268195abd689a75dd6a7bc85 Mon Sep 17 00:00:00 2001 From: Vladislav Grubov Date: Tue, 23 Jul 2024 14:57:28 +0300 Subject: [PATCH 9/9] test: adds tests and ci --- .github/workflows/lint.yml | 1 - .github/workflows/push-rockspec.yml | 52 +++++ .github/workflows/test.yml | 39 +++- .luacov | 5 + README.md | 351 ++++++++++++++++++---------- benchmarks/001_put_take_bench.lua | 104 +++++++++ test/basic_test.lua | 8 +- test/interface_test.lua | 160 +++++++++++++ test/network_test.lua | 8 +- test/replicaset_test.lua | 281 ++++++++++++++++++++++ xqueue-dev-1.rockspec | 21 ++ xqueue.lua | 55 +++-- 12 files changed, 939 insertions(+), 146 deletions(-) create mode 100644 .github/workflows/push-rockspec.yml create mode 100644 benchmarks/001_put_take_bench.lua create mode 100644 test/interface_test.lua create mode 100644 test/replicaset_test.lua create mode 100644 xqueue-dev-1.rockspec diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index d8439e6..0ef0ac6 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -2,7 +2,6 @@ name: Linting with luacheck on: - push - - pull_request jobs: run-luacheck-linter: diff --git a/.github/workflows/push-rockspec.yml b/.github/workflows/push-rockspec.yml new file mode 100644 index 0000000..4f86518 --- /dev/null +++ b/.github/workflows/push-rockspec.yml @@ -0,0 +1,52 @@ +name: Create and push rockspec for moonlibs/xqueue + +on: + push: + tags: + - '*' + +env: + ROCK_NAME: xqueue + +jobs: + pack-and-push-tagged-rockspec: + runs-on: ubuntu-latest + if: startsWith(github.ref, 'refs/tags') + steps: + - uses: actions/checkout@master + - uses: 
tarantool/setup-tarantool@v3 + with: + tarantool-version: '2.6' + + # https://stackoverflow.com/questions/58177786/get-the-current-pushed-tag-in-github-actions + - name: Set env + run: echo "TAG=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV + + - run: tarantoolctl rocks new_version --tag=${{ env.TAG }} rockspecs/xqueue-scm-5.rockspec ${{ env.TAG }} "git+https://github.com/${{ github.repository }}.git" + - run: tarantoolctl rocks --server https://moonlibs.github.io/rocks install ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + - run: tarantoolctl rocks pack ${{ env.ROCK_NAME }}-${{ env.TAG }}-1.rockspec + # Install native lua with luarocks + - uses: leafo/gh-actions-lua@v9 + with: + luaVersion: "luajit-2.1.0-beta3" + - uses: leafo/gh-actions-luarocks@v4 + with: + luarocksVersion: "3.8.0" + - uses: unfor19/install-aws-cli-action@v1.0.3 + - run: mkdir .build && cp ${{env.ROCK_NAME}}-dev-1.rockspec ${{env.ROCK_NAME}}-${{env.TAG}}-1.rockspec .build/ && cp *.src.rock .build/ + - name: rebuild and publish s3 luarocks server + env: + AWS_ACCESS_KEY_ID: ${{ secrets.MOONLIBS_S3_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.MOONLIBS_S3_SECRET_KEY}} + AWS_EC2_METADATA_DISABLED: true + run: | + cd .build && aws s3 sync s3://moonlibs/ ./ && luarocks-admin make_manifest . 
&& aws s3 sync --acl public-read ./ s3://moonlibs/; + - uses: "marvinpinto/action-automatic-releases@latest" + with: + repo_token: "${{ secrets.GITHUB_TOKEN }}" + prerelease: false + files: | + README.md + ${{env.ROCK_NAME}}-dev-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.rockspec + ${{env.ROCK_NAME}}-${{env.TAG}}-1.src.rock diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 518f38d..209c887 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,20 +2,20 @@ name: Testing with unit tests on: - push - - pull_request jobs: run-unit-tests: runs-on: ubuntu-latest + strategy: + matrix: + version: ["1.10.15", "2.10.6", "2.11.0", "2.11.2"] steps: - uses: actions/checkout@v4 - uses: tarantool/setup-tarantool@v3 with: - tarantool-version: '2.10.4' + tarantool-version: '${{matrix.version}}' - name: install luacov-console run: tarantoolctl rocks install luacov-console - - name: install luacov-coveralls - run: tarantoolctl rocks install --server=http://luarocks.org luacov-coveralls - name: install luatest run: tarantoolctl rocks install luatest - name: run tests @@ -25,7 +25,36 @@ jobs: .rocks/bin/luatest --coverage -b -c -v test/ - name: print luacov-console report run: .rocks/bin/luacov-console "$(pwd)" && .rocks/bin/luacov-console -s + - name: rename luacov.stats.out + run: mv luacov.stats.out luacov.stats.out-${{matrix.version}} + - uses: actions/upload-artifact@master + with: + name: luacov.stats.out-${{matrix.version}} + path: luacov.stats.out-${{matrix.version}} + run-coverage-report: + runs-on: ubuntu-latest + needs: ["run-unit-tests"] + steps: + - uses: actions/checkout@master + - uses: tarantool/setup-tarantool@v3 + with: + tarantool-version: '2.10.7' + - name: install luacov-coveralls 0.2.3 + run: tarantoolctl rocks install --server=https://luarocks.org luacov-coveralls 0.2.3 + - name: install luacov-console 1.2.0 + run: tarantoolctl rocks --server http://moonlibs.github.io/rocks install luacov-console 1.2.0 + - 
name: Download run artifacts + uses: actions/download-artifact@v4 + with: + pattern: luacov.stats.out-* + merge-multiple: true + - name: debug + run: ls -la . + - name: merge luacov.stats.out + run: cat luacov.stats.out-* | >luacov.stats.out tarantool -e 'm={} for k in io.lines() do local vs=io.read():split(" ") vs[#vs]=nil local r = m[k] if r then for i, v in pairs(vs) do r[i]=r[i]+v end else m[k]=vs end end; for k, v in pairs(m) do print(k) print(table.concat(v, " ")) end' + - name: prepare coverage report + run: .rocks/bin/luacov-console . && .rocks/bin/luacov-console -s - name: publish coveralls report env: COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} - run: .rocks/bin/luacov-coveralls -v \ No newline at end of file + run: .rocks/bin/luacov-coveralls -v diff --git a/.luacov b/.luacov index 30fdd19..89087c7 100644 --- a/.luacov +++ b/.luacov @@ -8,6 +8,11 @@ exclude = { "example", } +-- path renaming +modules = { + ['.xqueue'] = io.popen('pwd'):read('*line') .. '/xqueue.lua', +} + runreport = true deletestats = false diff --git a/README.md b/README.md index 1460888..1c029ea 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,44 @@ -### TODO +# xQueue -* reload/upgrade and feature switch -* composite primary keys -* switch - turn non-taken task into any state -* snatch - take away Taken task from its owner and turn it into R/W -* restart (renew, requeue?) 
- take task from W/D/Z states and put it into R/W -* it seems, that ttl/ttr/bury requires attrs field +[![Coverage Status](https://coveralls.io/repos/github/moonlibs/xqueue/badge.svg?branch=master)](https://coveralls.io/github/moonlibs/xqueue?branch=master) + +Add power of the Queue into Tarantool Space + +Latest Release: 5.0.0 + +Backward compatibility rockspec: rockspecs/xqueue-scm-5.rockspec +Always latest rockspec: rockspecs/xqueue-dev-1.rockspec -# Status +## Status * **R** - ready - task is ready to be taken - - created by put without delay - - turned on by release/kick without delay - - turned on from W when delay passed - - deleted after ttl if ttl is enabled + * created by put without delay + * turned on by release/kick without delay + * turned on from W when delay passed + * deleted after ttl if ttl is enabled * **T** - taken - task is taken by consumer. may not be taken by other. - - turned into R after ttr if ttr is enabled + * turned into R after ttr if ttr is enabled * **W** - waiting - task is not ready to be taken. 
waiting for its `delay` - - requires `runat` - - `delay` may be set during put, release, kick - - turned into R after delay + * requires `runat` + * `delay` may be set during put, release, kick + * turned into R after delay * **B** - buried - task was temporary discarded from queue by consumer - - may be revived using kick by administrator - - use it in unpredicted conditions, when man intervention is required - - Without mandatory monitoring (stats.buried) usage of buried is useless and awry + * may be revived using kick by administrator + * use it in unpredicted conditions, when man intervention is required + * Without mandatory monitoring (stats.buried) usage of buried is useless and awry * **Z** - zombie - task was processed and ack'ed and **temporary** kept for delay * **D** - done - task was processed and ack'ed and **permanently** left in database - - enabled when keep feature is set + * enabled when keep feature is set -# Interface +## Interface -## Creator methods +### Creator methods * `.upgrade(space, options)` @@ -44,64 +46,64 @@ Imbue space with power of queue ```lua M.upgrade(space, { - format = { - -- space format. applied to space.format() if passed - }, - fields = { - -- id is always taken from pk - status = 'status_field_name' | status_field_no, - runat = 'runat_field_name' | runat_field_no, - priority = 'priority_field_name' | priority_field_no, - }, - features = { - id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function - -- auto_increment - if pk is number, then use it for auto_increment - -- uuid - if pk is string, then use uuid for id - -- required - primary key MUST be present in tuple during put - -- function - funciton will be called to aquire id for task - retval = 'table' | 'tuple' - -- table requires space format. default if applicable. 
a bit slower - - buried = true, -- if true, support bury/kick - delayed = true, -- if true, support delayed tasks, requires `runat` - - keep = true, -- if true, keep ack'ed tasks in [D]one state, instead of deleting - -- mutually exclusive with zombie - - zombie = true|number, -- requires `runat` field - -- if number, then with default zombie delay, otherwise only if set delay during ack - -- mutually exclusive with keep - - ttl = true|number, -- requires `runat` field - -- Time To Live. Task is expired unless taken within time - -- if number, then with default ttl, otherwise only if set during put/release - ttr = true|number, -- requires `runat` field - -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time - -- if number, then with default ttl, otherwise only if set during take - }, - -- Set tubes for which statistics collector will be enabled - tube_stats = { 'tube-1', 'tube-2' }, + format = { + -- space format. applied to space.format() if passed + }, + fields = { + -- id is always taken from pk + status = 'status_field_name' | status_field_no, + runat = 'runat_field_name' | runat_field_no, + priority = 'priority_field_name' | priority_field_no, + }, + features = { + id = 'auto_increment' | 'time64' | 'uuid' | 'required' | function() ... return task_id end + -- auto_increment - if pk is number, then use it for auto_increment + -- uuid - if pk is string, then use uuid for id + -- required - primary key MUST be present in tuple during put + -- function - funciton will be called to aquire id for task + retval = 'table' | 'tuple' + -- table requires space format. default if applicable. 
a bit slower + + buried = true, -- if true, support bury/kick + delayed = true, -- if true, support delayed tasks, requires `runat` + + keep = true, -- if true, keep ack'ed tasks in [D]one state, instead of deleting + -- mutually exclusive with zombie + + zombie = true|number, -- requires `runat` field + -- if number, then with default zombie delay, otherwise only if set delay during ack + -- mutually exclusive with keep + + ttl = true|number, -- requires `runat` field + -- Time To Live. Task is expired unless taken within time + -- if number, then with default ttl, otherwise only if set during put/release + ttr = true|number, -- requires `runat` field + -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time + -- if number, then with default ttl, otherwise only if set during take + }, + -- Set tubes for which statistics collector will be enabled + tube_stats = { 'tube-1', 'tube-2' }, }) ``` -## Producer methods +### Producer methods * `space:put` - - `task` - table or array or tuple - + `table` - * **requires** space format - * suitable for id generation - + `array` - * ignores space format - * for id generation use `NULL` (**not** `nil`) - + `tuple` - * ignores space format - * **can't** be used with id generation - - `attr` - table of attributes - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds - + `ttl` - number of seconds - * if set, task will be discarded after ttl seconds unless was taken + * `task` - table or array or tuple + * `table` + * **requires** space format + * suitable for id generation + * `array` + * ignores space format + * for id generation use `NULL` (**not** `nil`) + * `tuple` + * ignores space format + * **can't** be used with id generation + * `attr` - table of attributes + * `delay` - number of seconds + * if set, task will become `W` instead of `R` for `delay` seconds + * `ttl` - number of seconds + * if set, task will be discarded 
after ttl seconds unless was taken ```lua box.space.myqueue:put{ name="xxx"; data="yyy"; } @@ -112,57 +114,172 @@ box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5 }) box.space.myqueue:put({ name="xxx"; data="yyy"; }, { delay = 1.5; ttl = 100 }) ``` -## Consumer methods +### Consumer methods * `space:take(timeout)` - - `timeout` - number of seconds to wait for new task - + choose reasonable time - + beware of **readahead** size (see tarantool docs) - - returns task tuple or table (see retval) or nothing on timeout - - *TODO*: ttr must be there + * `timeout` - number of seconds to wait for new task + * choose reasonable time + * beware of **readahead** size (see tarantool docs) + * returns task tuple or table (see retval) or nothing on timeout + * *TODO*: ttr must be there * `space:ack(id, [attr])` - - `id`: - + `string` | `number` - primary key - + `tuple` - key will be extracted using index - + *TODO*: composite pk - - `attr`: - + `update` - table for update, like in space:update - + `delay` - number of seconds - * applicable only when `zombie` enabled - * if `zombie` is enabled, delay deletion for delay time - - returns task tuple or table (see retval) + * `id`: + * `string` | `number` - primary key + * `tuple` - key will be extracted using index + * *TODO*: composite pk + * `attr`: + * `update` - table for update, like in space:update + * `delay` - number of seconds + * applicable only when `zombie` enabled + * if `zombie` is enabled, delay deletion for delay time + * returns task tuple or table (see retval) * `space:release(id, [attr])` - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live + * `delay` - number of seconds + * if set, task will become `W` 
instead of `R` for `delay` seconds * `space:bury(id, [attr])` - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live (?) + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live (?) -## Admin methods +* `space:wait(id, [timeout])` + * `id` - as in `:ack` + * `timeout` - number of seconds to wait for the task processing + * returns task tuple or table (see retval) and boolean `was_processed` flag + +### Admin methods * `space:dig(id, [attr])` - dig out task from buried state - - `id` - as in `:ack` - - `attr` - + `update` - table for update, like in space:update - + `ttl` - timeout for time to live - + `delay` - number of seconds - * if set, task will become `W` instead of `R` for `delay` seconds - - returns task tuple or table (see retval) - -* `space:plow(N, [attr])` - dig out N oldest tasks from buried state - - `N` - number. To dig out all buried tasks set infinity or nil - - `attr` - attrs for every task, like in dig - -* `space:queue_stats()` - - returns table - + *TODO* + * `id` - as in `:ack` + * `attr` + * `update` - table for update, like in space:update + * `ttl` - timeout for time to live + * `delay` - number of seconds + * if set, task will become `W` instead of `R` for `delay` seconds + * returns task tuple or table (see retval) + +* `space:kill(id)` - kill the task when it was taken + * `id` - as in `:ack` + * returns task tuple or table (see retval) + +* `space:dig(N, [attr])` - dig out N oldest tasks from buried state + * `N` - number. 
To dig out all buried tasks set infinity or nil + * `attr` - attrs for every task, like in dig + +* `space:stats()` + * returns table + + ```yaml + tarantool> box.space.queue:stats() + --- + - tube: + B: + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + transition: + X-R: 6 + R-T: 7 + T-S: 3 + T-X: 4 + A: + counts: {'B': 0, 'W': 0, 'D': 0, 'T': 0, 'Z': 0, 'R': 0} + transition: [] + C: + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + transition: + T-R: 16 + R-T: 19 + S-R: 3 + T-X: 3 + transition: + X-R: 6 + R-T: 26 + T-R: 19 + T-X: 7 + counts: {'T': 0, 'W': 0, 'R': 0, 'D': 0, 'Z': 0, 'B': 0} + ... + ``` + +## Explanation of Statuses in :stats() + +| Status | Description | +|--------|-----------------------------------------------------------------------------| +| X | Special status, means task went to void, or appeared from void (makes sense in transitions) | +| R | (Ready) Task is Ready to be Taken | +| W | (Waiting) Task processing was delayed by producer or consumer | +| T | (Taken) Task has been taken by consumer | +| B | (Buried) Fatal error happened during task processing (and consumer buried it) | +| Z | (Zombie) Task was processed successfully and left in the Queue for zombie_delay | +| D | (Done) Task was processed successfully and left in the Queue | +| S | Extra status for tube transition | + +## Status Transitions + +| Transition | Description | +|------------|---------------------------------------------------------------------------------------------| +| X -> R | Task was put into the Queue into Ready state (method :put) | +| X -> W | Task was put into the Queue into Waiting state (method :put with specified delay) | +| W -> R | Task was scheduled by runat mechanism of the Queue and became ready to be Taken by consumer | +| R -> T | Task was taken by consumer | +| T -> W | Consumer returned task into the Queue and delayed its execution | +| T -> R | Consumer returned task into the Queue for immediate execution | +| T -> X | 
Consumer acknowledged successful execution of the Task, and task was removed from the Queue | +| T -> Z | Consumer acknowledged successful execution of the Task, and task was moved to Zombie state for zombie_delay timeout | +| T -> D | Consumer acknowledged successful execution of the Task, and task was moved to Done status and will be left in the Queue forever | +| T -> S | Task was moved into another tube | +| S -> R | Task was moved from another tube to be Ready | +| S -> W | Task was moved from another tube to be Waiting | +| S -> D | Task was moved from another tube to be Done | +| S -> B | Task was moved from another tube to be Buried | + +## Benchmarks + +```bash +❯ .rocks/bin/luabench -d 1000000x +Tarantool version: Tarantool 2.10.7-0-g60f7e18 +Tarantool build: Darwin-arm64-RelWithDebInfo (static) +Tarantool build flags: -Wno-unknown-pragmas -fexceptions -funwind-tables -fno-common -Wformat -Wformat-security -Werror=format-security -fstack-protector-strong -fPIC -fmacro-prefix-map=/var/folders/8x/1m5v3n6d4mn62g9w_65vvt_r0000gn/T/tarantool_install1565297302=. 
-std=c11 -Wall -Wextra -Wno-strict-aliasing -Wno-char-subscripts -Wno-gnu-alignof-expression -Wno-cast-function-type +CPU: Apple M1 @ 8 +JIT: Enabled +JIT: fold cse dce fwd dse narrow loop abc sink fuse +Duration: 1000000 iters + +--- BENCH: 001_put_take_bench::bench_producer:producer-1 + 1000000 44046 ns/op 22703 op/s 4137 B/op +3946.04MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-2 + 1000000 27519 ns/op 36339 op/s 4282 B/op +4084.35MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-4 + 1000000 21510 ns/op 46491 op/s 4500 B/op +4291.69MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-8 + 1000000 20804 ns/op 48068 op/s 4379 B/op +4176.26MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-12 + 1000000 20383 ns/op 49062 op/s 4372 B/op +4169.59MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-16 + 1000000 21495 ns/op 46523 op/s 4215 B/op +4019.98MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-20 + 1000000 23676 ns/op 42238 op/s 4258 B/op +4061.49MB + +--- BENCH: 001_put_take_bench::bench_producer:producer-24 + 1000000 24456 ns/op 40891 op/s 4274 B/op +4076.65MB +``` + +## TODO +* reload/upgrade and feature switch +* composite primary keys +* switch - turn non-taken task into any state +* snatch - take away Taken task from its owner and turn it into R/W +* restart (renew, requeue?) 
- take task from W/D/Z states and put it into R/W +* it seems, that ttl/ttr/bury requires attrs field diff --git a/benchmarks/001_put_take_bench.lua b/benchmarks/001_put_take_bench.lua new file mode 100644 index 0000000..61a3bbc --- /dev/null +++ b/benchmarks/001_put_take_bench.lua @@ -0,0 +1,104 @@ +local lb = require 'luabench' +local fiber = require 'fiber' +local log = require 'log' +local fun = require 'fun' + +local xqueue = require 'xqueue' + +box.cfg{ memtx_memory = 2^30, log_level = 4, checkpoint_count = 1, wal_max_size = 512*2^20 } +box.schema.space.create('queue', { + if_not_exists = true, + format = { + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + }, +}) +box.space.queue:create_index('primary', { parts = {'id'}, if_not_exists = true }) +box.space.queue:create_index('xq', { parts = {'status', 'id'}, if_not_exists = true }) + +xqueue.upgrade(box.space.queue, { + fields = { + id = 'id', + status = 'status', + }, + features = { + id = 'time64', + keep = false, + delayed = false, + buried = false, + retval = 'tuple', + zombie = false, + ttl = false, + ttr = false, + }, + debug = false, + workers = 32, + worker = function(task) + -- pass (auto-ack) + end +}) + +local queue = box.space.queue --[[@as xqueue.space]] +lb.before_all(function() queue:truncate() end) +lb.after_all(function() queue:truncate() box.snapshot() end) + +local M = {} + +local function new_producer_bench_f(num) + local start = fiber.cond() + local limit = 0 + local produced = 0 + + local done = fiber.channel() + + for _ = 1, num do + fiber.create(function() + while true do + start:wait() + + local empty_tab = {} + while produced < limit do + produced = produced + 1 + local ok, err = pcall(queue.put, queue, empty_tab) + if not ok then + log.error(":put() => %s", err) + end + end + + -- await everything + while queue:len() > 0 do + for _, task in queue.index.xq:pairs() do + queue:wait(task, 0.1) + end + end + + done:put(true) + end + end) + end + + ---@param 
sb luabench.B + return function(sb) + limit = sb.N + produced = 0 + start:broadcast() + + for _ = 1, num do + done:get() -- await all producers to finish + end + end +end + +---@param b luabench.B +function M.bench_producer_await(b) + b:run('producer-1', new_producer_bench_f(1)) + b:run('producer-2', new_producer_bench_f(2)) + b:run('producer-4', new_producer_bench_f(4)) + b:run('producer-8', new_producer_bench_f(8)) + b:run('producer-12', new_producer_bench_f(12)) + b:run('producer-16', new_producer_bench_f(16)) + b:run('producer-20', new_producer_bench_f(20)) + b:run('producer-24', new_producer_bench_f(24)) +end + +return M diff --git a/test/basic_test.lua b/test/basic_test.lua index 9515469..c04764e 100644 --- a/test/basic_test.lua +++ b/test/basic_test.lua @@ -138,6 +138,8 @@ function g.test_delayed_queue() { name = 'payload', type = 'any' }, }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } + queue:create_index('primary', { parts = {'id'} }) queue:create_index('status', { parts = {'status', 'id'} }) queue:create_index('runat', { parts = {'runat', 'id'} }) @@ -176,7 +178,7 @@ function g.test_delayed_queue() t.assert_equals(queue:get({taken.id}).status, 'W', 'queue:release(..., {delay=<>}) must put task in W') t.assert_le(queue:get({task_put_delay_500ms.id}).runat, queue:get({taken.id}).runat, "first task must wakeup earlier") - local taken_delayed = queue:take({ timeout = 0.11 }) + local taken_delayed = queue:take({ timeout = 0.13 }) t.assert(taken_delayed, 'delayed task must be taken after timeout') local taken_put = queue:take({ timeout = 0.1 }) @@ -199,7 +201,7 @@ function g.test_delayed_queue() queue:release(take, { update = { - { '=', 'payload', { new = 2 } } + { '=', F.payload, { new = 2 } } } }) @@ -210,7 +212,7 @@ function g.test_delayed_queue() queue:ack(take, { update = { - { '=', 'payload', { finished = 2 } }, + { '=', F.payload, { finished = 2 } }, }, delay = 0.5, }) diff --git a/test/interface_test.lua b/test/interface_test.lua new file 
mode 100644 index 0000000..757680a --- /dev/null +++ b/test/interface_test.lua @@ -0,0 +1,160 @@ +---@diagnostic disable: inject-field +local xqueue = require 'xqueue' +require 'test.setup' +local clock = require 'clock' +local fun = require 'fun' +local log = require 'log' +local json = require 'json' + +local t = require 'luatest' --[[@as luatest]] +local g = t.group('interface') + +---@return xqueue.space +local function setup_queue(opts) + local space = box.schema.space.create(opts.name, {if_not_exists = true}) + space:format(opts.format) + space:create_index('primary', { parts = {'id'} }) + if opts.fields.runat then + space:create_index('runat', { parts = {'runat', 'id'} }) + end + + if opts.fields.priority then + space:create_index('xq', { parts = {'status', tostring(opts.fields.priority), 'id'} }) + else + space:create_index('xq', { parts = {'status', 'id'} }) + end + + if opts.fields.tube then + if opts.fields.priority then + space:create_index('tube', { parts = { 'tube', 'status', tostring(opts.fields.priority), 'id' } }) + else + space:create_index('tube', { parts = { 'tube', 'status', 'id' } }) + end + end + + xqueue.upgrade(space, opts) + return space +end + +local fiber = require 'fiber' +local netbox = require 'net.box' + +function g.test_producer_consumer() + local queue = setup_queue({ + name = 'simpleq', + format = { + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'payload', type = 'any' }, + }, + features = { + id = 'uuid', + keep = false, + }, + fields = { + id = 'id', + status = 'status', + }, + retval = 'table', + } --[[@as xqueue.upgrade.options]]) + + local producer = fiber.create(function() + fiber.self():set_joinable(true) + fiber.yield() + for _ = 1, 10 do + queue:put({ payload = { id = math.random() } }) + end + queue:put({ payload = { id = 0, is_last = true } }) + end) + + local consumer = fiber.create(function() + fiber.self():set_joinable(true) + fiber.yield() + + repeat + -- taking loop + local 
task + while not task do + task = queue:take({ timeout = 1 }) + end + queue:ack(task) + until task.payload.is_last + + -- consumer has seen last task. + repeat + local task = queue:take(0) + if task then queue:ack(task) end + until not task + end) + + producer:join() + consumer:join() + + t.assert_equals(queue:len(), 0, "Queue must be empty") +end + + +function g.test_producer_consumer_with_feedback() + local format = { + { name = 'id', type = 'unsigned' }, + { name = 'nice', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'attempt', type = 'unsigned' }, + { name = 'payload', type = 'any' }, + { name = 'context', type = 'any' }, + { name = 'error', type = 'any' }, + } + local F = fun.iter(format):enumerate():map(function(no, f) return f.name, no end):tomap() + + local queue + queue = setup_queue({ + name = 'q', + format = format, + features = { + id = 'time64', + buried = true, + delayed = true, + retval = 'table', + zombie = 5, + }, + debug = true, + fields = { + runat = 'runat', + status = 'status', + priority = 'nice', + }, + workers = 2, + worker = function(task) + fiber.sleep(math.random()) -- random sleep + if fiber.id() % 2 == task.id % 2 then + queue:ack(task, {update = { + { '+', F.attempt, 1 }, + { '=', F.context, { processed_by = fiber.id(), processed_time = fiber.time() } }, + }}) + else + queue:release(task, {update = { + { '+', F.attempt, 1 }, + { '=', F.error, { processed_by = fiber.id(), processed_time = fiber.time() } }, + }}) + end + end, + } --[[@as xqueue.upgrade.options]]) + + local fibs = {} + for i = 1, 10 do + fibs[i] = fiber.create(function() + fiber.self():set_joinable(true) + fiber.yield() + local res, is_processed = queue:put({ attempt = 0, nice = 0 }, { wait = 1 }) + log.info(":put() => %s %s", json.encode(res), is_processed) + repeat + res, is_processed = queue:wait(res, 0.1) + log.info(":wait() => %s %s", json.encode(res), is_processed) + until is_processed or res.status 
== 'D' or res.status == 'Z' + end) + end + for i = 1, 10 do + fibs[i]:join() + end +end diff --git a/test/network_test.lua b/test/network_test.lua index d8f829d..19d7506 100644 --- a/test/network_test.lua +++ b/test/network_test.lua @@ -28,6 +28,7 @@ function g.test_untake() { name = 'runat', type = 'number' }, { name = 'payload', type = 'any' }, }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } queue:create_index('primary', { parts = {'id'} }) queue:create_index('status', { parts = {'status', 'id'} }) @@ -37,6 +38,7 @@ function g.test_untake() features = { id = 'time64', delayed = true, + retval = 'table', }, fields = { status = 'status', @@ -76,11 +78,11 @@ function g.test_untake() t.assert_equals(task.id, taken.id, "retutned the same task (2nd)") local processed_at = clock.time() - local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', 'payload', { processed_at = processed_at }}} }}) - t.assert_equals(acked.id, taken.id, ":ack() returned taken but completed task") + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1}) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") local awaiter_res = awaiter_fin:get() - t.assert_equals(awaiter_res[1].id, acked.id, "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") tt:close() diff --git a/test/replicaset_test.lua b/test/replicaset_test.lua new file mode 100644 index 0000000..90fd564 --- /dev/null +++ b/test/replicaset_test.lua @@ -0,0 +1,281 @@ +---@diagnostic disable: inject-field +local t = require 'luatest' --[[@as luatest]] +local replicaset = require 'luatest.replica_set' + +local group_config = { + raft = { + box_cfg = { + log_level = 5, + replication_timeout = 0.1, + replication_connect_timeout = 10, + replication_sync_lag = 0.01, + election_mode = 
'candidate', + replication_synchro_quorum = 3, + }, + }, + repl = { + box_cfg = { + log_level = 5, + replication_timeout = 1.1, + replication_connect_timeout = 10, + replication_sync_lag = 0.01, + replication_connect_quorum = 3, + }, + }, +} + +local Server = t.Server +local g = t.group('replicaset', { + { name = 'raft' }, + { name = 'repl' }, +}) + +---@return boolean +local function setup_queue(opts) + local log = require 'log' + local fiber = require 'fiber' + + local function await_on_ro(func) + while box.info.ro do + if pcall(func) then break end + fiber.sleep(0.01) + end + return box.info.ro + end + while box.info.ro and not box.space[opts.name] do + -- we're ro and no space found + log.info("i'm ro and no space") + if box.ctl.wait_rw then + pcall(box.ctl.wait_rw, 0.01) + else + fiber.sleep(0.01) + end + -- we're rw, but don't know whether space exists + if not box.info.ro then + log.info("i'm rw will create spaces") + break + end + -- we're ro so we have to await box.space[opts.name] + end + log.info("creating space: i'm %s", box.info.ro and 'ro' or 'rw') + + local space = box.schema.space.create(opts.name, { + if_not_exists = true, + format = opts.format, + }) + + await_on_ro(function() space.index.primary:count() end) + space:create_index('primary', { parts = {'id'}, if_not_exists = true }) + + if opts.fields.runat then + await_on_ro(function() space.index.runat:count() end) + space:create_index('runat', { parts = {'runat', 'id'}, if_not_exists = true }) + end + + await_on_ro(function() space.index.xq:count() end) + if opts.fields.priority then + space:create_index('xq', { + if_not_exists = true, + parts = {'status', tostring(opts.fields.priority), 'id'} + }) + else + space:create_index('xq', { + if_not_exists = true, + parts = {'status', 'id'} + }) + end + + if opts.fields.tube then + await_on_ro(function() space.index.tube:count() end) + if opts.fields.priority then + space:create_index('tube', { + if_not_exists = true, + parts = { 'tube', 'status', 
tostring(opts.fields.priority), 'id' } + }) + else + space:create_index('tube', { + if_not_exists = true, + parts = { 'tube', 'status', 'id' } + }) + end + end + + require'xqueue'.upgrade(space, opts) + return true +end + +local fio = require 'fio' +local fiber = require 'fiber' +local log = require 'log' + +g.after_each(function(test) + fiber.sleep(1.5) -- give some time to collect coverage + test.ctx.replica_set:stop() +end) + +local function v2num(str) + str = str or rawget(_G, '_TARANTOOL') + local maj, min = unpack(str:split('.')) + return tonumber(maj)*1000 + tonumber(min) +end + +function g.test_start(test) + test.ctx = {} + local params = test.params + + if v2num() < v2num('2.8') and params.name == 'raft' then + t.skip('Raft is not supported by this Tarantool version') + return + end + + local extend = function(result, template) + for k, v in pairs(template) do if result[k] == nil then result[k] = v end end + return result + end + + local data_dir = os.getenv('LUATEST_LUACOV_ROOT') + local replica_set = replicaset:new({}) + + ---@type BoxCfg + local box_cfg = extend({ + replication = { + Server.build_listen_uri('replica1', replica_set.id), + Server.build_listen_uri('replica2', replica_set.id), + Server.build_listen_uri('replica3', replica_set.id), + }, + }, group_config[params.name].box_cfg) + + local server_template = { + datadir = data_dir, + coverage_report = true, + } + + replica_set:build_and_add_server(extend({ alias = 'replica1', box_cfg = box_cfg }, server_template)) + replica_set:build_and_add_server(extend({ + alias = 'replica2', + box_cfg = extend({ + read_only = params.name == 'repl' + }, box_cfg), + }, server_template)) + replica_set:build_and_add_server(extend({ + alias = 'replica3', + box_cfg = extend({ + read_only = params.name == 'repl' + }, box_cfg), + }, server_template)) + replica_set:start() + replica_set:wait_for_fullmesh() + + test.ctx.replica_set = replica_set + + local simpleq = { + name = 'simpleq', + format = { + { name = 'id', 
type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }, + features = { + id = 'uuid', + keep = false, + delayed = true, + }, + fields = { + id = 'id', + status = 'status', + runat = 'runat', + }, + retval = 'table', + } --[[@as xqueue.upgrade.options]] + + local had_error + local await = {} + for _, srv in pairs(replica_set.servers) do + ---@cast srv luatest.server + local fib = fiber.create(function() + fiber.self():set_joinable(true) + local ok, err = pcall(srv.exec, srv, setup_queue, {simpleq}, { timeout = 20 }) + if not ok then + had_error = err + log.error("%s: %s", srv.alias, err) + end + end) + table.insert(await, fib) + end + for _, f in ipairs(await) do + fiber.join(f) + end + + t.assert_not(had_error, "setup of queue without errors") + + local rw = replica_set:get_leader() --[[@as luatest.server]] + local cookie = require'uuid'.str() + local task = rw:call('box.space.simpleq:put', {{payload = {cookie = cookie}}, { delay = 5 } }) + t.assert(task, "task was returned") + + local SWITCH_TIMEOUT = 10 + + if params.name == 'raft' then + local deadline = fiber.time() + SWITCH_TIMEOUT + local new_rw + repeat + rw:exec(function() box.ctl.demote() end) + fiber.sleep(0.5) + new_rw = replica_set:get_leader() + if fiber.time() > deadline then break end + until new_rw and rw.alias ~= new_rw.alias + + if not new_rw or new_rw.alias == rw.alias then + t.fail("box.ctl.demote() failed to elect different leader") + end + rw = new_rw + elseif params.name == 'repl' then + local leader_vclock = rw:exec(function() + box.cfg{read_only = true} + require 'fiber'.sleep(0) + return box.info.vclock + end) + + local new_rw + for _, server in ipairs(replica_set.servers) do + if server ~= rw then + new_rw = server + end + end + + t.assert_not_equals(new_rw.alias, rw.alias, "candidate to switch must be chosen") + + new_rw:exec(function(vclock, timeout) + local fib = require 'fiber' + + local eq_vclock = 
function() + for n, lsn in pairs(vclock) do + if (box.info.vclock[n] or 0) ~= lsn then return false end + end + return true + end + + local deadline = fib.time() + timeout + while not eq_vclock() and deadline > fib.time() do fib.sleep(0.001) end + assert(eq_vclock(), "failed to await vclock") + + box.cfg{read_only = false} + if box.ctl.promote then + -- take control over synchro queue + box.ctl.promote() + end + end, {leader_vclock, SWITCH_TIMEOUT - 0.1}, { timeout = SWITCH_TIMEOUT }) + rw = new_rw + end + + do + local trw = replica_set:get_leader() + t.assert_equals(trw.alias, rw.alias, "after rw-switch luatest successfully derived new leader") + end + + local task = rw:call('box.space.simpleq:take', { 5 }, { timeout = 6 }) + t.assert(task, "delayed task has been successfully taken from new leader") + t.assert_equals(task.payload.cookie, cookie, "task.cookie is valid") + +end diff --git a/xqueue-dev-1.rockspec b/xqueue-dev-1.rockspec new file mode 100644 index 0000000..e688e21 --- /dev/null +++ b/xqueue-dev-1.rockspec @@ -0,0 +1,21 @@ +rockspec_format = "3.0" +package = "xqueue" +version = "dev-1" +source = { + url = "git+https://github.com/moonlibs/xqueue.git", + branch = "master" +} +description = { + summary = "Persistent queue implementation over Tarantool spaces", + homepage = "https://github.com/moonlibs/xqueue.git", + license = "BSD" +} +dependencies = { + "lua ~> 5.1" +} +build = { + type = "builtin", + modules = { + xqueue = "xqueue.lua" + } +} diff --git a/xqueue.lua b/xqueue.lua index df07ccc..c3af090 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -270,6 +270,7 @@ local methods = {} ---@field _default_truncate fun(space: boxSpaceObject) ---@field _on_repl replaceTrigger ---@field _on_dis fun() +---@field ready? 
fiber.channel channel appears when xq is not ready ---@class xqueue.space: boxSpaceObject @@ -581,7 +582,6 @@ function M.upgrade(space,opts,depth) transition = {}; tube = stat_tube; } - -- TODO: benchmark index:count() if self.fields.tube then for _, t in space:pairs(nil, { iterator = box.index.ALL }) do local s = t[self.fields.status] @@ -598,6 +598,8 @@ function M.upgrade(space,opts,depth) self._stat.counts[s] = (self._stat.counts[s] or 0LL) + 1 end end + else + self._stat = { counts = {}, transition = {}, tube = stat_tube } end -- 3. features check @@ -819,7 +821,7 @@ function M.upgrade(space,opts,depth) log.verbose("awaiting rw") repeat if box.ctl.wait_rw then - box.ctl.wait_rw(1) + pcall(box.ctl.wait_rw, 1) else fiber.sleep(0.001) end @@ -828,7 +830,8 @@ function M.upgrade(space,opts,depth) local ok, err = pcall(func, ...) if not ok then - log.error("%s", err) + log.error("%s%s", + err, xq.ready and ': (xq is not ready yet)' or '') end fiber.testcancel() until (not box.space[space.name]) or space.xq ~= xq @@ -846,11 +849,8 @@ function M.upgrade(space,opts,depth) repeat fiber.sleep(0.001) until space.xq if xq.ready then xq.ready:get() end log.info("I am worker %s",i) - if box.info.ro then - log.info("Shutting down on ro instance") - return - end - while box.space[space.name] and space.xq == xq do + while box.space[space.name] and space.xq == xq and not box.info.ro do + if xq.ready then xq.ready:get() end local task = space:take(1) if task then local key = xq:getkey(task) @@ -869,6 +869,10 @@ function M.upgrade(space,opts,depth) end fiber.yield() end + if box.info.ro then + log.info("Shutting down on ro instance") + return + end log.info("worker %s ended", i) end,space,self) end @@ -884,14 +888,10 @@ function M.upgrade(space,opts,depth) if xq.ready then xq.ready:get() end local chan = xq.runat_chan log.info("Runat started") - if box.info.ro then - log.info("Shutting down on ro instance") - return - end local maxrun = 1000 local curwait local collect = {} - while 
box.space[space.name] and space.xq == xq do + while box.space[space.name] and space.xq == xq and not box.info.ro do local r,e = pcall(function() -- print("runat loop 2 ",box.time64()) local remaining @@ -968,6 +968,10 @@ function M.upgrade(space,opts,depth) if curwait == 0 then fiber.sleep(0) end chan:get(curwait) end + if box.info.ro then + log.info("Shutting down on ro instance") + return + end log.info("Runat ended") end,space,self,runat_index) end @@ -1164,7 +1168,6 @@ function M.upgrade(space,opts,depth) log.info("Upgraded %s into xqueue (status=%s)", space.name, box.info.status) - function self:starttest() local xq = self if box.info.status == 'orphan' then @@ -1181,10 +1184,16 @@ function M.upgrade(space,opts,depth) if box.info.ro then log.info("Server is ro, not resetting statuses") else - -- FIXME: with stable iterators add yield local space = box.space[self.space] + local ver = tonumber(rawget(_G, '_TARANTOOL'):match('^([^.]+%.[^.]+)')) + local yield_limit = 2^32-1 + local scanned = 0 + if ver >= 1.10 then + yield_limit = 1000 + end box.begin() for _,t in self.index:pairs({'T'},{ iterator = box.index.EQ }) do + scanned = scanned + 1 local key = t[ xq.key.no ] if not self.taken[key] and not self._lock[key] then local update = { @@ -1197,6 +1206,11 @@ function M.upgrade(space,opts,depth) space:update({key}, update) log.info("Start: T->R (%s)", key) end + if scanned == yield_limit then + scanned = 0 + box.commit() + box.begin() + end end box.commit() end @@ -1713,16 +1727,19 @@ end -- special remap of truncate for deliting stats and saving methods function methods:truncate() - local stat = self.xq._stat + local xq = self.xq + xq.ready = xq.ready or fiber.channel(0) + local stat = xq._stat for status, _ in pairs(stat.counts) do stat.counts[status] = 0LL end for transition, _ in pairs(stat.transition) do stat.transition[transition] = nil end - local ret = self.xq._default_truncate(self) + local ret = xq._default_truncate(self) local meta = 
debug.getmetatable(self) for k,v in pairs(methods) do meta[k] = v end + xq:make_ready() -- Now we reset our methods after truncation because -- as we can see in on_replace_dd_truncate: -- https://github.com/tarantool/tarantool/blob/0b7cc52607b2290d2f35cc68ee1a8243988c2735/src/box/alter.cc#L2239 @@ -1740,6 +1757,8 @@ local pretty_st = { D = "Done", } +local shortmap = { __serialize = 'map' } + ---@param pretty? boolean function methods:stats(pretty) local stats = table.deepcopy(self.xq._stat) @@ -1754,10 +1773,12 @@ function methods:stats(pretty) else stats.counts[s] = stats.counts[s] or 0LL for _, tube_stat in pairs(stats.tube) do + setmetatable(tube_stat.counts, shortmap) tube_stat.counts[s] = tube_stat.counts[s] or 0LL end end end + setmetatable(stats.counts, shortmap) return stats end