Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard against invalid schedule loading #543

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 109 additions & 4 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,106 @@ class << self
# allow user to set an additional failure handler
attr_writer :failure_handler

# runs without a lock
def run_delayed_only
procline 'Starting Delayed'

# trap signals
register_signal_handlers

# Quote from the resque/worker.
# Fix buffering so we can `rake resque:scheduler > scheduler.log` and
# get output from the child in there.
$stdout.sync = true
$stderr.sync = true

begin
@th = Thread.current

# Now start the scheduling part of the loop.
procline 'Processing Delayed Items'
loop do
begin
unlocked_handle_delayed_items
rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e
log! e.message
end
poll_sleep
end

rescue Interrupt
log 'Exiting'
end
end

# Handles queueing delayed items
# at_time - Time to start scheduling items (default: now).
# this should be multi-process safe
def unlocked_handle_delayed_items(at_time = nil)
timestamp = Resque.next_delayed_timestamp(at_time)
if timestamp
until timestamp.nil?
unlocked_enqueue_delayed_items_for_timestamp(timestamp)
timestamp = Resque.next_delayed_timestamp(at_time)
end
end
end

def unlocked_enqueue_delayed_items_for_timestamp(timestamp)
item = nil
loop do
handle_shutdown do
# Continually check that it is still the master
item = enqueue_next_item(timestamp)
end
# continue processing until there are no more ready items in this
# timestamp
break if item.nil?
end
end

# run with RESQUE_SCHEDULER_MASTER_LOCK_PREFIX=scheduler
def run_scheduled_only
procline 'Starting Scheduler'
ENV['RESQUE_SCHEDULER_MASTER_LOCK_PREFIX'] = 'scheduler'

# trap signals
register_signal_handlers

# Quote from the resque/worker.
# Fix buffering so we can `rake resque:scheduler > scheduler.log` and
# get output from the child in there.
$stdout.sync = true
$stderr.sync = true

# Load the schedule into rufus
# If dynamic is set, load that schedule otherwise use normal load
reload_schedule!

begin
@th = Thread.current

# Now start the scheduling part of the loop.
loop do
begin
if master?
update_schedule
procline 'Processing Schedules'
end
rescue Errno::EAGAIN, Errno::ECONNRESET, Redis::CannotConnectError => e
log! e.message
release_master_lock
end
poll_sleep
end

rescue Interrupt
log 'Exiting'
end
ensure
release_master_lock
end

# Schedule all jobs and continually look for delayed jobs (never returns)
def run
procline 'Starting'
Expand Down Expand Up @@ -140,11 +240,16 @@ def load_schedule_job(name, config)
args = optionizate_interval_value(config[interval_type])
args = [args, nil, job: true] if args.is_a?(::String)

job = rufus_scheduler.send(interval_type, *args) do
enqueue_recurring(name, config)
begin
job = rufus_scheduler.send(interval_type, *args) do
enqueue_recurring(name, config)
end
@scheduled_jobs[name] = job
interval_defined = true

rescue => e
log_error "[Bad Schedule] ignoring with: #{e.message}\n#{e.backtrace.join("\n")}"
end
@scheduled_jobs[name] = job
interval_defined = true
break
end
unless interval_defined
Expand Down
10 changes: 8 additions & 2 deletions lib/resque/scheduler/delaying_extensions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,14 @@ def next_delayed_timestamp(at_time = nil)
def next_item_for_timestamp(timestamp)
key = "delayed:#{timestamp.to_i}"

# TODO: this shoudl be all done in lua lpop, srem should be atomic
encoded_item = redis.lpop(key)
redis.srem("timestamps:#{encoded_item}", key)
item = decode(encoded_item)
if encoded_item
redis.srem("timestamps:#{encoded_item}", key)
item = decode(encoded_item)
else
item = nil
end

# If the list is empty, remove it.
clean_up_timestamp(key, timestamp)
Expand Down Expand Up @@ -285,6 +290,7 @@ def clean_up_timestamp(key, timestamp)
# Use a watch here to ensure nobody adds jobs to this delayed
# queue while we're removing it.
redis.watch(key) do
# TODO: this should be done using lua script
if redis.llen(key).to_i == 0
# If the list is empty, remove it.
redis.multi do
Expand Down
2 changes: 1 addition & 1 deletion lib/resque/scheduler/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def show_job_arguments(args)

def queue_from_class_name(class_name)
Resque.queue_from_class(
Resque::Scheduler::Util.constantize(class_name)
class_name
)
end

Expand Down
6 changes: 3 additions & 3 deletions resque-scheduler.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rubocop', '~> 0.40.0'

spec.add_runtime_dependency 'mono_logger', '~> 1.0'
spec.add_runtime_dependency 'redis', '~> 3.3'
spec.add_runtime_dependency 'resque', '~> 1.26'
spec.add_runtime_dependency 'rufus-scheduler', '~> 3.2'
spec.add_runtime_dependency 'redis'
spec.add_runtime_dependency 'resque'
spec.add_runtime_dependency 'rufus-scheduler'
end