Skip to content

Commit

Permalink
Create InterruptErrors extension to raise an exception when an inte…
Browse files Browse the repository at this point in the history
…rrupted job is retried (#830)

* Create `InterruptErrors` extension to raise an exception when an interrupted job is retried

* Update README.md
  • Loading branch information
bensheldon authored Feb 7, 2023
1 parent 8ddda78 commit a44a464
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 25 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ For more of the story of GoodJob, read the [introductory blog post](https://isla
- [Exceptions](#exceptions)
- [Retries](#retries)
- [ActionMailer retries](#actionmailer-retries)
- [Interrupts](#interrupts)
- [Timeouts](#timeouts)
- [Optimize queues, threads, and processes](#optimize-queues-threads-and-processes)
- [Database connections](#database-connections)
Expand Down Expand Up @@ -829,6 +830,23 @@ end
Note that `ActionMailer::MailDeliveryJob` is the default since Rails 6.0. Be sure that your app is using that class, as it
might also be configured to use the now-deprecated `ActionMailer::DeliveryJob`.
### Interrupts
Jobs will be automatically retried if the process is interrupted while performing a job, for example as the result of a `SIGKILL` or power failure.
If you need more control over interrupt-caused retries, include the `GoodJob::ActiveJobExtensions::InterruptErrors` extension in your job class. When an interrupted job is retried, the extension will raise a `GoodJob::InterruptError` exception within the job, which allows you to use ActiveJob's `retry_on` and `discard_on` to control the behavior of the job.
```ruby
class MyJob < ApplicationJob
  # The extension must be included before other extensions
  include GoodJob::ActiveJobExtensions::InterruptErrors

  # Discard the job if it is interrupted
  # (the constant must be namespaced: a bare `InterruptError` does not resolve inside the job class)
  discard_on GoodJob::InterruptError

  # ...or, alternatively, retry the job if it is interrupted
  retry_on GoodJob::InterruptError, wait: 0, attempts: Float::INFINITY
end
```
### Timeouts
Job timeouts can be configured with an `around_perform`:
Expand Down
45 changes: 20 additions & 25 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,27 @@ def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

self.performed_at = Time.current
save! if GoodJob.preserve_job_records
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self

result = execute
current_thread.execution_interrupted = performed_at if performed_at
update!(performed_at: Time.current)

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do
value = ActiveJob::Base.execute(active_job_data)

if value.is_a?(Exception)
handled_error = value
value = nil
end
handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard

ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?)
rescue StandardError => e
ExecutionResult.new(value: nil, unhandled_error: e)
end
end

job_error = result.handled_error || result.unhandled_error
self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
Expand Down Expand Up @@ -408,28 +425,6 @@ def active_job_data
end
end

# Runs the serialized ActiveJob for this execution and wraps the outcome in an ExecutionResult.
# The work happens inside GoodJob::CurrentThread.within, with the thread-local state reset and
# this record assigned as the current execution, and is instrumented as "perform_job.good_job".
# @return [ExecutionResult]
def execute
GoodJob::CurrentThread.within do |current_thread|
# Clear the thread-local state, then register this record as the current execution.
current_thread.reset
current_thread.execution = self

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do
value = ActiveJob::Base.execute(active_job_data)

# An Exception returned as the value (rather than raised) is recorded as a handled error, not as the job's result.
if value.is_a?(Exception)
handled_error = value
value = nil
end
# Otherwise fall back to any error stored on the current thread as error_on_retry / error_on_discard.
handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard

ExecutionResult.new(value: value, handled_error: handled_error, retried: current_thread.error_on_retry.present?)
rescue StandardError => e
# Anything raised out of the job is reported as an unhandled error instead of propagating.
ExecutionResult.new(value: nil, unhandled_error: e)
end
end
end

# Runs the given block via GoodJob::Batch.within_thread with both batch
# identifiers explicitly passed as nil.
def reset_batch_values(&block)
  cleared_batch_values = { batch_id: nil, batch_callback_id: nil }
  GoodJob::Batch.within_thread(**cleared_batch_values, &block)
end
Expand Down
2 changes: 2 additions & 0 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
require "active_job/queue_adapters/good_job_adapter"
require "good_job/active_job_extensions/batches"
require "good_job/active_job_extensions/concurrency"
require "good_job/interrupt_error"
require "good_job/active_job_extensions/interrupt_errors"
require "good_job/active_job_extensions/notify_options"

require "good_job/assignable_connection"
Expand Down
16 changes: 16 additions & 0 deletions lib/good_job/active_job_extensions/interrupt_errors.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true
module GoodJob
  module ActiveJobExtensions
    # ActiveJob extension that registers an +around_perform+ callback on the
    # including job class. When +CurrentThread.execution_interrupted+ is present,
    # the callback raises {GoodJob::InterruptError} instead of running the job;
    # otherwise the job is performed normally.
    module InterruptErrors
      extend ActiveSupport::Concern

      included do
        around_perform do |_job, block|
          interrupted_at = CurrentThread.execution_interrupted
          if interrupted_at.present?
            raise InterruptError, "Interrupted after starting perform at '#{interrupted_at}'"
          end

          block.call
        end
      end
    end
  end
end
7 changes: 7 additions & 0 deletions lib/good_job/current_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module CurrentThread
error_on_discard
error_on_retry
execution
execution_interrupted
].freeze

# @!attribute [rw] cron_at
Expand Down Expand Up @@ -44,6 +45,12 @@ module CurrentThread
# @return [GoodJob::Execution, nil]
thread_mattr_accessor :execution

# @!attribute [rw] execution_interrupted
# @!scope class
# The earlier +performed_at+ timestamp of an execution that was started before but never finished (i.e. was interrupted)
# @return [Time, nil]
thread_mattr_accessor :execution_interrupted

# Resets attributes
# @param [Hash] values to assign
# @return [void]
Expand Down
6 changes: 6 additions & 0 deletions lib/good_job/interrupt_error.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# frozen_string_literal: true
module GoodJob
  # Error raised by the InterruptErrors job extension when a job whose earlier
  # attempt was interrupted (for example by a SIGKILL or power failure) is
  # performed again.
  class InterruptError < StandardError; end
end
46 changes: 46 additions & 0 deletions spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true
require 'rails_helper'

# Exercises the InterruptErrors extension: an execution that already has a
# performed_at (but no finished_at) should raise GoodJob::InterruptError when
# it is performed, while a fresh execution should run normally.
RSpec.describe GoodJob::ActiveJobExtensions::InterruptErrors do
  before do
    ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)

    stub_const 'TestJob', (Class.new(ActiveJob::Base) do
      include GoodJob::ActiveJobExtensions::InterruptErrors

      def perform
      end
    end)
  end

  context 'when a dequeued job has a performed_at but no finished_at' do
    before do
      # Simulate an interrupted attempt: mark the execution as started without finishing it.
      active_job = TestJob.perform_later
      GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current)
    end

    it 'raises a GoodJob::InterruptError' do
      expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError)
    end

    it 'is rescuable' do
      TestJob.retry_on GoodJob::InterruptError

      expect { GoodJob.perform_inline }.not_to raise_error
      # The interrupted execution plus the retry it enqueued.
      expect(GoodJob::Execution.count).to eq(2)

      job = GoodJob::Job.first
      expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
    end
  end

  context 'when dequeued job does not have performed at' do
    before do
      TestJob.perform_later
    end

    it 'does not raise' do
      expect { GoodJob.perform_inline }.not_to raise_error
    end
  end
end
1 change: 1 addition & 0 deletions spec/lib/good_job/current_thread_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
error_on_discard: false,
error_on_retry: false,
execution: instance_double(GoodJob::Execution),
execution_interrupted: nil,
}

described_class.reset(value)
Expand Down

0 comments on commit a44a464

Please sign in to comment.