Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add on_exit hook #503

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,30 +41,20 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

delegate :on_start, :on_stop, to: Supervisor
delegate :on_start, :on_stop, :on_exit, to: Supervisor

def on_worker_start(...)
Worker.on_start(...)
end

def on_worker_stop(...)
Worker.on_stop(...)
end

def on_dispatcher_start(...)
Dispatcher.on_start(...)
end

def on_dispatcher_stop(...)
Dispatcher.on_stop(...)
end
[ Dispatcher, Scheduler, Worker ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start { block.call }
end

def on_scheduler_start(...)
Scheduler.on_start(...)
end
define_singleton_method(:"on_#{process.name.demodulize.downcase}_stop") do |&block|
process.on_stop { block.call }
end

def on_scheduler_stop(...)
Scheduler.on_stop(...)
define_singleton_method(:"on_#{process.name.demodulize.downcase}_exit") do |&block|
process.on_exit { block.call }
end
end

def supervisor?
Expand Down
3 changes: 2 additions & 1 deletion lib/solid_queue/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ class Dispatcher < Processes::Poller
after_boot :run_start_hooks
after_boot :start_concurrency_maintenance
before_shutdown :stop_concurrency_maintenance
after_shutdown :run_stop_hooks
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

def initialize(**options)
options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS)
Expand Down
11 changes: 10 additions & 1 deletion lib/solid_queue/lifecycle_hooks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module LifecycleHooks
extend ActiveSupport::Concern

included do
mattr_reader :lifecycle_hooks, default: { start: [], stop: [] }
mattr_reader :lifecycle_hooks, default: { start: [], stop: [], exit: [] }
end

class_methods do
Expand All @@ -17,7 +17,12 @@ def on_stop(&block)
self.lifecycle_hooks[:stop] << block
end

def on_exit(&block)
self.lifecycle_hooks[:exit] << block
end

def clear_hooks
self.lifecycle_hooks[:exit] = []
self.lifecycle_hooks[:start] = []
self.lifecycle_hooks[:stop] = []
end
Expand All @@ -32,6 +37,10 @@ def run_stop_hooks
run_hooks_for :stop
end

def run_exit_hooks
run_hooks_for :exit
end

def run_hooks_for(event)
self.class.lifecycle_hooks.fetch(event, []).each do |block|
block.call
Expand Down
1 change: 1 addition & 0 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class Scheduler < Processes::Base
after_boot :schedule_recurring_tasks
before_shutdown :unschedule_recurring_tasks
before_shutdown :run_stop_hooks
after_shutdown :run_exit_hooks

def initialize(recurring_tasks:, **options)
@recurring_schedule = RecurringSchedule.new(recurring_tasks)
Expand Down
2 changes: 2 additions & 0 deletions lib/solid_queue/supervisor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ class Supervisor < Processes::Base
include LifecycleHooks
include Maintenance, Signals, Pidfiled

after_shutdown :run_exit_hooks

class << self
def start(**options)
SolidQueue.supervisor = true
Expand Down
2 changes: 1 addition & 1 deletion lib/solid_queue/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class Worker < Processes::Poller

after_boot :run_start_hooks
before_shutdown :run_stop_hooks

after_shutdown :run_exit_hooks

attr_accessor :queues, :pool

Expand Down
19 changes: 15 additions & 4 deletions test/integration/lifecycle_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,40 @@ class LifecycleHooksTest < ActiveSupport::TestCase
test "run lifecycle hooks" do
SolidQueue.on_start { JobResult.create!(status: :hook_called, value: :start) }
SolidQueue.on_stop { JobResult.create!(status: :hook_called, value: :stop) }
SolidQueue.on_exit { JobResult.create!(status: :hook_called, value: :exit) }

SolidQueue.on_worker_start { JobResult.create!(status: :hook_called, value: :worker_start) }
SolidQueue.on_worker_stop { JobResult.create!(status: :hook_called, value: :worker_stop) }
SolidQueue.on_worker_exit { JobResult.create!(status: :hook_called, value: :worker_exit) }

SolidQueue.on_dispatcher_start { JobResult.create!(status: :hook_called, value: :dispatcher_start) }
SolidQueue.on_dispatcher_stop { JobResult.create!(status: :hook_called, value: :dispatcher_stop) }
SolidQueue.on_dispatcher_exit { JobResult.create!(status: :hook_called, value: :dispatcher_exit) }

SolidQueue.on_scheduler_start { JobResult.create!(status: :hook_called, value: :scheduler_start) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_stop) }
SolidQueue.on_scheduler_stop { JobResult.create!(status: :hook_called, value: :scheduler_exit) }

pid = run_supervisor_as_fork(workers: [ { queues: "*" } ], dispatchers: [ { batch_size: 100 } ], skip_recurring: false)
wait_for_registered_processes(4)

terminate_process(pid)
wait_for_registered_processes(0)


results = skip_active_record_query_cache do
assert_equal 8, JobResult.count
JobResult.last(8)
job_results = JobResult.where(status: :hook_called)
assert_equal 12, job_results.count
job_results
end

assert_equal({ "hook_called" => 8 }, results.map(&:status).tally)
assert_equal %w[start stop worker_start worker_stop dispatcher_start dispatcher_stop scheduler_start scheduler_stop].sort, results.map(&:value).sort
assert_equal({ "hook_called" => 12 }, results.map(&:status).tally)
assert_equal %w[
start stop exit
worker_start worker_stop worker_exit
dispatcher_start dispatcher_stop dispatcher_exit
scheduler_start scheduler_stop scheduler_exit
].sort, results.map(&:value).sort
ensure
SolidQueue::Supervisor.clear_hooks
SolidQueue::Worker.clear_hooks
Expand Down