Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: insufficient error handling for cluster down #385

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/pubsub
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module PubSubDebug
log "#{role}: recv: #{e.class}"
rescue RedisClient::CommandError => e
log "#{role}: recv: #{e.class}: #{e.message}"
raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise unless e.message.start_with?('CLUSTERDOWN')
rescue StandardError => e
log "#{role}: recv: #{e.class}: #{e.message}"
raise
Expand Down
108 changes: 108 additions & 0 deletions bin/singlepiptx
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'redis_cluster_client'

module SinglePipTxDebug
module_function

def spawn_single(cli)
Thread.new(cli) do |r|
role = ' Single'

loop do
handle_errors(role) do
reply = r.call('incr', 'single')
log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def spawn_pipeline(cli)
Thread.new(cli) do |r|
role = ' Pipeline'

loop do
handle_errors(role) do
reply = r.pipelined do |pi|
pi.call('incr', 'pipeline')
pi.call('incr', 'pipeline')
end

log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def spawn_transaction(cli)
Thread.new(cli) do |r|
role = 'Transaction'

loop do
handle_errors(role) do
reply = r.multi do |tx|
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
tx.call('incr', 'transaction')
end

log "#{role}: #{reply}"
end
ensure
sleep 1.0
end
rescue StandardError => e
log "#{role}: dead: #{e.class}: #{e.message}"
raise
end
end

def handle_errors(role) # rubocop:disable Metrics/AbcSize
yield
rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
log "#{role}: #{e.class}"
rescue RedisClient::CommandError => e
log "#{role}: #{e.class}: #{e.message}"
raise unless e.message.start_with?('CLUSTERDOWN')
rescue RedisClient::Cluster::ErrorCollection => e
log "#{role}: #{e.class}: #{e.message}"
raise unless e.errors.values.all? do |err|
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end
rescue StandardError => e
log "#{role}: #{e.class}: #{e.message}"
raise
end

def log(msg)
print "#{msg}\n"
end
end

clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
threads = []

Signal.trap(:INT) do
threads.each(&:exit)
clients.each(&:close)
SinglePipTxDebug.log("\nBye bye")
exit 0
end

threads << SinglePipTxDebug.spawn_single(clients[0])
threads << SinglePipTxDebug.spawn_pipeline(clients[1])
threads << SinglePipTxDebug.spawn_transaction(clients[2])
threads.each(&:join)
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def initialize(errors)
end

@errors = errors
messages = @errors.map { |node_key, error| "#{node_key}: #{error.message}" }
super("Errors occurred on any node: #{messages.join(', ')}")
messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
super(messages.join(', '))
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/optimistic_locking.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
raise
end
rescue ::RedisClient::CommandError => e
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
@router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
end
rescue ::RedisClient::ConnectionError
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
first_exception ||= result
end

stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN')
end

results[index] = result
Expand Down
5 changes: 2 additions & 3 deletions lib/redis_client/cluster/pub_sub.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma

case event = @queue.pop(true)
when ::RedisClient::CommandError
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN')

break start_over
when ::RedisClient::ConnectionError
break start_over
when ::RedisClient::ConnectionError then break start_over
when StandardError then raise event
when Array then break event
end
Expand Down
6 changes: 3 additions & 3 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
renew_cluster_state
raise
rescue ::RedisClient::CommandError => e
renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
raise
rescue ::RedisClient::Cluster::ErrorCollection => e
raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)

renew_cluster_state if e.errors.values.any? do |err|
next false if ::RedisClient::Cluster::ErrorIdentification.identifiable?(err) && @node.none? { |c| ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(err, c) }

err.message.start_with?('CLUSTERDOWN Hash slot not served') || err.is_a?(::RedisClient::ConnectionError)
err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
end

raise
Expand Down Expand Up @@ -123,7 +123,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
node.call('ASKING')
retry
end
elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
elsif e.message.start_with?('CLUSTERDOWN')
renew_cluster_state
retry if retry_count >= 0
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_client/cluster/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
elsif err.message.start_with?('CLUSTERDOWN Hash slot not served')
elsif err.message.start_with?('CLUSTERDOWN')
@router.renew_cluster_state if @watching_slot.nil?
raise err
else
Expand Down
6 changes: 3 additions & 3 deletions test/redis_client/cluster/test_errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
class RedisClient
class Cluster
class TestErrors < TestingWrapper
DummyError = Struct.new('DummyError', :message)
DummyError = Class.new(StandardError)

def test_initial_setup_error
[
Expand Down Expand Up @@ -44,11 +44,11 @@ def test_error_collection_error
[
{
errors: { '127.0.0.1:6379' => DummyError.new('foo') },
want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo', size: 1 }
want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo', size: 1 }
},
{
errors: { '127.0.0.1:6379' => DummyError.new('foo'), '127.0.0.1:6380' => DummyError.new('bar') },
want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo, 127.0.0.1:6380: bar', size: 2 }
want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo, 127.0.0.1:6380: (RedisClient::Cluster::TestErrors::DummyError) bar', size: 2 }
},
{ errors: {}, want: { msg: '{}', size: 0 } },
{ errors: [], want: { msg: '[]', size: 0 } },
Expand Down
2 changes: 1 addition & 1 deletion test/test_against_cluster_scale.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def retryable(attempts:)
attempts -= 1
break yield
rescue ::RedisClient::CommandError => e
raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
raise unless e.message.start_with?('CLUSTERDOWN')

@cluster_down_error_count += 1
sleep WAIT_SEC
Expand Down
Loading