diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index 3b6292702..066ed9816 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -86,15 +86,23 @@ def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePref # @return [Array] Active jobs added to the batch def enqueue(active_jobs = [], **properties, &block) assign_properties(properties) - record.save! + if record.new_record? + record.save! + else + record.with_advisory_lock(function: "pg_advisory_lock") do + record.enqueued_at_will_change! + record.finished_at_will_change! + record.update!(enqueued_at: nil, finished_at: nil) + end + end active_jobs = add(active_jobs, &block) - record.finished_at = nil - record.enqueued_at = Time.current if enqueued_at.nil? - record.save! + record.with_advisory_lock(function: "pg_advisory_lock") do + record.update!(enqueued_at: Time.current) + record._continue_discard_or_finish(lock: false) + end - record._continue_discard_or_finish active_jobs end diff --git a/app/models/good_job/batch_record.rb b/app/models/good_job/batch_record.rb index 4915d946d..c496dfd83 100644 --- a/app/models/good_job/batch_record.rb +++ b/app/models/good_job/batch_record.rb @@ -43,16 +43,17 @@ def display_attributes attributes.except('serialized_properties').merge(properties: properties) end - def _continue_discard_or_finish(execution = nil) + def _continue_discard_or_finish(execution = nil, lock: true) execution_discarded = execution && execution.error.present? && execution.retried_good_job_id.nil? - with_advisory_lock(function: "pg_advisory_lock") do + take_advisory_lock(lock) do Batch.within_thread(batch_id: nil, batch_callback_id: id) do - if execution_discarded && discarded_at.blank? + reload + if execution_discarded && !discarded_at update(discarded_at: Time.current) on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? end - if !finished_at && enqueued_at && jobs.where(finished_at: nil).count.zero? + if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero? update(finished_at: Time.current) on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? @@ -80,5 +81,15 @@ def properties=(value) self.serialized_properties = value end + + private + + def take_advisory_lock(value, &block) + if value + with_advisory_lock(function: "pg_advisory_lock", &block) + else + yield + end + end end end diff --git a/spec/app/models/good_job/batch_spec.rb b/spec/app/models/good_job/batch_spec.rb index f29b8b832..dfb50dd35 100644 --- a/spec/app/models/good_job/batch_spec.rb +++ b/spec/app/models/good_job/batch_spec.rb @@ -4,7 +4,14 @@ describe GoodJob::Batch do before do ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) - stub_const 'TestJob', Class.new(ActiveJob::Base) + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + def perform + end + end) + stub_const 'CallbackJob', (Class.new(ActiveJob::Base) do + def perform(batch, params) + end + end) end describe '.enqueue' do @@ -18,6 +25,16 @@ expect(batch.enqueued_at).to be_within(1.second).of(Time.current) expect(batch.active_jobs.count).to eq 2 end + + it 'resets callbacks' do + batch = described_class.new(on_finish: CallbackJob) + batch.enqueue { TestJob.perform_later } # 1st time triggers callback + GoodJob.perform_inline + batch.enqueue { TestJob.perform_later } # 2nd time does not trigger callback (finished_at didn't update to nil on the stale reference to batch_record) + GoodJob.perform_inline + + expect(batch.callback_active_jobs.count).to eq 2 + end end describe '#add' do @@ -51,9 +68,9 @@ expect { batch.enqueue }.to change(batch, :enqueued_at).from(nil) end - it 'does not overwrite an old value' do - batch._record.update(enqueued_at: 1.day.ago) - expect { batch.enqueue }.not_to change(batch, :enqueued_at) + it 'does updates the enqueued_at and clears finished_at' do + batch._record.update(enqueued_at: 1.day.ago, finished_at: 1.day.ago) + expect { batch.enqueue(TestJob.new) }.to change(batch, :enqueued_at).and change(batch, :finished_at).to(nil) end it 'can assign callback jobs' do