diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index 115fa1819..7a87094c1 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -168,35 +168,37 @@ def listen(delay: 0) future = Concurrent::ScheduledTask.new(delay, args: [@recipients, executor, @listening], executor: @executor) do |thr_recipients, thr_executor, thr_listening| with_connection do begin - run_callbacks :listen do - ActiveSupport::Notifications.instrument("notifier_listen.good_job") do - connection.execute("LISTEN #{CHANNEL}") + Rails.application.executor.wrap do + run_callbacks :listen do + ActiveSupport::Notifications.instrument("notifier_listen.good_job") do + connection.execute("LISTEN #{CHANNEL}") + end + thr_listening.make_true end - thr_listening.make_true end - ActiveSupport::Dependencies.interlock.permit_concurrent_loads do - while thr_executor.running? - wait_for_notify do |channel, payload| - next unless channel == CHANNEL - - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) - end - end + while thr_executor.running? + wait_for_notify do |channel, payload| + next unless channel == CHANNEL - reset_connection_errors + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end + + reset_connection_errors end end ensure - run_callbacks :unlisten do - thr_listening.make_false - ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do - connection.execute("UNLISTEN *") + Rails.application.executor.wrap do + run_callbacks :unlisten do + thr_listening.make_false + ActiveSupport::Notifications.instrument("notifier_unlisten.good_job") do + connection.execute("UNLISTEN *") + end end end end @@ -207,8 +209,10 @@ def listen(delay: 0) end def with_connection - self.connection = Execution.connection_pool.checkout.tap do |conn| - Execution.connection_pool.remove(conn) + Rails.application.executor.wrap do + self.connection = Execution.connection_pool.checkout.tap do |conn| + Execution.connection_pool.remove(conn) + end end connection.execute("SET application_name = #{connection.quote(self.class.name)}")