From 037ce562fe7407baf05bfb128b2bfd98663916c6 Mon Sep 17 00:00:00 2001 From: "thomas.crambert" Date: Wed, 29 Jan 2025 14:10:10 +0100 Subject: [PATCH] feat: add on_exit hook --- lib/solid_queue.rb | 32 ++++++++---------------- lib/solid_queue/dispatcher.rb | 3 ++- lib/solid_queue/lifecycle_hooks.rb | 11 +++++++- lib/solid_queue/scheduler.rb | 1 + lib/solid_queue/supervisor.rb | 2 ++ lib/solid_queue/worker.rb | 2 +- test/integration/lifecycle_hooks_test.rb | 19 +++++++++++--- 7 files changed, 42 insertions(+), 28 deletions(-) diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index 1e1961e6..02b88d05 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -41,30 +41,20 @@ module SolidQueue mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes - delegate :on_start, :on_stop, to: Supervisor + delegate :on_start, :on_stop, :on_exit, to: Supervisor - def on_worker_start(...) - Worker.on_start(...) - end - - def on_worker_stop(...) - Worker.on_stop(...) - end - - def on_dispatcher_start(...) - Dispatcher.on_start(...) - end - - def on_dispatcher_stop(...) - Dispatcher.on_stop(...) - end + [ Dispatcher, Scheduler, Worker ].each do |process| + define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block| + process.on_start { block.call } + end - def on_scheduler_start(...) - Scheduler.on_start(...) - end + define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block| + process.on_stop { block.call } + end - def on_scheduler_stop(...) - Scheduler.on_stop(...) + define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block| + process.on_exit { block.call } + end end def supervisor? diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a443df2e..6f7ec245 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -8,7 +8,8 @@ class Dispatcher < Processes::Poller after_boot :run_start_hooks after_boot :start_concurrency_maintenance before_shutdown :stop_concurrency_maintenance - after_shutdown :run_stop_hooks + before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) diff --git a/lib/solid_queue/lifecycle_hooks.rb b/lib/solid_queue/lifecycle_hooks.rb index fabddac4..0403459a 100644 --- a/lib/solid_queue/lifecycle_hooks.rb +++ b/lib/solid_queue/lifecycle_hooks.rb @@ -5,7 +5,7 @@ module LifecycleHooks extend ActiveSupport::Concern included do - mattr_reader :lifecycle_hooks, default: { start: [], stop: [] } + mattr_reader :lifecycle_hooks, default: { start: [], stop: [], exit: [] } end class_methods do @@ -17,7 +17,12 @@ def on_stop(&block) self.lifecycle_hooks[:stop] << block end + def on_exit(&block) + self.lifecycle_hooks[:exit] << block + end + def clear_hooks + self.lifecycle_hooks[:exit] = [] self.lifecycle_hooks[:start] = [] self.lifecycle_hooks[:stop] = [] end @@ -32,6 +37,10 @@ def run_stop_hooks run_hooks_for :stop end + def run_exit_hooks + run_hooks_for :exit + end + def run_hooks_for(event) self.class.lifecycle_hooks.fetch(event, []).each do |block| block.call diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index b68075dc..d3164ed5 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -11,6 +11,7 @@ class Scheduler < Processes::Base after_boot :schedule_recurring_tasks before_shutdown :unschedule_recurring_tasks before_shutdown :run_stop_hooks + after_shutdown :run_exit_hooks def initialize(recurring_tasks:, **options) @recurring_schedule = RecurringSchedule.new(recurring_tasks) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index e8f075eb..f2207691 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -5,6 +5,8 @@ class Supervisor < Processes::Base include LifecycleHooks include Maintenance, Signals, Pidfiled + after_shutdown :run_exit_hooks + class << self def start(**options) SolidQueue.supervisor = true diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index f34a14f0..54d4d870 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -6,7 +6,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks - + after_shutdown :run_exit_hooks attr_accessor :queues, :pool diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index 8bc4dc94..7da73228 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -8,15 +8,19 @@ class LifecycleHooksTest < ActiveSupport::TestCase test "run lifecycle hooks" do SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) } SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) } + SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) } SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) } SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) } + SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) } SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) } SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) } + SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) } SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) } SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) } + SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) } pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false) wait_for_registered_processes(4) @@ -24,13 +28,20 @@ class LifecycleHooksTest < ActiveSupport::TestCase terminate_process(pid) wait_for_registered_processes(0) + results = skip_active_record_query_cache do - assert_equal 8, JobResult.count - JobResult.last(8) + job_results = JobResult.where(status: :hook_called) + assert_equal 12, job_results.count + job_results end - assert_equal({ "hook_called" => 8 }, results.map(&:status).tally) - assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort + assert_equal({ "hook_called" => 12 }, results.map(&:status).tally) + assert_equal %w[ + start stop exit + worker_start worker_stop worker_exit + dispatcher_start dispatcher_stop dispatcher_exit + scheduler_start scheduler_stop scheduler_exit + ].sort, results.map(&:value).sort ensure SolidQueue::Supervisor.clear_hooks SolidQueue::Worker.clear_hooks