diff --git a/README.md b/README.md index 5a3dec0cd..11dc91c0c 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla - [Exceptions](#exceptions) - [Retries](#retries) - [ActionMailer retries](#actionmailer-retries) + - [Interrupts](#interrupts) - [Timeouts](#timeouts) - [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes) - [Database connections](#database-connections) @@ -829,6 +830,23 @@ end Note, that `ActionMailer::MailDeliveryJob` is a default since Rails 6.0. Be sure that your app is using that class, as it might also be configured to use (deprecated now) `ActionMailer::DeliveryJob`. +### Interrupts + +Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure. + +If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job closs. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use ActiveJob's `retry_on` and `discard_on` to control the behavior of the job. + +```ruby +class MyJob < ApplicationJob + # The extension must be included before other extensions + include GoodJob::ActiveJobExtensions::InterruptErrors + # Discard the job if it is interrupted + discard_on InterruptError + # Retry the job if it is interrupted + retry_on InterruptError, wait: 0, attempts: Float::INFINITY +end +``` + ### Timeouts Job timeouts can be configured with an `around_perform`: diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index d51eef899..6cc3cf9b2 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -315,10 +315,27 @@ def perform run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at - self.performed_at = Time.current - save! if GoodJob.preserve_job_records + result = GoodJob::CurrentThread.within do |current_thread| + current_thread.reset + current_thread.execution = self - result = execute + current_thread.execution_interrupted = performed_at if performed_at + update!(performed_at: Time.current) + + ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do + value = ActiveJob::Base.execute(active_job_data) + + if value.is_a?(Exception) + handled_error = value + value = nil + end + handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard + + ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?) + rescue StandardError => e + ExecutionResult.new(value: nil, unhandled_error: e) + end + end job_error = result.handled_error || result.unhandled_error self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error @@ -408,28 +425,6 @@ def active_job_data end end - # @return [ExecutionResult] - def execute - GoodJob::CurrentThread.within do |current_thread| - current_thread.reset - current_thread.execution = self - - ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do - value = ActiveJob::Base.execute(active_job_data) - - if value.is_a?(Exception) - handled_error = value - value = nil - end - handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard - - ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?) - rescue StandardError => e - ExecutionResult.new(value: nil, unhandled_error: e) - end - end - end - def reset_batch_values(&block) GoodJob::Batch.within_thread(batch_id: nil, batch_callback_id: nil, &block) end diff --git a/lib/good_job.rb b/lib/good_job.rb index ba5640081..d4425501e 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -9,6 +9,8 @@ require "active_job/queue_adapters/good_job_adapter" require "good_job/active_job_extensions/batches" require "good_job/active_job_extensions/concurrency" +require "good_job/interrupt_error" +require "good_job/active_job_extensions/interrupt_errors" require "good_job/active_job_extensions/notify_options" require "good_job/assignable_connection" diff --git a/lib/good_job/active_job_extensions/interrupt_errors.rb b/lib/good_job/active_job_extensions/interrupt_errors.rb new file mode 100644 index 000000000..ccb20cd99 --- /dev/null +++ b/lib/good_job/active_job_extensions/interrupt_errors.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true +module GoodJob + module ActiveJobExtensions + module InterruptErrors + extend ActiveSupport::Concern + + included do + around_perform do |_job, block| + raise InterruptError, "Interrupted after starting perform at '#{CurrentThread.execution_interrupted}'" if CurrentThread.execution_interrupted.present? + + block.call + end + end + end + end +end diff --git a/lib/good_job/current_thread.rb b/lib/good_job/current_thread.rb index 4c3c22f4f..28cb1b408 100644 --- a/lib/good_job/current_thread.rb +++ b/lib/good_job/current_thread.rb @@ -12,6 +12,7 @@ module CurrentThread error_on_discard error_on_retry execution + execution_interrupted ].freeze # @!attribute [rw] cron_at @@ -44,6 +45,12 @@ module CurrentThread # @return [GoodJob::Execution, nil] thread_mattr_accessor :execution + # @!attribute [rw] execution_interrupted + # @!scope class + # Execution Interrupted + # @return [Boolean, nil] + thread_mattr_accessor :execution_interrupted + # Resets attributes # @param [Hash] values to assign # @return [void] diff --git a/lib/good_job/interrupt_error.rb b/lib/good_job/interrupt_error.rb new file mode 100644 index 000000000..61db1f911 --- /dev/null +++ b/lib/good_job/interrupt_error.rb @@ -0,0 +1,6 @@ +# frozen_string_literal: true +module GoodJob + # Exception raised when a job is interrupted by a SIGKILL or power failure. + class InterruptError < StandardError + end +end diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb new file mode 100644 index 000000000..a1aea63f8 --- /dev/null +++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true +require 'rails_helper' + +RSpec.describe GoodJob::ActiveJobExtensions::InterruptErrors do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + include GoodJob::ActiveJobExtensions::InterruptErrors + + def perform + end + end) + end + + context 'when a dequeued job has a performed_at but no finished_at' do + before do + active_job = TestJob.perform_later + GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current) + end + + it 'raises a GoodJob::InterruptError' do + expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError) + end + + it 'is rescuable' do + TestJob.retry_on GoodJob::InterruptError + + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Execution.count).to eq(2) + + job = GoodJob::Job.first + expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + end + end + + context 'when dequeued job does ot have performed at' do + before do + TestJob.perform_later + end + + it 'does not raise' do + expect { GoodJob.perform_inline }.not_to raise_error + end + end +end diff --git a/spec/lib/good_job/current_thread_spec.rb b/spec/lib/good_job/current_thread_spec.rb index 1cc905243..2a02548d5 100644 --- a/spec/lib/good_job/current_thread_spec.rb +++ b/spec/lib/good_job/current_thread_spec.rb @@ -57,6 +57,7 @@ error_on_discard: false, error_on_retry: false, execution: instance_double(GoodJob::Execution), + execution_interrupted: nil, } described_class.reset(value)