diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index e44a476..b6491cb 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -196,7 +196,7 @@ function tube.new(space, on_task_change, opts) }, { __index = method }) self.cond = qc.waiter() - self.fiber = fiber.create(fifottl_fiber, self) + self.fiber = util.background_fiber(fifottl_fiber, self) self.sync_chan = fiber.channel() return self @@ -412,7 +412,7 @@ function method.start(self) if self.fiber then return end - self.fiber = fiber.create(fifottl_fiber, self) + self.fiber = util.background_fiber(fifottl_fiber, self) end function method.stop(self) diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 7b726ba..e4410f3 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -383,7 +383,7 @@ function tube.new(space, on_task_change, opts) }, { __index = method }) self.cond = qc.waiter() - self.fiber = fiber.create(utubettl_fiber, self) + self.fiber = util.background_fiber(utubettl_fiber, self) self.sync_chan = fiber.channel(1) return self @@ -741,7 +741,7 @@ function method.start(self) if self.fiber then return end - self.fiber = fiber.create(utubettl_fiber, self) + self.fiber = util.background_fiber(utubettl_fiber, self) end function method.stop(self) diff --git a/queue/abstract/queue_session.lua b/queue/abstract/queue_session.lua index 9f32a8e..4dc79eb 100644 --- a/queue/abstract/queue_session.lua +++ b/queue/abstract/queue_session.lua @@ -171,7 +171,7 @@ end --- Create an expiration fiber to cleanup expired sessions. local function create_expiration_fiber() - local exp_fiber = fiber.create(function() + local exp_fiber = util.background_fiber(function() fiber.self():name('queue_expiration_fiber') while true do if box.info.ro == false then diff --git a/queue/abstract/queue_state.lua b/queue/abstract/queue_state.lua index 289a7d3..5ddfca2 100644 --- a/queue/abstract/queue_state.lua +++ b/queue/abstract/queue_state.lua @@ -2,6 +2,7 @@ local log = require('log') local fiber = require('fiber') +local util = require('queue.util') --[[ States switching scheme: @@ -62,7 +63,7 @@ end local function create_state_fiber(on_state_change_cb) log.info('Started queue state fiber') - fiber.create(function() + util.background_fiber(function() fiber.self():name('queue_state_fiber') while true do if current == queue_state.states.WAITING then diff --git a/queue/init.lua b/queue/init.lua index 6e61faa..e27aba1 100644 --- a/queue/init.lua +++ b/queue/init.lua @@ -4,6 +4,7 @@ local abstract = require('queue.abstract') local queue_state = require('queue.abstract.queue_state') local qc = require('queue.compat') local queue = nil +local util = require('queue.util') -- load all core drivers local core_drivers = { @@ -107,7 +108,7 @@ local function wrap_box_cfg() cfg_mt.__call = cfg_call_wrapper else -- Wait for the rw state. - fiber.new(rw_waiter) + util.background_fiber(rw_waiter) end else error('The box.cfg type is unexpected: ' .. type(box.cfg)) diff --git a/queue/util.lua b/queue/util.lua index 84b350a..0dc084c 100644 --- a/queue/util.lua +++ b/queue/util.lua @@ -34,6 +34,23 @@ local function event_time(tm) return tm end +local function background_fiber(func, ...) + local fib = fiber.new(func, ...) + + if not package.reload then + return fib + end + + fib:set_joinable(true) + + package.reload:register(function() + fib:cancel() + fib:join() + end) + + return fib +end + local util = { MAX_TIMEOUT = MAX_TIMEOUT, TIMEOUT_INFINITY = TIMEOUT_INFINITY @@ -42,7 +59,8 @@ local util = { -- methods local method = { time = time, - event_time = event_time + event_time = event_time, + background_fiber = background_fiber, } return setmetatable(util, { __index = method })