Skip to content

Commit

Permalink
fix: insufficient error handling for cluster down (#385)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Sep 27, 2024
1 parent db3a3c8 commit c75cca0
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bin/pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module PubSubDebug
log "#{role}: recv: #{e.class}"
rescue RedisClient::CommandError => e
log "#{role}: recv: #{e.class}: #{e.message}"
raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise unless e.message.start_with?('CLUSTERDOWN')
rescue StandardError => e
log "#{role}: recv: #{e.class}: #{e.message}"
raise
Expand Down
108 changes: 108 additions & 0 deletions bin/singlepiptx
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'redis_cluster_client'

module SinglePipTxDebug
module_function

def spawn_single(cli)
Thread.new(cli) do |r|
role = ' Single'

loop do
handle_errors(role) do
reply = r.call('incr', 'single')
log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def spawn_pipeline(cli)
Thread.new(cli) do |r|
role = ' Pipeline'

loop do
handle_errors(role) do
reply = r.pipelined do |pi|
pi.call('incr', 'pipeline')
pi.call('incr', 'pipeline')
end

log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def spawn_transaction(cli)
Thread.new(cli) do |r|
role = 'Transaction'

loop do
handle_errors(role) do
reply = r.multi do |tx|
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
end

log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def handle_errors(role) # rubocop:disable Metrics/AbcSize
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: #{e.class}"
rescue RedisClient::CommandError => e
log "#{role}: #{e.class}: #{e.message}"
raise unless e.message.start_with?('CLUSTERDOWN')
rescue RedisClient::Cluster::ErrorCollection => e
log "#{role}: #{e.class}: #{e.message}"
raise unless e.errors.values.all? do |err|
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end
rescue StandardError => e
log "#{role}: #{e.class}: #{e.message}"
raise
end

def log(msg)
print "#{msg}\n"
end
end

clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
threads = []

Signal.trap(:INT) do
threads.each(&:exit)
clients.each(&:close)
SinglePipTxDebug.log("\nBye bye")
exit 0
end

threads << SinglePipTxDebug.spawn_single(clients[0])
threads << SinglePipTxDebug.spawn_pipeline(clients[1])
threads << SinglePipTxDebug.spawn_transaction(clients[2])
threads.each(&:join)
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def initialize(errors)
end

@errors = errors
messages = @errors.map { |node_key, error| "#{node_key}: #{error.message}" }
super("Errors occurred on any node: #{messages.join(', ')}")
messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
super(messages.join(', '))
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
raise
end
rescue ::RedisClient::CommandError => e
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
end
rescue ::RedisClient::ConnectionError
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
first_exception ||= result
end

stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN')
end

results[index] = result
Expand Down
5 changes: 2 additions & 3 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma

case event = @queue.pop(true)
when ::RedisClient::CommandError
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN')

break start_over
when ::RedisClient::ConnectionError
break start_over
when ::RedisClient::ConnectionError then break start_over
when StandardError then raise event
when Array then break event
end
Expand Down
6 changes: 3 additions & 3 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
renew_cluster_state
raise
rescue ::RedisClient::CommandError => e
renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
rescue ::RedisClient::Cluster::ErrorCollection => e
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)

renew_cluster_state if e.errors.values.any? do |err|
next false if ::RedisClient::Cluster::ErrorIdentification.identifiable?(err) && @node.none? { |c| ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(err, c) }

err.message.start_with?('CLUSTERDOWN Hash slot not served') || err.is_a?(::RedisClient::ConnectionError)
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end

raise
Expand Down Expand Up @@ -123,7 +123,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
node.call('ASKING')
retry
end
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
elsif e.message.start_with?('CLUSTERDOWN')
renew_cluster_state
retry if retry_count >= 0
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
elsif err.message.start_with?('CLUSTERDOWN Hash slot not served')
elsif err.message.start_with?('CLUSTERDOWN')
@router.renew_cluster_state if @watching_slot.nil?
raise err
else
Expand Down
6 changes: 3 additions & 3 deletions test/redis_client/cluster/test_errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class RedisClient
class Cluster
class TestErrors < TestingWrapper
DummyError = Struct.new('DummyError', :message)
DummyError = Class.new(StandardError)

def test_initial_setup_error
[
Expand Down Expand Up @@ -44,11 +44,11 @@ def test_error_collection_error
[
{
errors: { '127.0.0.1:6379' => DummyError.new('foo') },
want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo', size: 1 }
want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo', size: 1 }
},
{
errors: { '127.0.0.1:6379' => DummyError.new('foo'), '127.0.0.1:6380' => DummyError.new('bar') },
want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo, 127.0.0.1:6380: bar', size: 2 }
want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo, 127.0.0.1:6380: (RedisClient::Cluster::TestErrors::DummyError) bar', size: 2 }
},
{ errors: {}, want: { msg: '{}', size: 0 } },
{ errors: [], want: { msg: '[]', size: 0 } },
Expand Down
2 changes: 1 addition & 1 deletion test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def retryable(attempts:)
attempts -= 1
break yield
rescue ::RedisClient::CommandError => e
raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise unless e.message.start_with?('CLUSTERDOWN')

@cluster_down_error_count += 1
sleep WAIT_SEC
Expand Down

0 comments on commit c75cca0

Please sign in to comment.