diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb
index 0cc113c37..15f56221f 100644
--- a/app/models/good_job/base_execution.rb
+++ b/app/models/good_job/base_execution.rb
@@ -33,12 +33,25 @@ def params_execution_count
def coalesce_scheduled_at_created_at
arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end
+
+ def discrete_support?
+ if connection.table_exists?('good_job_executions')
+ true
+ else
+ migration_pending_warning!
+ false
+ end
+ end
end
# The ActiveJob job class, as a string
# @return [String]
def job_class
- serialized_params['job_class']
+ discrete? ? attributes['job_class'] : serialized_params['job_class']
+ end
+
+ def discrete?
+ self.class.discrete_support? && is_discrete?
end
end
end
diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb
new file mode 100644
index 000000000..08d0e7e7a
--- /dev/null
+++ b/app/models/good_job/discrete_execution.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+module GoodJob # :nodoc:
+ class DiscreteExecution < BaseRecord
+ self.table_name = 'good_job_executions'
+
+ belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
+ belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
+
+ scope :finished, -> { where.not(finished_at: nil) }
+
+ alias_attribute :performed_at, :created_at
+
+ def number
+ serialized_params.fetch('executions', 0) + 1
+ end
+
+ # Time between when this job was expected to run and when it started running
+ def queue_latency
+ created_at - scheduled_at
+ end
+
+ # Time between when this job started and finished
+ def runtime_latency
+ (finished_at || Time.current) - performed_at if performed_at
+ end
+
+ def last_status_at
+ finished_at || created_at
+ end
+
+ def status
+ if finished_at.present?
+ if error.present?
+ :retried
+ elsif error.present? && job.finished_at.present?
+ :discarded
+ else
+ :succeeded
+ end
+ else
+ :running
+ end
+ end
+
+ def display_serialized_params
+ serialized_params.merge({
+ _good_job_execution: attributes.except('serialized_params'),
+ })
+ end
+ end
+end
diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb
index b845aeabf..46e6caf27 100644
--- a/app/models/good_job/execution.rb
+++ b/app/models/good_job/execution.rb
@@ -70,9 +70,13 @@ def self.queue_parser(string)
end
belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions
-
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
- after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
+ has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent
+
+ after_destroy lambda {
+ GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
+ self.class.active_job_id(active_job_id).delete_all
+ }, if: -> { @_destroy_job }
# Get executions with given ActiveJob ID
# @!method active_job_id
@@ -201,8 +205,12 @@ def self.queue_parser(string)
end
end)
- # Construct a GoodJob::Execution from an ActiveJob instance.
def self.build_for_enqueue(active_job, overrides = {})
+ new(**enqueue_args(active_job, overrides))
+ end
+
+ # Construct arguments for GoodJob::Execution from an ActiveJob instance.
+ def self.enqueue_args(active_job, overrides = {})
if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil?
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority.
@@ -218,6 +226,7 @@ def self.build_for_enqueue(active_job, overrides = {})
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}
+
execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
@@ -238,7 +247,7 @@ def self.build_for_enqueue(active_job, overrides = {})
execution_args[:cron_at] = CurrentThread.cron_at
end
- new(**execution_args.merge(overrides))
+ execution_args.merge(overrides)
end
# Finds the next eligible Execution, acquire an advisory lock related to it, and
@@ -298,19 +307,47 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
# The new {Execution} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
- execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ current_execution = CurrentThread.execution
+
+ retried = current_execution && current_execution.active_job_id == active_job.job_id
+ if retried
+ if current_execution.discrete?
+ execution = current_execution
+ execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at }))
+ execution.scheduled_at ||= Time.current
+ execution.performed_at = nil
+ execution.finished_at = nil
+ else
+ execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ end
+ else
+ execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
+ execution.make_discrete if discrete_support?
+ end
- execution.create_with_advisory_lock = create_with_advisory_lock
- instrument_payload[:execution] = execution
+ if create_with_advisory_lock
+ if execution.persisted?
+ execution.advisory_lock
+ else
+ execution.create_with_advisory_lock = true
+ end
+ end
+ instrument_payload[:execution] = execution
execution.save!
- active_job.provider_job_id = execution.id
- CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
+ CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete?
+ active_job.provider_job_id = execution.id
execution
end
end
+ def self.format_error(error)
+ raise ArgumentError unless error.is_a?(Exception)
+
+ [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
+ end
+
# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
@@ -320,12 +357,39 @@ def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
+ discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self
- current_thread.execution_interrupted = performed_at if performed_at
- update!(performed_at: Time.current)
+ if performed_at
+ current_thread.execution_interrupted = performed_at
+
+ if discrete?
+ interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
+ self.error = interrupt_error_string
+ discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations
+ error: interrupt_error_string,
+ finished_at: Time.current
+ )
+ end
+ end
+
+ if discrete?
+ transaction do
+ now = Time.current
+ discrete_execution = discrete_executions.create!(
+ job_class: job_class,
+ queue_name: queue_name,
+ serialized_params: serialized_params,
+ scheduled_at: (scheduled_at || created_at),
+ created_at: now
+ )
+ update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
+ end
+ else
+ update!(performed_at: Time.current)
+ end
ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
@@ -349,14 +413,42 @@ def perform
end
job_error = result.handled_error || result.unhandled_error
- self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error
+
+ if job_error
+ error_string = self.class.format_error(job_error)
+ self.error = error_string
+ discrete_execution.error = error_string if discrete_execution
+ else
+ self.error = nil
+ end
reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
- save!
+ if discrete_execution
+ transaction do
+ discrete_execution.update!(finished_at: Time.current)
+ update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil)
+ end
+ else
+ save!
+ end
elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
- self.finished_at = Time.current
- save!
+ now = Time.current
+ if discrete_execution
+ if reenqueued
+ self.performed_at = nil
+ else
+ self.finished_at = now
+ end
+ discrete_execution.finished_at = now
+ transaction do
+ discrete_execution.save!
+ save!
+ end
+ else
+ self.finished_at = now
+ save!
+ end
else
destroy_job
end
@@ -371,6 +463,17 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end
+ def make_discrete
+ self.is_discrete = true
+ self.id = active_job_id
+ self.job_class = serialized_params['job_class']
+ self.executions_count ||= 0
+
+ current_time = Time.current
+ self.created_at ||= current_time
+ self.scheduled_at ||= current_time
+ end
+
# Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
#
# @param ignore_deserialization_errors [Boolean]
diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb
index 0761db07a..3fa26c19b 100644
--- a/app/models/good_job/job.rb
+++ b/app/models/good_job/job.rb
@@ -30,6 +30,11 @@ def table_name=(_value)
belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
+ has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
+
+ after_destroy lambda {
+ GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
+ }
# Only the most-recent unretried execution represents a "Job"
default_scope { where(retried_good_job_id: nil) }
@@ -56,6 +61,8 @@ def table_name=(_value)
# Errored but will not be retried
scope :discarded, -> { finished.where.not(error: nil) }
+ scope :unfinished_undiscrete, -> { where(finished_at: nil, retried_good_job_id: nil, is_discrete: [nil, false]) }
+
# The job's ActiveJob UUID
# @return [String]
def id
@@ -191,9 +198,10 @@ def retry_job
execution.class.transaction(joinable: false, requires_new: true) do
new_active_job = active_job.retry_job(wait: 0, error: execution.error)
- execution.save
+ execution.save!
end
end
+
new_active_job
end
end
@@ -213,7 +221,7 @@ def discard_job(message)
update_execution = proc do
execution.update(
finished_at: Time.current,
- error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join
+ error: GoodJob::Execution.format_error(job_error)
)
end
diff --git a/app/views/good_job/jobs/show.html.erb b/app/views/good_job/jobs/show.html.erb
index 01e20e306..2e59f45db 100644
--- a/app/views/good_job/jobs/show.html.erb
+++ b/app/views/good_job/jobs/show.html.erb
@@ -3,7 +3,12 @@
@@ -21,6 +26,10 @@
<%= tag.strong @job.priority %>
+
+ <%= tag.span relative_time(@job.last_status_at), class: "small" %>
+ <%= status_badge @job.status %>
+
<% if @job.status.in? [:scheduled, :retried, :queued] %>
<%= button_to reschedule_job_path(@job.id), method: :put,
class: "btn btn-sm btn-outline-primary",
@@ -59,8 +68,25 @@
-
<%= t "good_job.models.job.arguments" %>
- <%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
+
+ <%= t "good_job.models.job.arguments" %>
+ <%= tag.button type: "button", class: "btn btn-sm text-muted", role: "button",
+ title: t("good_job.actions.inspect"),
+ data: { bs_toggle: "collapse", bs_target: "##{dom_id(@job, 'params')}" },
+ aria: { expanded: false, controls: dom_id(@job, "params") } do %>
+ <%= render_icon "info" %>
+ <%= t "good_job.actions.inspect" %>
+ <% end %>
+
+<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
+
+<%= tag.div id: dom_id(@job, "params"), class: "list-group-item collapse small bg-dark text-light" do %>
+ <%= tag.pre JSON.pretty_generate(@job.display_serialized_params) %>
+<% end %>
-<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
+<% if @job.discrete? %>
+ <%= render 'executions', executions: @job.discrete_executions.reverse %>
+<% else %>
+ <%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
+<% end %>
diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
index 73cc1dba3..27d23ab85 100644
--- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
+++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb
@@ -22,6 +22,10 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.uuid :batch_id
t.uuid :batch_callback_id
+
+ t.boolean :is_discrete
+ t.integer :executions_count
+ t.text :job_class
end
create_table :good_job_batches, id: :uuid do |t|
@@ -38,6 +42,18 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :finished_at
end
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id, null: false
+ t.text :job_class
+ t.text :queue_name
+ t.jsonb :serialized_params
+ t.datetime :scheduled_at
+ t.datetime :finished_at
+ t.text :error
+ end
+
create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
@@ -62,5 +78,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished
add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL"
add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL"
+
+ add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
end
end
diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb
new file mode 100644
index 000000000..eb2fd1392
--- /dev/null
+++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %>
+ def change
+ reversible do |dir|
+ dir.up do
+ # Ensure this incremental update migration is idempotent
+ # with monolithic install migration.
+ return if connection.table_exists?(:good_job_executions)
+ end
+ end
+
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id, null: false
+ t.text :job_class
+ t.text :queue_name
+ t.jsonb :serialized_params
+ t.datetime :scheduled_at
+ t.datetime :finished_at
+ t.text :error
+
+ t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
+ end
+
+ change_table :good_jobs do |t|
+ t.boolean :is_discrete
+ t.integer :executions_count
+ t.text :job_class
+ end
+ end
+end
diff --git a/lib/good_job.rb b/lib/good_job.rb
index 14be87d2c..85f034e91 100644
--- a/lib/good_job.rb
+++ b/lib/good_job.rb
@@ -170,14 +170,19 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000)
ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { older_than: older_than, timestamp: timestamp }) do |payload|
deleted_executions_count = 0
deleted_batches_count = 0
+ deleted_discrete_executions_count = 0
jobs_query = GoodJob::Job.where('finished_at <= ?', timestamp).order(finished_at: :asc).limit(in_batches_of)
jobs_query = jobs_query.succeeded unless include_discarded
loop do
- deleted = GoodJob::Execution.where(job: jobs_query).delete_all
- break if deleted.zero?
+ active_job_ids = jobs_query.pluck(:active_job_id)
+ break if active_job_ids.empty?
- deleted_executions_count += deleted
+ deleted_discrete_executions = GoodJob::DiscreteExecution.where(active_job_id: active_job_ids).delete_all
+ deleted_discrete_executions_count += deleted_discrete_executions
+
+ deleted_executions = GoodJob::Execution.where(active_job_id: active_job_ids).delete_all
+ deleted_executions_count += deleted_executions
end
if GoodJob::BatchRecord.migrated?
@@ -191,9 +196,14 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000)
end
end
- payload[:destroyed_executions_count] = deleted_executions_count
payload[:destroyed_batches_count] = deleted_batches_count
- payload[:destroyed_records_count] = deleted_executions_count + deleted_batches_count
+ payload[:destroyed_discrete_executions_count] = deleted_discrete_executions_count
+ payload[:destroyed_executions_count] = deleted_executions_count
+
+ destroyed_records_count = deleted_batches_count + deleted_discrete_executions_count + deleted_executions_count
+ payload[:destroyed_records_count] = destroyed_records_count
+
+ destroyed_records_count
end
end
diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb
index 17712437d..82a26e054 100644
--- a/lib/good_job/adapter.rb
+++ b/lib/good_job/adapter.rb
@@ -50,11 +50,15 @@ def enqueue_all(active_jobs)
current_time = Time.current
executions = active_jobs.map do |active_job|
- GoodJob::Execution.build_for_enqueue(active_job, {
- id: SecureRandom.uuid,
- created_at: current_time,
- updated_at: current_time,
- })
+ GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
+ if GoodJob::Execution.discrete_support?
+ execution.make_discrete
+ execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
+ end
+
+ execution.created_at = current_time
+ execution.updated_at = current_time
+ end
end
inline_executions = []
diff --git a/scripts/benchmark_discrete_jobs.rb b/scripts/benchmark_discrete_jobs.rb
new file mode 100644
index 000000000..1e9b47a76
--- /dev/null
+++ b/scripts/benchmark_discrete_jobs.rb
@@ -0,0 +1,38 @@
+ENV['GOOD_JOB_EXECUTION_MODE'] = 'external'
+
+require_relative '../spec/test_app/config/environment'
+require_relative '../lib/good_job'
+require 'benchmark/ips'
+
+performer = GoodJob::JobPerformer.new("*")
+Benchmark.ips do |x|
+ GoodJob::Execution.delete_all
+ ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new }
+ GoodJob::Execution.update_all(is_discrete: true)
+ x.report("discrete jobs and no errors") do
+ performer.next
+ end
+
+ GoodJob::Execution.delete_all
+ ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new }
+ GoodJob::Execution.update_all(is_discrete: false)
+ x.report("undiscrete jobs and no errors") do
+ performer.next
+ end
+
+ GoodJob::Execution.delete_all
+ ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) }
+ GoodJob::Execution.update_all(is_discrete: true)
+ x.report("discrete jobs and 5 errors") do
+ performer.next
+ end
+
+ GoodJob::Execution.delete_all
+ ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) }
+ GoodJob::Execution.update_all(is_discrete: false)
+ x.report("undiscrete jobs and 5 errors") do
+ performer.next
+ end
+
+ x.compare!
+end
diff --git a/spec/app/filters/good_job/jobs_filter_spec.rb b/spec/app/filters/good_job/jobs_filter_spec.rb
index c24a62915..1a961a6dd 100644
--- a/spec/app/filters/good_job/jobs_filter_spec.rb
+++ b/spec/app/filters/good_job/jobs_filter_spec.rb
@@ -62,9 +62,9 @@
expect(filter.states).to eq({
"scheduled" => 1,
"retried" => 0,
- "queued" => 0,
+ "queued" => 1,
"running" => 1,
- "succeeded" => 2,
+ "succeeded" => 1,
"discarded" => 1,
})
end
diff --git a/spec/app/jobs/example_job_spec.rb b/spec/app/jobs/example_job_spec.rb
index e0dea7faf..327b531e4 100644
--- a/spec/app/jobs/example_job_spec.rb
+++ b/spec/app/jobs/example_job_spec.rb
@@ -25,9 +25,9 @@
end
travel_back
- executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc)
- expect(executions.size).to eq 2
- expect(executions.last.error).to be_nil
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ expect(good_job.discrete_executions.count).to eq 2
+ expect(good_job.discrete_executions.last.error).to be_nil
end
end
@@ -40,26 +40,25 @@
end
travel_back
- executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc)
- expect(executions.size).to eq 6
- expect(executions.last.error).to be_nil
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+
+ expect(good_job.discrete_executions.count).to eq 6
+ expect(good_job.discrete_executions.last.error).to be_nil
end
end
describe "DEAD_TYPE" do
it 'errors but does not retry' do
- described_class.perform_later(described_class::DEAD_TYPE)
+ active_job = described_class.perform_later(described_class::DEAD_TYPE)
10.times do
GoodJob.perform_inline
travel(5.minutes)
end
travel_back
- active_job_id = GoodJob::Execution.last.active_job_id
-
- executions = GoodJob::Execution.where(active_job_id: active_job_id).order(created_at: :asc)
- expect(executions.size).to eq 3
- expect(executions.last.error).to be_present
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ expect(good_job.discrete_executions.count).to eq 3
+ expect(good_job.discrete_executions.last.error).to be_present
end
end
diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb
index 258b4544f..0282153ad 100644
--- a/spec/app/models/good_job/execution_spec.rb
+++ b/spec/app/models/good_job/execution_spec.rb
@@ -3,6 +3,8 @@
RSpec.describe GoodJob::Execution do
before do
+ allow(described_class).to receive(:discrete_support?).and_return(false)
+
stub_const "RUN_JOBS", Concurrent::Array.new
stub_const 'TestJob', (Class.new(ActiveJob::Base) do
self.queue_name = 'test'
@@ -21,6 +23,41 @@ def perform(result_value = nil, raise_error: false)
describe '.enqueue' do
let(:active_job) { TestJob.new }
+ context 'when discrete' do
+ before do
+ allow(described_class).to receive(:discrete_support?).and_return(true)
+ end
+
+ it 'assigns is discrete, id, scheduled_at' do
+ expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1)
+
+ execution = described_class.last
+ expect(execution).to have_attributes(
+ is_discrete: true,
+ id: active_job.job_id,
+ active_job_id: active_job.job_id,
+ created_at: execution.scheduled_at,
+ scheduled_at: execution.created_at
+ )
+ end
+ end
+
+ context 'when NOT discrete' do
+ before { allow(described_class).to receive(:discrete_support?).and_return(false) }
+
+ it 'does not assign id, scheduled_at' do
+ expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1)
+
+ execution = described_class.last
+ expect(execution.id).not_to eq(active_job.job_id)
+ expect(execution).to have_attributes(
+ is_discrete: nil,
+ active_job_id: active_job.job_id,
+ scheduled_at: nil
+ )
+ end
+ end
+
it 'creates a new GoodJob record' do
execution = nil
@@ -621,6 +658,73 @@ def job_params
end
end
end
+
+ context 'when Discrete' do
+ before do
+ ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
+ allow(described_class).to receive(:discrete_support?).and_return(true)
+ good_job.update!(is_discrete: true)
+ end
+
+ it 'updates the Execution record and creates a DiscreteExecution record' do
+ good_job.perform
+
+ expect(good_job.reload).to have_attributes(
+ executions_count: 1,
+ finished_at: within(1.second).of(Time.current)
+ )
+
+ dexecution = good_job.discrete_executions.first
+ expect(dexecution).to be_present
+ expect(dexecution).to have_attributes(
+ active_job_id: good_job.active_job_id,
+ job_class: good_job.job_class,
+ queue_name: good_job.queue_name,
+ created_at: within(0.001).of(good_job.performed_at),
+ scheduled_at: within(0.001).of(good_job.created_at),
+ finished_at: within(1.second).of(Time.current),
+ error: nil,
+ serialized_params: good_job.serialized_params
+ )
+ end
+
+ context 'when ActiveJob rescues an error' do
+ let(:active_job) { TestJob.new("a string", raise_error: true) }
+ let!(:good_job) { described_class.enqueue(active_job) }
+
+ before do
+ allow(described_class).to receive(:discrete_support?).and_return(true)
+ allow(GoodJob).to receive(:preserve_job_records).and_return(true)
+ TestJob.retry_on(StandardError, wait: 1.hour, attempts: 2) { nil }
+ good_job.update!(is_discrete: true)
+ end
+
+ it 'updates the existing Execution/Job record instead of creating a new one' do
+ expect { good_job.perform }
+ .to not_change(described_class, :count)
+ .and change { good_job.reload.serialized_params["executions"] }.by(1)
+ .and not_change { good_job.reload.id }
+ .and not_change { described_class.count }
+
+ expect(good_job.reload).to have_attributes(
+ error: "TestJob::ExpectedError: Raised expected error",
+ created_at: within(1.second).of(Time.current),
+ performed_at: nil,
+ finished_at: nil,
+ scheduled_at: within(10.minutes).of(1.hour.from_now) # interval because of retry jitter
+ )
+ expect(GoodJob::DiscreteExecution.count).to eq(1)
+ discrete_execution = good_job.discrete_executions.first
+ expect(discrete_execution).to have_attributes(
+ active_job_id: good_job.active_job_id,
+ error: "TestJob::ExpectedError: Raised expected error",
+ created_at: within(1.second).of(Time.current),
+ scheduled_at: within(1.second).of(Time.current),
+ finished_at: within(1.second).of(Time.current)
+ )
+ end
+ end
+ end
end
describe '#destroy_job' do
diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb
index 18a47eb1d..01bcd1f99 100644
--- a/spec/app/models/good_job/job_spec.rb
+++ b/spec/app/models/good_job/job_spec.rb
@@ -2,23 +2,38 @@
require 'rails_helper'
RSpec.describe GoodJob::Job do
- subject(:job) { described_class.find(head_execution.active_job_id) }
+ let(:active_job_id) { SecureRandom.uuid }
- before do
- allow(GoodJob).to receive(:preserve_job_records).and_return(true)
- ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
-
- stub_const 'TestJob', (Class.new(ActiveJob::Base) do
- def perform(feline = nil, canine: nil)
- end
- end)
- stub_const 'TestJob::Error', Class.new(StandardError)
+ let(:job) do
+ described_class.create!(
+ is_discrete: true,
+ active_job_id: active_job_id,
+ scheduled_at: 10.minutes.from_now,
+ queue_name: 'mice',
+ priority: 10,
+ serialized_params: {
+ 'job_id' => active_job_id,
+ 'job_class' => 'TestJob',
+ 'executions' => 1,
+ 'exception_executions' => { 'TestJob::Error' => 1 },
+ 'queue_name' => 'mice',
+ 'priority' => 10,
+ 'arguments' => ['cat', { 'canine' => 'dog' }],
+ }
+ ).tap do |job|
+ job.discrete_executions.create!(
+ scheduled_at: 1.minute.ago,
+ created_at: 1.minute.ago,
+ finished_at: 1.minute.ago,
+ error: "TestJob::Error: TestJob::Error"
+ )
+ end
end
+ let(:undiscrete_job) { described_class.find(head_execution.active_job_id) }
- let!(:tail_execution) do
- active_job_id = SecureRandom.uuid
+ let(:tail_execution) do
GoodJob::Execution.create!(
- active_job_id: SecureRandom.uuid,
+ active_job_id: active_job_id,
created_at: 1.minute.ago,
queue_name: 'mice',
priority: 10,
@@ -33,7 +48,7 @@ def perform(feline = nil, canine: nil)
)
end
- let!(:head_execution) do
+ let(:head_execution) do
GoodJob::Execution.create!(
active_job_id: tail_execution.active_job_id,
scheduled_at: 10.minutes.from_now,
@@ -57,6 +72,18 @@ def perform(feline = nil, canine: nil)
end
end
+ before do
+ allow(GoodJob).to receive(:preserve_job_records).and_return(true)
+ ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
+
+ stub_const 'TestJob', (Class.new(ActiveJob::Base) do
+ def perform
+ raise "Didn't expect to perform this job"
+ end
+ end)
+ stub_const 'TestJob::Error', Class.new(StandardError)
+ end
+
describe '.find' do
it 'returns a record that is the same as the head execution' do
job = described_class.find(head_execution.active_job_id)
@@ -66,7 +93,7 @@ def perform(feline = nil, canine: nil)
describe '#id' do
it 'is the ActiveJob ID' do
- expect(job.id).to eq head_execution.active_job_id
+ expect(job.id).to eq job.active_job_id
end
end
@@ -78,6 +105,7 @@ def perform(feline = nil, canine: nil)
describe '#head_execution' do
it 'is the head execution (which should be the same record)' do
+ job = undiscrete_job
expect(job.head_execution).to eq head_execution
expect(job._execution_id).to eq head_execution.id
end
@@ -85,6 +113,7 @@ def perform(feline = nil, canine: nil)
describe '#tail_execution' do
it 'is the tail execution' do
+ job = undiscrete_job
expect(job.tail_execution).to eq tail_execution
end
end
@@ -160,29 +189,50 @@ def perform(feline = nil, canine: nil)
describe '#retry_job' do
context 'when job is retried' do
before do
- head_execution.update!(
+ job.update!(
finished_at: Time.current,
error: "TestJob::Error: TestJob::Error"
)
end
- it 'enqueues another execution and updates the original job' do
- original_head_execution = job.head_execution
+ it 'updates the original job' do
+ expect(job).to be_discrete
expect do
job.retry_job
- end.to change { job.executions.reload.size }.by(1)
-
- new_head_execution = job.head_execution(reload: true)
- expect(new_head_execution.serialized_params).to include(
- "executions" => 2,
- "queue_name" => "mice",
- "priority" => 10,
- "arguments" => ['cat', hash_including('canine' => 'dog')]
- )
+ end.to change { job.reload.finished? }.from(true).to(false)
+ expect(job.executions.count).to eq 1
+ end
+
+ context 'when job is not discrete' do
+ let(:job) { undiscrete_job }
- original_head_execution.reload
- expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id
+ before do
+ head_execution.update!(
+ finished_at: Time.current,
+ error: "TestJob::Error: TestJob::Error"
+ )
+ end
+
+ it 'enqueues another execution and updates the original job' do
+ original_head_execution = undiscrete_job.head_execution
+
+ expect do
+ job.retry_job
+ end.to change { job.executions.reload.size }.by(1)
+ expect(job.reload).not_to be_finished
+
+ new_head_execution = job.head_execution(reload: true)
+ expect(new_head_execution.serialized_params).to include(
+ "executions" => 2,
+ "queue_name" => "mice",
+ "priority" => 10,
+ "arguments" => ['cat', hash_including('canine' => 'dog')]
+ )
+
+ original_head_execution.reload
+ expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id
+ end
end
end
@@ -279,18 +329,12 @@ def perform(feline = nil, canine: nil)
end
describe '#destroy_job' do
- context 'when a job is finished' do
- before do
- job.head_execution.update! finished_at: Time.current
- end
-
- it 'destroys all the job executions' do
- job.destroy_job
+ it 'destroys job and executions' do
+ job.update! finished_at: Time.current
+ job.destroy_job
- expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound
- expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound
- expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
- end
+ expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect(GoodJob::DiscreteExecution.count).to eq 0
end
context 'when a job is not finished' do
@@ -298,5 +342,29 @@ def perform(feline = nil, canine: nil)
expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError
end
end
+
+ context "when undiscrete job" do
+ let(:job) { undiscrete_job }
+
+ context 'when a job is finished' do
+ before do
+ job.head_execution.update! finished_at: Time.current
+ end
+
+ it 'destroys all the job executions' do
+ job.destroy_job
+
+ expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { job.reload }.to raise_error ActiveRecord::RecordNotFound
+ end
+ end
+
+ context 'when a job is not finished' do
+ it 'raises an ActionForStateMismatchError' do
+ expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError
+ end
+ end
+ end
end
end
diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb
index 9445c3e39..978ff4d75 100644
--- a/spec/integration/adapter_spec.rb
+++ b/spec/integration/adapter_spec.rb
@@ -37,7 +37,7 @@ def perform(*_args, **_kwargs)
end
it 'assigns successfully_enqueued' do
- ok_job = TestJob.perform_later
+ ok_job = TestJob.new
expect { ok_job.enqueue }.not_to raise_error
expect(ok_job.successfully_enqueued?).to be true if ok_job.respond_to?(:successfully_enqueued?)
@@ -58,7 +58,7 @@ def perform(*_args, **_kwargs)
expect(execution).to have_attributes(
queue_name: 'test',
priority: 50,
- scheduled_at: nil
+ scheduled_at: within(1).of(Time.current)
)
end
diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb
index 9a00f042b..bb4396fc7 100644
--- a/spec/integration/batch_spec.rb
+++ b/spec/integration/batch_spec.rb
@@ -215,9 +215,10 @@ def perform(*_args, **_kwargs)
GoodJob.perform_inline
expect(GoodJob::Job.count).to eq 3
- expect(GoodJob::Execution.count).to eq 5
- expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1
- expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4
+ expect(GoodJob::Execution.count).to eq 3
+ expect(GoodJob::DiscreteExecution.count).to eq 5
+ expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1
+ expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2
callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second)
expect(callback_arguments).to contain_exactly({ event: :discard }, { event: :finish })
@@ -230,9 +231,10 @@ def perform(*_args, **_kwargs)
GoodJob.perform_inline
expect(GoodJob::Job.count).to eq 3
- expect(GoodJob::Execution.count).to eq 5
- expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1
- expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4
+ expect(GoodJob::Execution.count).to eq 3
+ expect(GoodJob::DiscreteExecution.count).to eq 5
+ expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1
+ expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2
callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second)
expect(callback_arguments).to contain_exactly({ event: :success }, { event: :finish })
diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
index 3526c7b27..f580bd147 100644
--- a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb
@@ -113,19 +113,21 @@ def perform(name:)
end
it "will error and retry jobs if concurrency is exceeded" do
- TestJob.perform_later(name: "Alice")
+ active_job = TestJob.perform_later(name: "Alice")
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: 5)
5.times { scheduler.create_thread }
sleep_until(max: 10, increments_of: 0.5) do
- GoodJob::Execution.where(concurrency_key: "Alice").finished.count >= 1
+ GoodJob::DiscreteExecution.where(active_job_id: active_job.job_id).finished.count >= 1
end
scheduler.shutdown
- expect(GoodJob::Execution.count).to be >= 1
- expect(GoodJob::Execution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present
+ expect(GoodJob::Job.find_by(active_job_id: active_job.job_id).concurrency_key).to eq "Alice"
+
+ expect(GoodJob::DiscreteExecution.count).to be >= 1
+ expect(GoodJob::DiscreteExecution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present
end
it 'is ignored with the job is executed via perform_now' do
@@ -153,17 +155,37 @@ def perform
end)
end
- it 'preserves the key value across retries' do
- TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
- begin
- GoodJob.perform_inline
- rescue StandardError
- nil
+ describe 'retries' do
+ it 'preserves the value' do
+ TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
+
+ begin
+ GoodJob.perform_inline
+ rescue StandardError
+ nil
+ end
+
+ expect(GoodJob::Execution.count).to eq 1
+ expect(GoodJob::Execution.first.concurrency_key).to be_present
+ expect(GoodJob::Job.first).not_to be_finished
end
- expect(GoodJob::Execution.count).to eq 2
- first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a
- expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key
+ context 'when not discrete' do
+ it 'preserves the key value across retries' do
+ TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
+ GoodJob::Job.first.update!(is_discrete: false)
+
+ begin
+ GoodJob.perform_inline
+ rescue StandardError
+ nil
+ end
+
+ expect(GoodJob::Execution.count).to eq 2
+ first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a
+ expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key
+ end
+ end
end
end
diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
index 35da102f3..9454e042d 100644
--- a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb
@@ -16,21 +16,63 @@ def perform
context 'when a dequeued job has a performed_at but no finished_at' do
before do
active_job = TestJob.perform_later
- GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current)
+ good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id)
+ good_job.update!(performed_at: Time.current, finished_at: nil)
+ good_job.discrete_executions.create!(performed_at: Time.current, finished_at: nil)
end
it 'raises a GoodJob::InterruptError' do
expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError)
end
- it 'is rescuable' do
- TestJob.retry_on GoodJob::InterruptError
+ context 'when discrete execution is NOT enabled' do
+ before do
+ allow(GoodJob::Execution).to receive(:discrete_support?).and_return(false)
+ end
- expect { GoodJob.perform_inline }.not_to raise_error
- expect(GoodJob::Execution.count).to eq(2)
+ it 'is rescuable' do
+ TestJob.retry_on GoodJob::InterruptError
- job = GoodJob::Job.first
- expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+ expect { GoodJob.perform_inline }.not_to raise_error
+ expect(GoodJob::Execution.count).to eq(2)
+
+ job = GoodJob::Job.first
+ expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+ end
+ end
+
+ context 'when discrete execution is enabled' do
+ before do
+ allow(GoodJob::Execution).to receive(:discrete_support?).and_return(true)
+ end
+
+ it 'does not create a new execution' do
+ TestJob.retry_on GoodJob::InterruptError
+
+ expect { GoodJob.perform_inline }.not_to raise_error
+ expect(GoodJob::Job.count).to eq(1)
+ expect(GoodJob::Execution.count).to eq(1)
+ expect(GoodJob::DiscreteExecution.count).to eq(2)
+
+ job = GoodJob::Job.first
+ expect(job.executions.count).to eq(1)
+ expect(job.finished?).to be false
+ expect(job.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at'
+
+ initial_discrete_execution = job.discrete_executions.first
+ expect(initial_discrete_execution).to have_attributes(
+ performed_at: be_present,
+ finished_at: be_present,
+ error: start_with('GoodJob::InterruptError: Interrupted after starting perform at')
+ )
+
+ retried_discrete_execution = job.discrete_executions.last
+ expect(retried_discrete_execution).to have_attributes(
+ performed_at: be_present,
+ finished_at: be_present,
+ error: start_with('GoodJob::InterruptError: Interrupted after starting perform at')
+ )
+ end
end
end
diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
index 4394e0b39..8847c3540 100644
--- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
+++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb
@@ -30,10 +30,10 @@ def perform
it 'can be overridden by set(good_job_notify: true)' do
TestJob.good_job_notify = false
TestJob.set(good_job_notify: true).perform_later
- expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' })
+ expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', scheduled_at: within(1).of(Time.current) })
GoodJob::Bulk.enqueue { TestJob.set(good_job_notify: true).perform_later }
- expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 })
+ expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(1).of(Time.current) })
end
it 'works for bulk enqueuing' do
@@ -85,7 +85,7 @@ def perform
scheduler = GoodJob::Scheduler.new(performer, max_threads: 5)
scheduler.create_thread
- sleep_until(max: 5, increments_of: 0.5) { GoodJob::Execution.count >= 2 }
+ sleep_until(max: 5, increments_of: 0.5) { GoodJob::DiscreteExecution.count >= 2 }
scheduler.shutdown
expect(GoodJob::Notifier).not_to have_received(:notify)
diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb
index ffc60bf76..20bc8a604 100644
--- a/spec/lib/good_job/adapter_spec.rb
+++ b/spec/lib/good_job/adapter_spec.rb
@@ -129,7 +129,7 @@ def perform(succeed: true)
expect(result).to eq 1
expect(active_jobs.map(&:provider_job_id)).to eq [active_jobs.first.provider_job_id, nil]
- expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 })
+ expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(0.1).of(Time.current) })
end
it 'sets successfully_enqueued, if Rails supports it' do
diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb
index d423acaea..b6679711a 100644
--- a/spec/lib/good_job_spec.rb
+++ b/spec/lib/good_job_spec.rb
@@ -42,17 +42,20 @@
let!(:old_unfinished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, scheduled_at: 15.days.ago, finished_at: nil) }
let!(:old_finished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) }
let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, retried_good_job_id: old_finished_job.id, finished_at: 16.days.ago) }
+ let!(:old_finished_job_discrete_execution) { GoodJob::DiscreteExecution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) }
let!(:old_discarded_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") }
let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) }
it 'deletes finished jobs' do
destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1)
- expect(destroyed_records_count).to eq 4
+ expect(destroyed_records_count).to eq 5
expect { recent_job.reload }.not_to raise_error
expect { old_unfinished_job.reload }.not_to raise_error
expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
@@ -60,12 +63,15 @@
it 'takes arguments' do
destroyed_records_count = described_class.cleanup_preserved_jobs(older_than: 10.seconds)
- expect(destroyed_records_count).to eq 5
+ expect(destroyed_records_count).to eq 6
expect { recent_job.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_unfinished_job.reload }.not_to raise_error
expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
it 'is instrumented' do
@@ -83,12 +89,15 @@
allow(described_class.configuration).to receive(:env).and_return ENV.to_hash.merge({ 'GOOD_JOB_CLEANUP_DISCARDED_JOBS' => 'false' })
destroyed_records_count = described_class.cleanup_preserved_jobs
- expect(destroyed_records_count).to eq 3
+ expect(destroyed_records_count).to eq 4
expect { recent_job.reload }.not_to raise_error
expect { old_unfinished_job.reload }.not_to raise_error
expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound
+ expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound
expect { old_discarded_job.reload }.not_to raise_error
+ expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
end
diff --git a/spec/requests/good_job/jobs_controller_spec.rb b/spec/requests/good_job/jobs_controller_spec.rb
index ac57b23ce..8e247fa71 100644
--- a/spec/requests/good_job/jobs_controller_spec.rb
+++ b/spec/requests/good_job/jobs_controller_spec.rb
@@ -92,19 +92,22 @@
describe 'mass_action=retry' do
before do
job.update(error: "Error message")
+ job.discrete_executions.first.update(error: "Error message")
end
it 'retries the job' do
- put good_job.mass_update_jobs_path, params: {
- mass_action: 'retry',
- job_ids: [job.id],
- }
+ expect do
+ put good_job.mass_update_jobs_path, params: {
+ mass_action: 'retry',
+ job_ids: [job.id],
+ }
+ end.to change { job.reload.status }.from(:discarded).to(:succeeded)
expect(response).to have_http_status(:found)
expect(flash[:notice]).to eq('Successfully retried 1 job')
job.reload
- expect(job.executions.count).to eq 2
+ expect(job.discrete_executions.count).to eq 2
end
end
diff --git a/spec/support/example_app_helper.rb b/spec/support/example_app_helper.rb
index 30b46317c..70ab9af75 100644
--- a/spec/support/example_app_helper.rb
+++ b/spec/support/example_app_helper.rb
@@ -48,6 +48,7 @@ def within_example_app
tables = %i[
good_jobs
good_job_batches
+ good_job_executions
good_job_processes
good_job_settings
]
diff --git a/spec/support/postgres_notices.rb b/spec/support/postgres_notices.rb
index 8b0dd4be4..9257f51fa 100644
--- a/spec/support/postgres_notices.rb
+++ b/spec/support/postgres_notices.rb
@@ -16,7 +16,9 @@
end
RSpec.configure do |config|
- config.after do
+ config.around do |example|
+ POSTGRES_NOTICES.clear
+ example.run
expect(POSTGRES_NOTICES).to be_empty
end
end
diff --git a/spec/support/rspec_not_change.rb b/spec/support/rspec_not_change.rb
new file mode 100644
index 000000000..8f0ffae77
--- /dev/null
+++ b/spec/support/rspec_not_change.rb
@@ -0,0 +1,2 @@
+# frozen_string_literal: true
+RSpec::Matchers.define_negated_matcher :not_change, :change
diff --git a/spec/system/jobs_spec.rb b/spec/system/jobs_spec.rb
index 4aa2f2177..d1f14370f 100644
--- a/spec/system/jobs_spec.rb
+++ b/spec/system/jobs_spec.rb
@@ -123,7 +123,7 @@
accept_confirm { click_on 'Retry job' }
end
expect(page).to have_content "Job has been retried"
- end.to change { discarded_job.executions.reload.size }.by(1)
+ end.to change { discarded_job.reload.status }.from(:discarded).to(:queued)
end
it 'can discard jobs' do
diff --git a/spec/test_app/app/jobs/example_job.rb b/spec/test_app/app/jobs/example_job.rb
index 491eff8bf..27842b80f 100644
--- a/spec/test_app/app/jobs/example_job.rb
+++ b/spec/test_app/app/jobs/example_job.rb
@@ -4,7 +4,7 @@ class ExampleJob < ApplicationJob
TYPES = [
SUCCESS_TYPE = 'success',
- ERROR_ONCE_TYPE = 'error_once',
+ ERROR_ONCE_TYPE = 'error_once',
ERROR_FIVE_TIMES_TYPE = 'error_five_times',
DEAD_TYPE = 'dead',
SLOW_TYPE = 'slow',
diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb
new file mode 100644
index 000000000..a3eaf68b9
--- /dev/null
+++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+class CreateGoodJobExecutions < ActiveRecord::Migration[7.0]
+ def change
+ reversible do |dir|
+ dir.up do
+ # Ensure this incremental update migration is idempotent
+ # with monolithic install migration.
+ return if connection.table_exists?(:good_job_executions)
+ end
+ end
+
+ create_table :good_job_executions, id: :uuid do |t|
+ t.timestamps
+
+ t.uuid :active_job_id, null: false
+ t.text :job_class
+ t.text :queue_name
+ t.jsonb :serialized_params
+ t.datetime :scheduled_at
+ t.datetime :finished_at
+ t.text :error
+
+ t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
+ end
+
+ change_table :good_jobs do |t|
+ t.boolean :is_discrete
+ t.integer :executions_count
+ t.text :job_class
+ end
+ end
+end
diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb
index 9b68e84ab..dc2a1cb37 100644
--- a/spec/test_app/db/schema.rb
+++ b/spec/test_app/db/schema.rb
@@ -10,15 +10,14 @@
#
# It's strongly recommended that you check this file into your version control system.
-ActiveRecord::Schema.define(version: 2023_01_31_214927) do
-
+ActiveRecord::Schema.define(version: 2023_04_12_144442) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
create_table "good_job_batches", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.text "description"
t.jsonb "serialized_properties"
t.text "on_finish"
@@ -31,15 +30,28 @@
t.datetime "finished_at"
end
+ create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
+ t.uuid "active_job_id", null: false
+ t.text "job_class"
+ t.text "queue_name"
+ t.jsonb "serialized_params"
+ t.datetime "scheduled_at"
+ t.datetime "finished_at"
+ t.text "error"
+ t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at"
+ end
+
create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.jsonb "state"
end
create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
- t.datetime "created_at", precision: 6, null: false
- t.datetime "updated_at", precision: 6, null: false
+ t.datetime "created_at", null: false
+ t.datetime "updated_at", null: false
t.text "key"
t.jsonb "value"
t.index ["key"], name: "index_good_job_settings_on_key", unique: true
@@ -62,6 +74,9 @@
t.datetime "cron_at"
t.uuid "batch_id"
t.uuid "batch_callback_id"
+ t.boolean "is_discrete"
+ t.integer "executions_count"
+ t.text "job_class"
t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at"
t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id"
t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"