Skip to content

Commit

Permalink
Set job execution thread priority to -3 when in async mode (#1560)
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon authored Dec 12, 2024
1 parent 714b87e commit 125be6d
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 4 deletions.
1 change: 1 addition & 0 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def start_async
return unless execute_async?

@capsule.start
@capsule.lower_thread_priority = true if GoodJob.configuration.lower_thread_priority.in?([true, nil])
@_async_started = true
end

Expand Down
10 changes: 9 additions & 1 deletion lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def initialize(configuration: nil)

@shared_executor = GoodJob::SharedExecutor.new
@tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor)
@lower_thread_priority = nil

self.class.instances << self
end
Expand All @@ -38,7 +39,9 @@ def start(force: false)

@notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor)
@poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true)
@multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true).tap do |multischeduler|
multischeduler.lower_thread_priority = @lower_thread_priority unless @lower_thread_priority.nil?
end
@notifier.recipients.push([@multi_scheduler, :create_thread])
@poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) })

Expand Down Expand Up @@ -110,6 +113,11 @@ def process_id
@tracker.process_id
end

def lower_thread_priority=(value)
@lower_thread_priority = value
@multi_scheduler&.lower_thread_priority = value
end

private

def configuration
Expand Down
8 changes: 8 additions & 0 deletions lib/good_job/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ def in_webserver?
end || false
end

def lower_thread_priority
return options[:lower_thread_priority] unless options[:lower_thread_priority].nil?
return rails_config[:lower_thread_priority] unless rails_config[:lower_thread_priority].nil?
return ActiveModel::Type::Boolean.new.cast(env['GOOD_JOB_LOWER_THREAD_PRIORITY']) unless env['GOOD_JOB_LOWER_THREAD_PRIORITY'].nil?

nil
end

# Whether to take an advisory lock on the process record in the notifier reactor.
# @return [Boolean]
def advisory_lock_heartbeat
Expand Down
9 changes: 8 additions & 1 deletion lib/good_job/multi_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_
max_cache: configuration.max_cache,
warm_cache_on_initialize: warm_cache_on_initialize,
cleanup_interval_seconds: configuration.cleanup_interval_seconds,
cleanup_interval_jobs: configuration.cleanup_interval_jobs
cleanup_interval_jobs: configuration.cleanup_interval_jobs,
lower_thread_priority: configuration.lower_thread_priority
)
end

Expand Down Expand Up @@ -85,6 +86,12 @@ def create_thread(state = nil)
end
end

def lower_thread_priority=(value)
schedulers.each do |scheduler|
scheduler.lower_thread_priority = value
end
end

def stats
scheduler_stats = schedulers.map(&:stats)

Expand Down
13 changes: 12 additions & 1 deletion lib/good_job/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class Scheduler
fallback_policy: :discard,
}.freeze

# In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ).
LOW_THREAD_PRIORITY = -3

# @!attribute [r] instances
# @!scope class
# List of all instantiated Schedulers in the current process.
Expand All @@ -39,13 +42,18 @@ class Scheduler
# @return [String]
attr_reader :name

# Whether to lower the thread priority to a fixed value
# @return [Boolean]
attr_accessor :lower_thread_priority

# @param performer [GoodJob::JobPerformer]
# @param max_threads [Numeric, nil] number of seconds between polls for jobs
# @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory
# @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling +warm_cache+
# @param cleanup_interval_seconds [Numeric, nil] number of seconds between cleaning up job records
# @param cleanup_interval_jobs [Numeric, nil] number of executed jobs between cleaning up job records
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
# @param lower_thread_priority [Boolean] whether to lower the thread priority of execution threads
def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false)
raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

@performer = performer
Expand All @@ -62,6 +70,8 @@ def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initia
@cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
@executor_options[:name] = name

self.lower_thread_priority = lower_thread_priority

create_executor
warm_cache if warm_cache_on_initialize
self.class.instances << self
Expand Down Expand Up @@ -271,6 +281,7 @@ def create_task(delay = 0, fanout: false)
future = Concurrent::ScheduledTask.new(delay, args: [self, performer], executor: executor, timer_set: timer_set) do |thr_scheduler, thr_performer|
Thread.current.name = Thread.current.name.sub("-worker-", "-thread-") if Thread.current.name
Thread.current[:good_job_scheduler] = thr_scheduler
Thread.current.priority = -3 if thr_scheduler.lower_thread_priority

Rails.application.reloader.wrap do
thr_performer.next do |found|
Expand Down
12 changes: 11 additions & 1 deletion spec/lib/good_job/adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def perform(succeed: true)
allow(GoodJob::Job).to receive(:enqueue).and_return(good_job)
allow(GoodJob::Notifier).to receive(:notify)

capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil)
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

Expand All @@ -99,6 +99,16 @@ def perform(succeed: true)
expect(capsule).to have_received(:create_thread)
expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' })
end

it 'lowers the thread priority of the capsule' do
capsule = instance_double(GoodJob::Capsule, start: nil, create_thread: nil, "lower_thread_priority=": nil)
allow(GoodJob).to receive(:capsule).and_return(capsule)
allow(capsule).to receive(:start)

described_class.new(execution_mode: :async_all)

expect(capsule).to have_received(:lower_thread_priority=).with(true)
end
end
end

Expand Down

0 comments on commit 125be6d

Please sign in to comment.