Skip to content

Commit

Permalink
Ensure batch is reloaded before updating on multiple enqueues (#824)
Browse files Browse the repository at this point in the history
* Ensure batch is reloaded before updating on multiple enqueues

* Reset both enqueued_at and finished_at before re-enqueuing a batch
  • Loading branch information
bensheldon authored Feb 6, 2023
1 parent d44f9d3 commit a10495c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 13 deletions.
18 changes: 13 additions & 5 deletions app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,23 @@ def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePref
# @return [Array<ActiveJob::Base>] Active jobs added to the batch
def enqueue(active_jobs = [], **properties, &block)
assign_properties(properties)
record.save!
if record.new_record?
record.save!
else
record.with_advisory_lock(function: "pg_advisory_lock") do
record.enqueued_at_will_change!
record.finished_at_will_change!
record.update!(enqueued_at: nil, finished_at: nil)
end
end

active_jobs = add(active_jobs, &block)

record.finished_at = nil
record.enqueued_at = Time.current if enqueued_at.nil?
record.save!
record.with_advisory_lock(function: "pg_advisory_lock") do
record.update!(enqueued_at: Time.current)
record._continue_discard_or_finish(lock: false)
end

record._continue_discard_or_finish
active_jobs
end

Expand Down
19 changes: 15 additions & 4 deletions app/models/good_job/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,17 @@ def display_attributes
attributes.except('serialized_properties').merge(properties: properties)
end

def _continue_discard_or_finish(execution = nil)
def _continue_discard_or_finish(execution = nil, lock: true)
execution_discarded = execution && execution.error.present? && execution.retried_good_job_id.nil?
with_advisory_lock(function: "pg_advisory_lock") do
take_advisory_lock(lock) do
Batch.within_thread(batch_id: nil, batch_callback_id: id) do
if execution_discarded && discarded_at.blank?
reload
if execution_discarded && !discarded_at
update(discarded_at: Time.current)
on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present?
end

if !finished_at && enqueued_at && jobs.where(finished_at: nil).count.zero?
if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero?
update(finished_at: Time.current)
on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present?
on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present?
Expand Down Expand Up @@ -80,5 +81,15 @@ def properties=(value)

self.serialized_properties = value
end

private

def take_advisory_lock(value, &block)
if value
with_advisory_lock(function: "pg_advisory_lock", &block)
else
yield
end
end
end
end
25 changes: 21 additions & 4 deletions spec/app/models/good_job/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
describe GoodJob::Batch do
before do
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)
stub_const 'TestJob', Class.new(ActiveJob::Base)
stub_const 'TestJob', (Class.new(ActiveJob::Base) do
def perform
end
end)
stub_const 'CallbackJob', (Class.new(ActiveJob::Base) do
def perform(batch, params)
end
end)
end

describe '.enqueue' do
Expand All @@ -18,6 +25,16 @@
expect(batch.enqueued_at).to be_within(1.second).of(Time.current)
expect(batch.active_jobs.count).to eq 2
end

it 'resets callbacks' do
batch = described_class.new(on_finish: CallbackJob)
batch.enqueue { TestJob.perform_later } # 1st time triggers callback
GoodJob.perform_inline
batch.enqueue { TestJob.perform_later } # 2nd time does not trigger callback (finished_at didn't update to nil on the stale reference to batch_record)
GoodJob.perform_inline

expect(batch.callback_active_jobs.count).to eq 2
end
end

describe '#add' do
Expand Down Expand Up @@ -51,9 +68,9 @@
expect { batch.enqueue }.to change(batch, :enqueued_at).from(nil)
end

it 'does not overwrite an old value' do
batch._record.update(enqueued_at: 1.day.ago)
expect { batch.enqueue }.not_to change(batch, :enqueued_at)
it 'does updates the enqueued_at and clears finished_at' do
batch._record.update(enqueued_at: 1.day.ago, finished_at: 1.day.ago)
expect { batch.enqueue(TestJob.new) }.to change(batch, :enqueued_at).and change(batch, :finished_at).to(nil)
end

it 'can assign callback jobs' do
Expand Down

0 comments on commit a10495c

Please sign in to comment.