Skip to content

Commit

Permalink
Refactors handling redis disconnects and uses it for the watcher thre…
Browse files Browse the repository at this point in the history
…ad as well
  • Loading branch information
Conor Pappas committed Apr 23, 2018
1 parent 668e522 commit 91ba7e8
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions lib/resque_stuck_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,25 @@ def setup_watcher_thread
Thread.current.abort_on_exception = abort_on_exception
log_starting_thread(:watcher)
while @running
mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
if mutex.lock
begin
queues.each do |queue_name|
log_watcher_info(queue_name)
if should_trigger?(queue_name)
trigger_handler(queue_name, :triggered)
elsif should_recover?(queue_name)
trigger_handler(queue_name, :recovered)
error_lambda = lambda { |e| "Watcher thread couldn't access redis: #{e.inspect}" }
handle_redis_disconnect(error_lambda) do
mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
if mutex.lock
begin
queues.each do |queue_name|
log_watcher_info(queue_name)
if should_trigger?(queue_name)
trigger_handler(queue_name, :triggered)
elsif should_recover?(queue_name)
trigger_handler(queue_name, :recovered)
end
end
ensure
mutex.unlock
end
ensure
mutex.unlock
end
wait_for_it(:watcher_interval)
end
wait_for_it(:watcher_interval)
end
end
end
Expand Down Expand Up @@ -227,13 +230,11 @@ def enqueue_jobs
config[:heartbeat_job].call
else
queues.each do |queue_name|
begin
error_lambda = lambda { |e| "Enqueuing heartbeat job for #{queue_name} crashed: #{e.inspect}" }
handle_redis_disconnect(error_lambda) do
# Redis::Namespace.new support as well as Redis.new
namespace = redis.respond_to?(:namespace) ? redis.namespace : nil
Resque.enqueue_to(queue_name, HeartbeatJob, heartbeat_key_for(queue_name), redis.client.host, redis.client.port, namespace, Time.now.to_i )
rescue Redis::CannotConnectError => e
logger.error("Enqueuing heartbeat job for #{queue_name} crashed: #{e.inspect}")
logger.error("\n#{e.backtrace.join("\n")}")
end
end
end
Expand Down Expand Up @@ -319,6 +320,14 @@ def pretty_process_name
$0 = "rake --trace resque:stuck_queue #{redis.inspect} QUEUES=#{queues.join(",")}"
end

def handle_redis_disconnect(error_message)
yield
rescue Redis::BaseError, SocketError => e
message = error_message.respond_to?(:call) ? error_message.call(e) : error_message
logger.error(message)
logger.error("\n#{e.backtrace.join("\n")}")
raise e unless config[:redis_disconnect_recovery]
end
end
end
end
Expand Down

0 comments on commit 91ba7e8

Please sign in to comment.