From 59b43aa86a64ff8581fc490bfc9ad6268336e24e Mon Sep 17 00:00:00 2001 From: ccouton Date: Fri, 15 Nov 2024 16:30:57 +0100 Subject: [PATCH 1/4] fix(good_job-1536): case of clock drift and delay close to 0.001 --- app/models/good_job/cron_entry.rb | 4 +++- lib/good_job/cron_manager.rb | 20 +++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/app/models/good_job/cron_entry.rb b/app/models/good_job/cron_entry.rb index 53ad9071..d6496fe2 100644 --- a/app/models/good_job/cron_entry.rb +++ b/app/models/good_job/cron_entry.rb @@ -12,6 +12,7 @@ class CronEntry include ActiveModel::Model attr_reader :params + attr_reader :enqueued_at_least_one_time def self.all(configuration: nil) configuration ||= GoodJob.configuration @@ -26,7 +27,7 @@ def self.find(key, configuration: nil) def initialize(params = {}) @params = params - + @enqueued_at_least_one_time = false return if cron_proc? raise ArgumentError, "Invalid cron format: '#{cron}'" unless fugit.instance_of?(Fugit::Cron) end @@ -98,6 +99,7 @@ def disable def enqueue(cron_at = nil) GoodJob::CurrentThread.within do |current_thread| + @enqueued_at_least_one_time = true current_thread.cron_key = key current_thread.cron_at = cron_at diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index a4973bde..7a9bc7fa 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -86,18 +86,20 @@ def shutdown? def create_task(cron_entry, previously_at: nil) cron_at = cron_entry.next_at(previously_at: previously_at) delay = [(cron_at - Time.current).to_f, 0].max - future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at], executor: @executor) do |thr_scheduler, thr_cron_entry, thr_cron_at| - # Re-schedule the next cron task before executing the current task - thr_scheduler.create_task(thr_cron_entry, previously_at: thr_cron_at) + if !cron_entry.enqueued_at_least_one_time || delay >= 0.01 #case of clock drift and delay close to 0.001 + future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at], executor: @executor) do |thr_scheduler, thr_cron_entry, thr_cron_at| + # Re-schedule the next cron task before executing the current task + thr_scheduler.create_task(thr_cron_entry, previously_at: thr_cron_at) - Rails.application.executor.wrap do - cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? + Rails.application.executor.wrap do + cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? + end end - end - @tasks[cron_entry.key] = future - future.add_observer(self.class, :task_observer) - future.execute + @tasks[cron_entry.key] = future + future.add_observer(self.class, :task_observer) + future.execute + end end # Uses the graceful restart period to re-enqueue jobs that were scheduled to run during the period. From ba9c4074f6e521f40bfaf153f0a605afcb9424eb Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 15 Nov 2024 10:37:43 -0600 Subject: [PATCH 2/4] Refactor to pass state through create_task instead of via ivar --- app/models/good_job/cron_entry.rb | 4 +--- lib/good_job/cron_manager.rb | 36 ++++++++++++++++++++----------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/app/models/good_job/cron_entry.rb b/app/models/good_job/cron_entry.rb index d6496fe2..53ad9071 100644 --- a/app/models/good_job/cron_entry.rb +++ b/app/models/good_job/cron_entry.rb @@ -12,7 +12,6 @@ class CronEntry include ActiveModel::Model attr_reader :params - attr_reader :enqueued_at_least_one_time def self.all(configuration: nil) configuration ||= GoodJob.configuration @@ -27,7 +26,7 @@ def self.find(key, configuration: nil) def initialize(params = {}) @params = params - @enqueued_at_least_one_time = false + return if cron_proc? raise ArgumentError, "Invalid cron format: '#{cron}'" unless fugit.instance_of?(Fugit::Cron) end @@ -99,7 +98,6 @@ def disable def enqueue(cron_at = nil) GoodJob::CurrentThread.within do |current_thread| - @enqueued_at_least_one_time = true current_thread.cron_key = key current_thread.cron_at = cron_at diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index 7a9bc7fa..e5d32fc0 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -26,11 +26,13 @@ def self.task_observer(time, output, thread_error) # rubocop:disable Lint/Unused end # Execution configuration to be scheduled - # @return [Hash] + # @return [Array] attr_reader :cron_entries # @param cron_entries [Array] # @param start_on_initialize [Boolean] + # @param graceful_restart_period [ActiveSupport::Duration, nil] + # @param executor [Concurrent::Executor] def initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) @executor = executor @running = false @@ -82,24 +84,32 @@ def shutdown? # Enqueues a scheduled task # @param cron_entry [CronEntry] the CronEntry object to schedule - # @param previously_at [Date, Time, ActiveSupport::TimeWithZone, nil] the last, +in-memory+, scheduled time the cron task was intended to run - def create_task(cron_entry, previously_at: nil) - cron_at = cron_entry.next_at(previously_at: previously_at) - delay = [(cron_at - Time.current).to_f, 0].max - if !cron_entry.enqueued_at_least_one_time || delay >= 0.01 #case of clock drift and delay close to 0.001 - future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at], executor: @executor) do |thr_scheduler, thr_cron_entry, thr_cron_at| + # #param at [Time, nil] When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy + # @param previously_at [Time, nil] the last +in-memory+ scheduled time the cron task was intended to run + def create_task(cron_entry, at: nil, previously_at: nil) + cron_at = at || cron_entry.next_at(previously_at: previously_at) + + # ScheduledTask runs immediately if delay is <= 0.01. Prevent that. + # https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97 + delay = [(cron_at - Time.current).to_f, 0.02].max + + future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at| + if thr_cron_at && thr_cron_at > Time.current + # If clock drift or other inaccuracy, reschedule the task again + thr_manager.create_task(thr_cron_entry, at: thr_cron_at, previously_at: previously_at) + else # Re-schedule the next cron task before executing the current task - thr_scheduler.create_task(thr_cron_entry, previously_at: thr_cron_at) + thr_manager.create_task(thr_cron_entry, previously_at: thr_cron_at) Rails.application.executor.wrap do cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? end end - - @tasks[cron_entry.key] = future - future.add_observer(self.class, :task_observer) - future.execute end + + @tasks[cron_entry.key] = future + future.add_observer(self.class, :task_observer) + future.execute end # Uses the graceful restart period to re-enqueue jobs that were scheduled to run during the period. @@ -110,7 +120,7 @@ def create_graceful_tasks(cron_entry) time_period = @graceful_restart_period.ago..Time.current cron_entry.within(time_period).each do |cron_at| - future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_scheduler, thr_cron_entry, thr_cron_at| + future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_manager, thr_cron_entry, thr_cron_at| Rails.application.executor.wrap do cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? end From 248e60b3c13e70d945cd34a3d9c991fd97160140 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 15 Nov 2024 10:40:42 -0600 Subject: [PATCH 3/4] Fix yard typo --- lib/good_job/cron_manager.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index e5d32fc0..77793605 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -84,7 +84,7 @@ def shutdown? # Enqueues a scheduled task # @param cron_entry [CronEntry] the CronEntry object to schedule - # #param at [Time, nil] When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy + # @param at [Time, nil] When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy # @param previously_at [Time, nil] the last +in-memory+ scheduled time the cron task was intended to run def create_task(cron_entry, at: nil, previously_at: nil) cron_at = at || cron_entry.next_at(previously_at: previously_at) From fdd0f4ee8536bf7ac8c3f524aca7535d45e394a0 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 15 Nov 2024 11:00:04 -0600 Subject: [PATCH 4/4] When should have run in the past, run immediately --- lib/good_job/cron_manager.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index 77793605..6d72ad63 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -89,9 +89,9 @@ def shutdown? def create_task(cron_entry, at: nil, previously_at: nil) cron_at = at || cron_entry.next_at(previously_at: previously_at) - # ScheduledTask runs immediately if delay is <= 0.01. Prevent that. + # ScheduledTask runs immediately if delay is <= 0.01; avoid ever scheduling the task before the intended time # https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97 - delay = [(cron_at - Time.current).to_f, 0.02].max + delay = cron_at <= Time.current ? 0.0 : [(cron_at - Time.current).to_f, 0.02].max future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at| if thr_cron_at && thr_cron_at > Time.current