diff --git a/lib/resque/scheduler.rb b/lib/resque/scheduler.rb index 8ca91367..583339da 100644 --- a/lib/resque/scheduler.rb +++ b/lib/resque/scheduler.rb @@ -31,6 +31,106 @@ class << self # allow user to set an additional failure handler attr_writer :failure_handler + # runs without a lock + def run_delayed_only + procline 'Starting Delayed' + + # trap signals + register_signal_handlers + + # Quote from the resque/worker. + # Fix buffering so we can `rake resque:scheduler > scheduler.log` and + # get output from the child in there. + $stdout.sync = true + $stderr.sync = true + + begin + @th = Thread.current + + # Now start the scheduling part of the loop. + procline 'Processing Delayed Items' + loop do + begin + unlocked_handle_delayed_items + rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e + log! e.message + end + poll_sleep + end + + rescue Interrupt + log 'Exiting' + end + end + + # Handles queueing delayed items + # at_time - Time to start scheduling items (default: now). + # this should be multi-process safe + def unlocked_handle_delayed_items(at_time = nil) + timestamp = Resque.next_delayed_timestamp(at_time) + if timestamp + until timestamp.nil? + unlocked_enqueue_delayed_items_for_timestamp(timestamp) + timestamp = Resque.next_delayed_timestamp(at_time) + end + end + end + + def unlocked_enqueue_delayed_items_for_timestamp(timestamp) + item = nil + loop do + handle_shutdown do + # Continually check that it is still the master + item = enqueue_next_item(timestamp) + end + # continue processing until there are no more ready items in this + # timestamp + break if item.nil? + end + end + + # run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler + def run_scheduled_only + procline 'Starting Scheduler' + ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'scheduler' + + # trap signals + register_signal_handlers + + # Quote from the resque/worker. + # Fix buffering so we can `rake resque:scheduler > scheduler.log` and + # get output from the child in there. + $stdout.sync = true + $stderr.sync = true + + # Load the schedule into rufus + # If dynamic is set, load that schedule otherwise use normal load + reload_schedule! + + begin + @th = Thread.current + + # Now start the scheduling part of the loop. + loop do + begin + if master? + update_schedule + procline 'Processing Schedules' + end + rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e + log! e.message + release_master_lock + end + poll_sleep + end + + rescue Interrupt + log 'Exiting' + end + ensure + release_master_lock + end + # Schedule all jobs and continually look for delayed jobs (never returns) def run procline 'Starting' @@ -140,11 +240,16 @@ def load_schedule_job(name, config) args = optionizate_interval_value(config[interval_type]) args = [args, nil, job: true] if args.is_a?(::String) - job = rufus_scheduler.send(interval_type, *args) do - enqueue_recurring(name, config) + begin + job = rufus_scheduler.send(interval_type, *args) do + enqueue_recurring(name, config) + end + @scheduled_jobs[name] = job + interval_defined = true + + rescue => e + log_error "[Bad Schedule] ignoring with: #{e.message}\n#{e.backtrace.join("\n")}" end - @scheduled_jobs[name] = job - interval_defined = true break end unless interval_defined diff --git a/lib/resque/scheduler/delaying_extensions.rb b/lib/resque/scheduler/delaying_extensions.rb index ec7606f0..6b83beae 100644 --- a/lib/resque/scheduler/delaying_extensions.rb +++ b/lib/resque/scheduler/delaying_extensions.rb @@ -114,9 +114,14 @@ def next_delayed_timestamp(at_time = nil) def next_item_for_timestamp(timestamp) key = "delayed:#{timestamp.to_i}" + # TODO: this shoudl be all done in lua lpop, srem should be atomic encoded_item = redis.lpop(key) - redis.srem("timestamps:#{encoded_item}", key) - item = decode(encoded_item) + if encoded_item + redis.srem("timestamps:#{encoded_item}", key) + item = decode(encoded_item) + else + item = nil + end # If the list is empty, remove it. clean_up_timestamp(key, timestamp) @@ -285,6 +290,7 @@ def clean_up_timestamp(key, timestamp) # Use a watch here to ensure nobody adds jobs to this delayed # queue while we're removing it. redis.watch(key) do + # TODO: this should be done using lua script if redis.llen(key).to_i == 0 # If the list is empty, remove it. redis.multi do diff --git a/lib/resque/scheduler/server.rb b/lib/resque/scheduler/server.rb index 286118a7..9ec59d46 100644 --- a/lib/resque/scheduler/server.rb +++ b/lib/resque/scheduler/server.rb @@ -147,7 +147,7 @@ def show_job_arguments(args) def queue_from_class_name(class_name) Resque.queue_from_class( - Resque::Scheduler::Util.constantize(class_name) + class_name ) end diff --git a/resque-scheduler.gemspec b/resque-scheduler.gemspec index 3a4faea8..5c749f39 100644 --- a/resque-scheduler.gemspec +++ b/resque-scheduler.gemspec @@ -54,7 +54,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rubocop', '~> 0.40.0' spec.add_runtime_dependency 'mono_logger', '~> 1.0' - spec.add_runtime_dependency 'redis', '~> 3.3' - spec.add_runtime_dependency 'resque', '~> 1.26' - spec.add_runtime_dependency 'rufus-scheduler', '~> 3.2' + spec.add_runtime_dependency 'redis' + spec.add_runtime_dependency 'resque' + spec.add_runtime_dependency 'rufus-scheduler' end