Skip to content

Commit

Permalink
feat: add on_exit hook
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCrambert authored and rosa committed Feb 3, 2025
1 parent cb5d230 commit 9cd6bc3
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 28 deletions.
32 changes: 11 additions & 21 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,20 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

delegate :on_start, :on_stop, to: Supervisor
delegate :on_start, :on_stop, :on_exit, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def on_dispatcher_start(...)
Dispatcher.on_start(...)
end

def on_dispatcher_stop(...)
Dispatcher.on_stop(...)
end
[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start { block.call }
end

def on_scheduler_start(...)
Scheduler.on_start(...)
end
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
process.on_stop { block.call }
end

def on_scheduler_stop(...)
Scheduler.on_stop(...)
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
process.on_exit { block.call }
end
end

def supervisor?
Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class Dispatcher < Processes::Poller
after_boot :run_start_hooks
after_boot :start_concurrency_maintenance
before_shutdown :stop_concurrency_maintenance
after_shutdown :run_stop_hooks
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
Expand Down
11 changes: 10 additions & 1 deletion lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
mattr_reader :lifecycle_hooks, default: { start: [], stop: [], exit: [] }
end

class_methods do
Expand All @@ -17,7 +17,12 @@ def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def on_exit(&block)
self.lifecycle_hooks[:exit] << block
end

def clear_hooks
self.lifecycle_hooks[:exit] = []
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
Expand All @@ -32,6 +37,10 @@ def run_stop_hooks
run_hooks_for :stop
end

def run_exit_hooks
run_hooks_for :exit
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Scheduler < Processes::Base
after_boot :schedule_recurring_tasks
before_shutdown :unschedule_recurring_tasks
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Supervisor < Processes::Base
include LifecycleHooks
include Maintenance, Signals, Pidfiled

after_shutdown :run_exit_hooks

class << self
def start(**options)
SolidQueue.supervisor = true
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Worker < Processes::Poller

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

after_shutdown :run_exit_hooks

attr_accessor :queues, :pool

Expand Down
19 changes: 15 additions & 4 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,40 @@ class LifecycleHooksTest < ActiveSupport::TestCase
test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }

SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) }

SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)


results = skip_active_record_query_cache do
assert_equal 8, JobResult.count
JobResult.last(8)
job_results = JobResult.where(status: :hook_called)
assert_equal 12, job_results.count
job_results
end

assert_equal({ "hook_called" => 8 }, results.map(&:status).tally)
assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
assert_equal %w[
start stop exit
worker_start worker_stop worker_exit
dispatcher_start dispatcher_stop dispatcher_exit
scheduler_start scheduler_stop scheduler_exit
].sort, results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
Expand Down

0 comments on commit 9cd6bc3

Please sign in to comment.