From f1f6f0440e977aa4f57f21dc29a898c597d6ca57 Mon Sep 17 00:00:00 2001
From: Taishi Kasuga
Date: Fri, 27 Sep 2024 19:36:26 +0900
Subject: [PATCH] fix: insufficient error handling for cluster down

---
 bin/pubsub                               |   2 +-
 bin/singlepiptx                          | 108 ++++++++++++++++++
 lib/redis_client/cluster/errors.rb       |   4 +-
 .../cluster/optimistic_locking.rb        |   2 +-
 lib/redis_client/cluster/pipeline.rb     |   2 +-
 lib/redis_client/cluster/pub_sub.rb      |   5 +-
 lib/redis_client/cluster/router.rb       |   6 +-
 lib/redis_client/cluster/transaction.rb  |   2 +-
 test/redis_client/cluster/test_errors.rb |   6 +-
 test/test_against_cluster_scale.rb       |   2 +-
 10 files changed, 123 insertions(+), 16 deletions(-)
 create mode 100755 bin/singlepiptx

diff --git a/bin/pubsub b/bin/pubsub
index 06d93a7..e19a53a 100755
--- a/bin/pubsub
+++ b/bin/pubsub
@@ -65,7 +65,7 @@ module PubSubDebug
     log "#{role}: recv: #{e.class}"
   rescue RedisClient::CommandError => e
     log "#{role}: recv: #{e.class}: #{e.message}"
-    raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
+    raise unless e.message.start_with?('CLUSTERDOWN')
   rescue StandardError => e
     log "#{role}: recv: #{e.class}: #{e.message}"
     raise
diff --git a/bin/singlepiptx b/bin/singlepiptx
new file mode 100755
index 0000000..2c9a075
--- /dev/null
+++ b/bin/singlepiptx
@@ -0,0 +1,108 @@
+#!/usr/bin/env ruby
+# frozen_string_literal: true
+
+require 'bundler/setup'
+require 'redis_cluster_client'
+
+module SinglePipTxDebug
+  module_function
+
+  def spawn_single(cli)
+    Thread.new(cli) do |r|
+      role = ' Single'
+
+      loop do
+        handle_errors(role) do
+          reply = r.call('incr', 'single')
+          log "#{role}: #{reply}"
+        end
+      ensure
+        sleep 1.0
+      end
+    rescue StandardError => e
+      log "#{role}: dead: #{e.class}: #{e.message}"
+      raise
+    end
+  end
+
+  def spawn_pipeline(cli)
+    Thread.new(cli) do |r|
+      role = ' Pipeline'
+
+      loop do
+        handle_errors(role) do
+          reply = r.pipelined do |pi|
+            pi.call('incr', 'pipeline')
+            pi.call('incr', 'pipeline')
+          end
+
+          log "#{role}: #{reply}"
+        end
+      ensure
+        sleep 1.0
+      end
+    rescue StandardError => e
+      log "#{role}: dead: #{e.class}: #{e.message}"
+      raise
+    end
+  end
+
+  def spawn_transaction(cli)
+    Thread.new(cli) do |r|
+      role = 'Transaction'
+
+      loop do
+        handle_errors(role) do
+          reply = r.multi do |tx|
+            tx.call('incr', 'transaction')
+            tx.call('incr', 'transaction')
+            tx.call('incr', 'transaction')
+          end
+
+          log "#{role}: #{reply}"
+        end
+      ensure
+        sleep 1.0
+      end
+    rescue StandardError => e
+      log "#{role}: dead: #{e.class}: #{e.message}"
+      raise
+    end
+  end
+
+  def handle_errors(role) # rubocop:disable Metrics/AbcSize
+    yield
+  rescue RedisClient::ConnectionError, RedisClient::Cluster::InitialSetupError, RedisClient::Cluster::NodeMightBeDown => e
+    log "#{role}: #{e.class}"
+  rescue RedisClient::CommandError => e
+    log "#{role}: #{e.class}: #{e.message}"
+    raise unless e.message.start_with?('CLUSTERDOWN')
+  rescue RedisClient::Cluster::ErrorCollection => e
+    log "#{role}: #{e.class}: #{e.message}"
+    raise unless e.errors.values.all? do |err|
+      err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
+    end
+  rescue StandardError => e
+    log "#{role}: #{e.class}: #{e.message}"
+    raise
+  end
+
+  def log(msg)
+    print "#{msg}\n"
+  end
+end
+
+clients = Array.new(3) { RedisClient.cluster(connect_with_original_config: true).new_client }
+threads = []
+
+Signal.trap(:INT) do
+  threads.each(&:exit)
+  clients.each(&:close)
+  SinglePipTxDebug.log("\nBye bye")
+  exit 0
+end
+
+threads << SinglePipTxDebug.spawn_single(clients[0])
+threads << SinglePipTxDebug.spawn_pipeline(clients[1])
+threads << SinglePipTxDebug.spawn_transaction(clients[2])
+threads.each(&:join)
diff --git a/lib/redis_client/cluster/errors.rb b/lib/redis_client/cluster/errors.rb
index 79239dc..ca2717d 100644
--- a/lib/redis_client/cluster/errors.rb
+++ b/lib/redis_client/cluster/errors.rb
@@ -37,8 +37,8 @@ def initialize(errors)
         end
 
         @errors = errors
-        messages = @errors.map { |node_key, error| "#{node_key}: #{error.message}" }
-        super("Errors occurred on any node: #{messages.join(', ')}")
+        messages = @errors.map { |node_key, error| "#{node_key}: (#{error.class}) #{error.message}" }
+        super(messages.join(', '))
       end
     end
diff --git a/lib/redis_client/cluster/optimistic_locking.rb b/lib/redis_client/cluster/optimistic_locking.rb
index bfc978b..f0b38dd 100644
--- a/lib/redis_client/cluster/optimistic_locking.rb
+++ b/lib/redis_client/cluster/optimistic_locking.rb
@@ -33,7 +33,7 @@ def watch(keys) # rubocop:disable Metrics/AbcSize
                 raise
               end
             rescue ::RedisClient::CommandError => e
-              @router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
+              @router.renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
               raise
             end
           rescue ::RedisClient::ConnectionError
diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb
index 29c7267..c30055d 100644
--- a/lib/redis_client/cluster/pipeline.rb
+++ b/lib/redis_client/cluster/pipeline.rb
@@ -73,7 +73,7 @@ def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # ruboco
              first_exception ||= result
            end
 
-            stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN Hash slot not served')
+            stale_cluster_state = true if result.message.start_with?('CLUSTERDOWN')
          end
 
          results[index] = result
diff --git a/lib/redis_client/cluster/pub_sub.rb b/lib/redis_client/cluster/pub_sub.rb
index 7c86589..80338c8 100644
--- a/lib/redis_client/cluster/pub_sub.rb
+++ b/lib/redis_client/cluster/pub_sub.rb
@@ -90,11 +90,10 @@ def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/Cycloma
 
           case event = @queue.pop(true)
           when ::RedisClient::CommandError
-            raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')
+            raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN')
 
             break start_over
-          when ::RedisClient::ConnectionError
-            break start_over
+          when ::RedisClient::ConnectionError then break start_over
           when StandardError then raise event
           when Array then break event
           end
diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb
index 2a6c1e1..f3981a1 100644
--- a/lib/redis_client/cluster/router.rb
+++ b/lib/redis_client/cluster/router.rb
@@ -74,7 +74,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
       renew_cluster_state
       raise
     rescue ::RedisClient::CommandError => e
-      renew_cluster_state if e.message.start_with?('CLUSTERDOWN Hash slot not served')
+      renew_cluster_state if e.message.start_with?('CLUSTERDOWN')
       raise
     rescue ::RedisClient::Cluster::ErrorCollection => e
       raise if e.errors.any?(::RedisClient::CircuitBreaker::OpenCircuitError)
@@ -82,7 +82,7 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
       renew_cluster_state if e.errors.values.any? do |err|
         next false if ::RedisClient::Cluster::ErrorIdentification.identifiable?(err) && @node.none? { |c| ::RedisClient::Cluster::ErrorIdentification.client_owns_error?(err, c) }
 
-        err.message.start_with?('CLUSTERDOWN Hash slot not served') || err.is_a?(::RedisClient::ConnectionError)
+        err.message.start_with?('CLUSTERDOWN') || err.is_a?(::RedisClient::ConnectionError)
       end
 
       raise
@@ -123,7 +123,7 @@ def handle_redirection(node, retry_count:) # rubocop:disable Metrics/AbcSize, Me
           node.call('ASKING')
           retry
         end
-      elsif e.message.start_with?('CLUSTERDOWN Hash slot not served')
+      elsif e.message.start_with?('CLUSTERDOWN')
         renew_cluster_state
         retry if retry_count >= 0
       end
diff --git a/lib/redis_client/cluster/transaction.rb b/lib/redis_client/cluster/transaction.rb
index 7312829..0782b65 100644
--- a/lib/redis_client/cluster/transaction.rb
+++ b/lib/redis_client/cluster/transaction.rb
@@ -170,7 +170,7 @@ def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
         elsif err.message.start_with?('ASK')
           node = @router.assign_asking_node(err.message)
           try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
-        elsif err.message.start_with?('CLUSTERDOWN Hash slot not served')
+        elsif err.message.start_with?('CLUSTERDOWN')
           @router.renew_cluster_state if @watching_slot.nil?
           raise err
         else
diff --git a/test/redis_client/cluster/test_errors.rb b/test/redis_client/cluster/test_errors.rb
index 75ac6db..5ceac2a 100644
--- a/test/redis_client/cluster/test_errors.rb
+++ b/test/redis_client/cluster/test_errors.rb
@@ -5,7 +5,7 @@
 class RedisClient
   class Cluster
     class TestErrors < TestingWrapper
-      DummyError = Struct.new('DummyError', :message)
+      DummyError = Class.new(StandardError)
 
       def test_initial_setup_error
         [
@@ -44,11 +44,11 @@ def test_error_collection_error
         [
           {
             errors: { '127.0.0.1:6379' => DummyError.new('foo') },
-            want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo', size: 1 }
+            want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo', size: 1 }
           },
           {
             errors: { '127.0.0.1:6379' => DummyError.new('foo'), '127.0.0.1:6380' => DummyError.new('bar') },
-            want: { msg: 'Errors occurred on any node: 127.0.0.1:6379: foo, 127.0.0.1:6380: bar', size: 2 }
+            want: { msg: '127.0.0.1:6379: (RedisClient::Cluster::TestErrors::DummyError) foo, 127.0.0.1:6380: (RedisClient::Cluster::TestErrors::DummyError) bar', size: 2 }
           },
           { errors: {}, want: { msg: '{}', size: 0 } },
           { errors: [], want: { msg: '[]', size: 0 } },
diff --git a/test/test_against_cluster_scale.rb b/test/test_against_cluster_scale.rb
index eac371a..9f557c3 100644
--- a/test/test_against_cluster_scale.rb
+++ b/test/test_against_cluster_scale.rb
@@ -115,7 +115,7 @@ def retryable(attempts:)
       attempts -= 1
       break yield
     rescue ::RedisClient::CommandError => e
-      raise unless e.message.start_with?('CLUSTERDOWN Hash slot not served')
+      raise unless e.message.start_with?('CLUSTERDOWN')
 
       @cluster_down_error_count += 1
       sleep WAIT_SEC
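
As a usage sketch of the error-handling pattern these hunks converge on, reduced to a single call site (this is only an illustration, not part of the patch's files; it assumes a cluster reachable through the client's default node settings, and the 'counter' key is an arbitrary placeholder):

    #!/usr/bin/env ruby
    # frozen_string_literal: true

    require 'redis_cluster_client'

    # Assumes a locally reachable cluster via the client's default node configuration.
    client = RedisClient.cluster(connect_with_original_config: true).new_client

    begin
      client.call('incr', 'counter')
    rescue RedisClient::CommandError => e
      # Any CLUSTERDOWN reply is treated as transient, not only 'CLUSTERDOWN Hash slot not served'.
      raise unless e.message.start_with?('CLUSTERDOWN')
    rescue RedisClient::Cluster::ErrorCollection => e
      # Tolerate the failure only when every per-node error is CLUSTERDOWN or a connection error.
      transient = e.errors.values.all? do |err|
        err.message.start_with?('CLUSTERDOWN') || err.is_a?(RedisClient::ConnectionError)
      end
      raise unless transient
    end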