Skip to content

Commit

Permalink
Abort enqueue when the concurrency limit is reached (#820)
Browse files Browse the repository at this point in the history
* Abort enqueue when the concurrency limit is reached

* Rename check method; use ActiveJob logger

---------

Co-authored-by: Ben Sheldon [he/him] <[email protected]>
  • Loading branch information
TAGraves and bensheldon authored Jan 31, 2023
1 parent 2b007ff commit 21993d4
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 33 deletions.
83 changes: 50 additions & 33 deletions lib/good_job/active_job_extensions/concurrency.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,13 @@ def deserialize(job_data)
class_attribute :good_job_concurrency_config, instance_accessor: false, default: {}
attr_writer :good_job_concurrency_key

around_enqueue do |job, block|
# Don't attempt to enforce concurrency limits with other queue adapters.
next(block.call) unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

# Always allow jobs to be retried because the current job's execution will complete momentarily
next(block.call) if CurrentThread.active_job_id == job.job_id

# Only generate the concurrency key on the initial enqueue in case it is dynamic
job.good_job_concurrency_key ||= job._good_job_concurrency_key
key = job.good_job_concurrency_key
next(block.call) if key.blank?

enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)

unless enqueue_limit
total_limit = job.class.good_job_concurrency_config[:total_limit]
total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call)
total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit)
if ActiveJob.gem_version >= Gem::Version.new("6.1.0")
before_enqueue do |job|
good_job_enqueue_concurrency_check(job, on_abort: -> { throw(:abort) }, on_enqueue: nil)
end

limit = enqueue_limit || total_limit
next(block.call) unless limit

GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
enqueue_concurrency = if enqueue_limit
GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Execution.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
block.call unless (enqueue_concurrency + 1) > limit
else
around_enqueue do |job, block|
good_job_enqueue_concurrency_check(job, on_abort: nil, on_enqueue: block)
end
end

Expand Down Expand Up @@ -113,6 +86,50 @@ def good_job_concurrency_key
@good_job_concurrency_key || _good_job_concurrency_key
end

private

def good_job_enqueue_concurrency_check(job, on_abort:, on_enqueue:)
# Don't attempt to enforce concurrency limits with other queue adapters.
return on_enqueue&.call unless job.class.queue_adapter.is_a?(GoodJob::Adapter)

# Always allow jobs to be retried because the current job's execution will complete momentarily
return on_enqueue&.call if CurrentThread.active_job_id == job.job_id

# Only generate the concurrency key on the initial enqueue in case it is dynamic
job.good_job_concurrency_key ||= job._good_job_concurrency_key
key = job.good_job_concurrency_key
return on_enqueue&.call if key.blank?

enqueue_limit = job.class.good_job_concurrency_config[:enqueue_limit]
enqueue_limit = instance_exec(&enqueue_limit) if enqueue_limit.respond_to?(:call)
enqueue_limit = nil unless enqueue_limit.present? && (0...Float::INFINITY).cover?(enqueue_limit)

unless enqueue_limit
total_limit = job.class.good_job_concurrency_config[:total_limit]
total_limit = instance_exec(&total_limit) if total_limit.respond_to?(:call)
total_limit = nil unless total_limit.present? && (0...Float::INFINITY).cover?(total_limit)
end

limit = enqueue_limit || total_limit
return on_enqueue&.call unless limit

GoodJob::Execution.advisory_lock_key(key, function: "pg_advisory_lock") do
enqueue_concurrency = if enqueue_limit
GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
else
GoodJob::Execution.where(concurrency_key: key).unfinished.count
end

# The job has not yet been enqueued, so check if adding it will go over the limit
if (enqueue_concurrency + 1) > limit
logger.info "Aborted enqueue of #{job.class.name} (Job ID: #{job.job_id}) because the concurrency key '#{key}' has reached its limit of #{limit} #{'job'.pluralize(limit)}"
on_abort&.call
else
on_enqueue&.call
end
end
end

# Generates the concurrency key from the configuration
# @return [Object] concurrency key
def _good_job_concurrency_key
Expand Down
8 changes: 8 additions & 0 deletions spec/lib/good_job/active_job_extensions/concurrency_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def perform(name:)
end

it "does not enqueue if enqueue concurrency limit is exceeded for a particular key" do
allow(Rails.logger.formatter).to receive(:call).and_call_original

expect(TestJob.perform_later(name: "Alice")).to be_present
expect(TestJob.perform_later(name: "Alice")).to be_present

Expand All @@ -59,6 +61,12 @@ def perform(name:)

expect(GoodJob::Execution.where(concurrency_key: "Alice").count).to eq 2
expect(GoodJob::Execution.where(concurrency_key: "Bob").count).to eq 1

expect(Rails.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Aborted enqueue of TestJob \(Job ID: .*\) because the concurrency key 'Alice' has reached its limit of 2 jobs/)).exactly(:once)
if ActiveJob.gem_version >= Gem::Version.new("6.1.0")
expect(Rails.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Alice"}/)).exactly(:twice)
expect(Rails.logger.formatter).to have_received(:call).with("INFO", anything, anything, a_string_matching(/Enqueued TestJob \(Job ID: .*\) to \(default\) with arguments: {:name=>"Bob"}/)).exactly(:once)
end
end

it 'excludes jobs that are already executing/locked' do
Expand Down

0 comments on commit 21993d4

Please sign in to comment.