From 09484ea6a914c381ea5cd4cc6c20b3004ce8eee6 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Tue, 2 Jan 2024 20:43:35 +1100 Subject: [PATCH] Fix underlying RedisClient circuit breakers never firing (#309) --- lib/redis_client/cluster/node.rb | 267 +++++++++++------- .../cluster/node/base_topology.rb | 60 ++++ .../cluster/node/latency_replica.rb | 30 +- lib/redis_client/cluster/node/primary_only.rb | 26 +- .../cluster/node/random_replica.rb | 6 +- .../cluster/node/random_replica_or_primary.rb | 6 +- .../cluster/node/replica_mixin.rb | 37 --- lib/redis_client/cluster/router.rb | 32 +-- lib/redis_client/cluster_config.rb | 75 ++--- test/command_capture_middleware.rb | 38 ++- .../cluster/node/test_latency_replica.rb | 42 +-- .../cluster/node/test_primary_only.rb | 38 +-- .../cluster/node/test_random_replica.rb | 42 +-- .../node/test_random_replica_or_primary.rb | 42 +-- .../cluster/node/testing_topology_mixin.rb | 36 ++- test/redis_client/cluster/test_node.rb | 171 +++++++---- test/redis_client/test_cluster.rb | 26 +- test/redis_client/test_cluster_config.rb | 62 ++-- 18 files changed, 547 insertions(+), 489 deletions(-) create mode 100644 lib/redis_client/cluster/node/base_topology.rb delete mode 100644 lib/redis_client/cluster/node/replica_mixin.rb diff --git a/lib/redis_client/cluster/node.rb b/lib/redis_client/cluster/node.rb index dc362e18..e301bbec 100644 --- a/lib/redis_client/cluster/node.rb +++ b/lib/redis_client/cluster/node.rb @@ -22,10 +22,10 @@ class Node SLOT_SIZE = 16_384 MIN_SLOT = 0 MAX_SLOT = SLOT_SIZE - 1 - IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze DEAD_FLAGS = %w[fail? fail handshake noaddr noflags].freeze ROLE_FLAGS = %w[master slave].freeze EMPTY_ARRAY = [].freeze + EMPTY_HASH = {}.freeze ReloadNeeded = Class.new(::RedisClient::Error) @@ -92,119 +92,19 @@ def build_connection_prelude end end - class << self - def load_info(options, concurrent_worker, slow_command_timeout: -1, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - raise ::RedisClient::Cluster::InitialSetupError, [] if options.nil? || options.empty? - - startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size - startup_options = options.to_a.sample(startup_size).to_h - startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, concurrent_worker, **kwargs) - work_group = concurrent_worker.new_group(size: startup_size) - - startup_nodes.each_with_index do |raw_client, i| - work_group.push(i, raw_client) do |client| - regular_timeout = client.read_timeout - client.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout - reply = client.call('CLUSTER', 'NODES') - client.read_timeout = regular_timeout - parse_cluster_node_reply(reply) - rescue StandardError => e - e - ensure - client&.close - end - end - - node_info_list = errors = nil - - work_group.each do |i, v| - case v - when StandardError - errors ||= Array.new(startup_size) - errors[i] = v - else - node_info_list ||= Array.new(startup_size) - node_info_list[i] = v - end - end - - work_group.close - - raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil? 
- - grouped = node_info_list.compact.group_by do |info_list| - info_list.sort_by!(&:id) - info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a| - a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch - end - end - - grouped.max_by { |_, v| v.size }[1].first.freeze - end - - private - - # @see https://redis.io/commands/cluster-nodes/ - # @see https://github.com/redis/redis/blob/78960ad57b8a5e6af743d789ed8fd767e37d42b8/src/cluster.c#L4660-L4683 - def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - reply.each_line("\n", chomp: true).filter_map do |line| - fields = line.split - flags = fields[2].split(',') - next unless fields[7] == 'connected' && (flags & DEAD_FLAGS).empty? - - slots = if fields[8].nil? - EMPTY_ARRAY - else - fields[8..].reject { |str| str.start_with?('[') } - .map { |str| str.split('-').map { |s| Integer(s) } } - .map { |a| a.size == 1 ? a << a.first : a } - .map(&:sort) - end - - ::RedisClient::Cluster::Node::Info.new( - id: fields[0], - node_key: parse_node_key(fields[1]), - role: (flags & ROLE_FLAGS).first, - primary_id: fields[3], - ping_sent: fields[4], - pong_recv: fields[5], - config_epoch: fields[6], - link_state: fields[7], - slots: slots - ) - end - end - - # As redirection node_key is dependent on `cluster-preferred-endpoint-type` config, - # node_key should use hostname if present in CLUSTER NODES output. - # - # See https://redis.io/commands/cluster-nodes/ for details on the output format. - # node_address matches fhe format: - def parse_node_key(node_address) - ip_chunk, hostname, _auxiliaries = node_address.split(',') - ip_port_string = ip_chunk.split('@').first - return ip_port_string if hostname.nil? || hostname.empty? - - port = ip_port_string.split(':')[1] - "#{hostname}:#{port}" - end - end - def initialize( - options, concurrent_worker, - node_info_list: [], - with_replica: false, - replica_affinity: :random, + config:, pool: nil, **kwargs ) @concurrent_worker = concurrent_worker - @slots = build_slot_node_mappings(node_info_list) - @replications = build_replication_mappings(node_info_list) - klass = make_topology_class(with_replica, replica_affinity) - @topology = klass.new(@replications, options, pool, @concurrent_worker, **kwargs) + @slots = build_slot_node_mappings(EMPTY_ARRAY) + @replications = build_replication_mappings(EMPTY_ARRAY) + klass = make_topology_class(config.use_replica?, config.replica_affinity) + @topology = klass.new(pool, @concurrent_worker, **kwargs) + @config = config @mutex = Mutex.new end @@ -255,6 +155,14 @@ def clients_for_scanning(seed: nil) @topology.clients_for_scanning(seed: seed).values.sort_by { |c| "#{c.config.host}-#{c.config.port}" } end + def clients + @topology.clients.values + end + + def primary_clients + @topology.primary_clients.values + end + def replica_clients @topology.replica_clients.values end @@ -292,6 +200,20 @@ def update_slot(slot, node_key) end end + def reload! 
+ with_reload_lock do + with_startup_clients(MAX_STARTUP_SAMPLE) do |startup_clients| + @node_info = refetch_node_info_list(startup_clients) + @node_configs = @node_info.to_h do |node_info| + [node_info.node_key, @config.client_config_for_node(node_info.node_key)] + end + @slots = build_slot_node_mappings(@node_info) + @replications = build_replication_mappings(@node_info) + @topology.process_topology_update!(@replications, @node_configs) + end + end + end + private def make_topology_class(with_replica, replica_affinity) @@ -378,6 +300,137 @@ def try_map(clients, &block) # rubocop:disable Metrics/AbcSize, Metrics/Cyclomat [results, errors] end + + def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + startup_size = startup_clients.size + work_group = @concurrent_worker.new_group(size: startup_size) + + startup_clients.each_with_index do |raw_client, i| + work_group.push(i, raw_client) do |client| + regular_timeout = client.read_timeout + client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout + reply = client.call('CLUSTER', 'NODES') + client.read_timeout = regular_timeout + parse_cluster_node_reply(reply) + rescue StandardError => e + e + ensure + client&.close + end + end + + node_info_list = errors = nil + + work_group.each do |i, v| + case v + when StandardError + errors ||= Array.new(startup_size) + errors[i] = v + else + node_info_list ||= Array.new(startup_size) + node_info_list[i] = v + end + end + + work_group.close + + raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil? + + grouped = node_info_list.compact.group_by do |info_list| + info_list.sort_by!(&:id) + info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a| + a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch + end + end + + grouped.max_by { |_, v| v.size }[1].first + end + + def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + reply.each_line("\n", chomp: true).filter_map do |line| + fields = line.split + flags = fields[2].split(',') + next unless fields[7] == 'connected' && (flags & DEAD_FLAGS).empty? + + slots = if fields[8].nil? + EMPTY_ARRAY + else + fields[8..].reject { |str| str.start_with?('[') } + .map { |str| str.split('-').map { |s| Integer(s) } } + .map { |a| a.size == 1 ? a << a.first : a } + .map(&:sort) + end + + ::RedisClient::Cluster::Node::Info.new( + id: fields[0], + node_key: parse_node_key(fields[1]), + role: (flags & ROLE_FLAGS).first, + primary_id: fields[3], + ping_sent: fields[4], + pong_recv: fields[5], + config_epoch: fields[6], + link_state: fields[7], + slots: slots + ) + end + end + + # As redirection node_key is dependent on `cluster-preferred-endpoint-type` config, + # node_key should use hostname if present in CLUSTER NODES output. + # + # See https://redis.io/commands/cluster-nodes/ for details on the output format. + # node_address matches fhe format: + def parse_node_key(node_address) + ip_chunk, hostname, _auxiliaries = node_address.split(',') + ip_port_string = ip_chunk.split('@').first + return ip_port_string if hostname.nil? || hostname.empty? 
+ + port = ip_port_string.split(':')[1] + "#{hostname}:#{port}" + end + + def with_startup_clients(count) # rubocop:disable Metrics/AbcSize + if @config.connect_with_original_config + # If connect_with_original_config is set, that means we need to build actual client objects + # and close them, so that we e.g. re-resolve a DNS entry with the cluster nodes in it. + begin + # Memoize the startup clients, so we maintain RedisClient's internal circuit breaker configuration + # if it's set. + @startup_clients ||= @config.startup_nodes.values.sample(count).map do |node_config| + ::RedisClient::Cluster::Node::Config.new(**node_config).new_client + end + yield @startup_clients + ensure + # Close the startup clients when we're done, so we don't maintain pointless open connections to + # the cluster though + @startup_clients&.each(&:close) + end + else + # (re-)connect using nodes we already know about. + # If this is the first time we're connecting to the cluster, we need to seed the topology with the + # startup clients though. + @topology.process_topology_update!({}, @config.startup_nodes) if @topology.clients.empty? + yield @topology.clients.values.sample(count) + end + end + + def with_reload_lock + # What should happen with concurrent calls #reload? This is a realistic possibility if the cluster goes into + # a CLUSTERDOWN state, and we're using a pooled backend. Every thread will independently discover this, and + # call reload!. + # For now, if a reload is in progress, wait for that to complete, and consider that the same as us having + # performed the reload. + # Probably in the future we should add a circuit breaker to #reload itself, and stop trying if the cluster is + # obviously not working. + wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @mutex.synchronize do + return if @last_reloaded_at && @last_reloaded_at > wait_start + + r = yield + @last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC) + r + end + end end end end diff --git a/lib/redis_client/cluster/node/base_topology.rb b/lib/redis_client/cluster/node/base_topology.rb new file mode 100644 index 00000000..e8bfbaaf --- /dev/null +++ b/lib/redis_client/cluster/node/base_topology.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +class RedisClient + class Cluster + class Node + class BaseTopology + IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze + attr_reader :clients, :primary_clients, :replica_clients + + def initialize(pool, concurrent_worker, **kwargs) + @pool = pool + @clients = {} + @client_options = kwargs.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) } + @concurrent_worker = concurrent_worker + @replications = EMPTY_HASH + @primary_node_keys = EMPTY_ARRAY + @replica_node_keys = EMPTY_ARRAY + @primary_clients = EMPTY_ARRAY + @replica_clients = EMPTY_ARRAY + end + + def any_primary_node_key(seed: nil) + random = seed.nil? ? 
Random : Random.new(seed) + @primary_node_keys.sample(random: random) + end + + def process_topology_update!(replications, options) # rubocop:disable Metrics/AbcSize + @replications = replications.freeze + @primary_node_keys = @replications.keys.sort.select { |k| options.key?(k) }.freeze + @replica_node_keys = @replications.values.flatten.sort.select { |k| options.key?(k) }.freeze + + # Disconnect from nodes that we no longer want, and connect to nodes we're not connected to yet + disconnect_from_unwanted_nodes(options) + connect_to_new_nodes(options) + + @primary_clients, @replica_clients = @clients.partition { |k, _| @primary_node_keys.include?(k) }.map(&:to_h) + @primary_clients.freeze + @replica_clients.freeze + end + + private + + def disconnect_from_unwanted_nodes(options) + (@clients.keys - options.keys).each do |node_key| + @clients.delete(node_key).close + end + end + + def connect_to_new_nodes(options) + (options.keys - @clients.keys).each do |node_key| + option = options[node_key].merge(@client_options) + config = ::RedisClient::Cluster::Node::Config.new(scale_read: !@primary_node_keys.include?(node_key), **option) + client = @pool.nil? ? config.new_client : config.new_pool(**@pool) + @clients[node_key] = client + end + end + end + end + end +end diff --git a/lib/redis_client/cluster/node/latency_replica.rb b/lib/redis_client/cluster/node/latency_replica.rb index 29ef3fcb..b1636947 100644 --- a/lib/redis_client/cluster/node/latency_replica.rb +++ b/lib/redis_client/cluster/node/latency_replica.rb @@ -1,29 +1,14 @@ # frozen_string_literal: true -require 'redis_client/cluster/node/replica_mixin' +require 'redis_client/cluster/node/base_topology' class RedisClient class Cluster class Node - class LatencyReplica - include ::RedisClient::Cluster::Node::ReplicaMixin - - attr_reader :replica_clients - + class LatencyReplica < BaseTopology DUMMY_LATENCY_MSEC = 100 * 1000 * 1000 MEASURE_ATTEMPT_COUNT = 10 - def initialize(replications, options, pool, concurrent_worker, **kwargs) - super - - all_replica_clients = @clients.select { |k, _| @replica_node_keys.include?(k) } - latencies = measure_latencies(all_replica_clients, concurrent_worker) - @replications.each_value { |keys| keys.sort_by! { |k| latencies.fetch(k) } } - @replica_clients = select_replica_clients(@replications, @clients) - @clients_for_scanning = select_clients_for_scanning(@replications, @clients) - @existed_replicas = @replications.values.reject(&:empty?) - end - def clients_for_scanning(seed: nil) # rubocop:disable Lint/UnusedMethodArgument @clients_for_scanning end @@ -37,6 +22,17 @@ def any_replica_node_key(seed: nil) @existed_replicas.sample(random: random)&.first || any_primary_node_key(seed: seed) end + def process_topology_update!(replications, options) + super + + all_replica_clients = @clients.select { |k, _| @replica_node_keys.include?(k) } + latencies = measure_latencies(all_replica_clients, @concurrent_worker) + @replications.each_value { |keys| keys.sort_by! { |k| latencies.fetch(k) } } + @replica_clients = select_replica_clients(@replications, @clients) + @clients_for_scanning = select_clients_for_scanning(@replications, @clients) + @existed_replicas = @replications.values.reject(&:empty?) 
+ end + private def measure_latencies(clients, concurrent_worker) # rubocop:disable Metrics/AbcSize diff --git a/lib/redis_client/cluster/node/primary_only.rb b/lib/redis_client/cluster/node/primary_only.rb index 18f85bb7..50ba9518 100644 --- a/lib/redis_client/cluster/node/primary_only.rb +++ b/lib/redis_client/cluster/node/primary_only.rb @@ -1,16 +1,11 @@ # frozen_string_literal: true +require 'redis_client/cluster/node/base_topology' + class RedisClient class Cluster class Node - class PrimaryOnly - attr_reader :clients - - def initialize(replications, options, pool, _concurrent_worker, **kwargs) - @primary_node_keys = replications.keys.sort - @clients = build_clients(@primary_node_keys, options, pool, **kwargs) - end - + class PrimaryOnly < BaseTopology alias primary_clients clients alias replica_clients clients @@ -29,17 +24,10 @@ def any_primary_node_key(seed: nil) alias any_replica_node_key any_primary_node_key - private - - def build_clients(primary_node_keys, options, pool, **kwargs) - options.filter_map do |node_key, option| - next if !primary_node_keys.empty? && !primary_node_keys.include?(node_key) - - option = option.merge(kwargs.reject { |k, _| ::RedisClient::Cluster::Node::IGNORE_GENERIC_CONFIG_KEYS.include?(k) }) - config = ::RedisClient::Cluster::Node::Config.new(**option) - client = pool.nil? ? config.new_client : config.new_pool(**pool) - [node_key, client] - end.to_h + def process_topology_update!(replications, options) + # Remove non-primary nodes from options (provided that we actually have any primaries at all) + options = options.select { |node_key, _| replications.key?(node_key) } if replications.keys.any? + super(replications, options) end end end diff --git a/lib/redis_client/cluster/node/random_replica.rb b/lib/redis_client/cluster/node/random_replica.rb index 33124425..4870d789 100644 --- a/lib/redis_client/cluster/node/random_replica.rb +++ b/lib/redis_client/cluster/node/random_replica.rb @@ -1,13 +1,11 @@ # frozen_string_literal: true -require 'redis_client/cluster/node/replica_mixin' +require 'redis_client/cluster/node/base_topology' class RedisClient class Cluster class Node - class RandomReplica - include ::RedisClient::Cluster::Node::ReplicaMixin - + class RandomReplica < BaseTopology def replica_clients keys = @replications.values.filter_map(&:sample) @clients.select { |k, _| keys.include?(k) } diff --git a/lib/redis_client/cluster/node/random_replica_or_primary.rb b/lib/redis_client/cluster/node/random_replica_or_primary.rb index 197600ec..f11890cc 100644 --- a/lib/redis_client/cluster/node/random_replica_or_primary.rb +++ b/lib/redis_client/cluster/node/random_replica_or_primary.rb @@ -1,13 +1,11 @@ # frozen_string_literal: true -require 'redis_client/cluster/node/replica_mixin' +require 'redis_client/cluster/node/base_topology' class RedisClient class Cluster class Node - class RandomReplicaOrPrimary - include ::RedisClient::Cluster::Node::ReplicaMixin - + class RandomReplicaOrPrimary < BaseTopology def replica_clients keys = @replications.values.filter_map(&:sample) @clients.select { |k, _| keys.include?(k) } diff --git a/lib/redis_client/cluster/node/replica_mixin.rb b/lib/redis_client/cluster/node/replica_mixin.rb deleted file mode 100644 index 93e00851..00000000 --- a/lib/redis_client/cluster/node/replica_mixin.rb +++ /dev/null @@ -1,37 +0,0 @@ -# frozen_string_literal: true - -class RedisClient - class Cluster - class Node - module ReplicaMixin - attr_reader :clients, :primary_clients - - EMPTY_ARRAY = [].freeze - - def initialize(replications, 
options, pool, _concurrent_worker, **kwargs) - @replications = replications - @primary_node_keys = @replications.keys.sort - @replica_node_keys = @replications.values.flatten.sort - @clients = build_clients(@primary_node_keys, options, pool, **kwargs) - @primary_clients = @clients.select { |k, _| @primary_node_keys.include?(k) } - end - - def any_primary_node_key(seed: nil) - random = seed.nil? ? Random : Random.new(seed) - @primary_node_keys.sample(random: random) - end - - private - - def build_clients(primary_node_keys, options, pool, **kwargs) - options.to_h do |node_key, option| - option = option.merge(kwargs.reject { |k, _| ::RedisClient::Cluster::Node::IGNORE_GENERIC_CONFIG_KEYS.include?(k) }) - config = ::RedisClient::Cluster::Node::Config.new(scale_read: !primary_node_keys.include?(node_key), **option) - client = pool.nil? ? config.new_client : config.new_pool(**pool) - [node_key, client] - end - end - end - end - end -end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index c974b3f0..d7bdedc2 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -22,9 +22,9 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @concurrent_worker = concurrent_worker @pool = pool @client_kwargs = kwargs - @node = fetch_cluster_info(@config, @concurrent_worker, pool: @pool, **@client_kwargs) + @node = ::RedisClient::Cluster::Node.new(concurrent_worker, config: config, pool: pool, **kwargs) + update_cluster_info! @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) - @mutex = Mutex.new @command_builder = @config.command_builder end @@ -289,34 +289,8 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics end end - def fetch_cluster_info(config, concurrent_worker, pool: nil, **kwargs) - node_info_list = ::RedisClient::Cluster::Node.load_info(config.per_node_key, concurrent_worker, slow_command_timeout: config.slow_command_timeout, **kwargs) - node_addrs = node_info_list.map { |i| ::RedisClient::Cluster::NodeKey.hashify(i.node_key) } - config.update_node(node_addrs) - ::RedisClient::Cluster::Node.new( - config.per_node_key, - concurrent_worker, - node_info_list: node_info_list, - pool: pool, - with_replica: config.use_replica?, - replica_affinity: config.replica_affinity, - **kwargs - ) - end - def update_cluster_info! - return if @mutex.locked? - - @mutex.synchronize do - begin - @node.each(&:close) - rescue ::RedisClient::Cluster::ErrorCollection - # ignore - end - - @config = @original_config.dup if @connect_with_original_config - @node = fetch_cluster_info(@config, @concurrent_worker, pool: @pool, **@client_kwargs) - end + @node.reload! 
end end end diff --git a/lib/redis_client/cluster_config.rb b/lib/redis_client/cluster_config.rb index 73fbea25..78455b49 100644 --- a/lib/redis_client/cluster_config.rb +++ b/lib/redis_client/cluster_config.rb @@ -23,9 +23,10 @@ class ClusterConfig InvalidClientConfigError = Class.new(::RedisClient::Error) - attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout, :connect_with_original_config + attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout, + :connect_with_original_config, :startup_nodes - def initialize( # rubocop:disable Metrics/AbcSize + def initialize( nodes: DEFAULT_NODES, replica: false, replica_affinity: :random, @@ -34,39 +35,26 @@ def initialize( # rubocop:disable Metrics/AbcSize connect_with_original_config: false, client_implementation: ::RedisClient::Cluster, # for redis gem slow_command_timeout: SLOW_COMMAND_TIMEOUT, + command_builder: ::RedisClient::CommandBuilder, **client_config ) @replica = true & replica @replica_affinity = replica_affinity.to_s.to_sym @fixed_hostname = fixed_hostname.to_s - @node_configs = build_node_configs(nodes.dup) - client_config = client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) } - @command_builder = client_config.fetch(:command_builder, ::RedisClient::CommandBuilder) - @client_config = merge_generic_config(client_config, @node_configs) + @command_builder = command_builder + node_configs = build_node_configs(nodes.dup) + @client_config = merge_generic_config(client_config, node_configs) + # Keep tabs on the original startup nodes we were constructed with + @startup_nodes = build_startup_nodes(node_configs) @concurrency = merge_concurrency_option(concurrency) @connect_with_original_config = connect_with_original_config @client_implementation = client_implementation @slow_command_timeout = slow_command_timeout - @mutex = Mutex.new - end - - def dup - self.class.new( - nodes: @node_configs, - replica: @replica, - replica_affinity: @replica_affinity, - fixed_hostname: @fixed_hostname, - concurrency: @concurrency, - connect_with_original_config: @connect_with_original_config, - client_implementation: @client_implementation, - slow_command_timeout: @slow_command_timeout, - **@client_config - ) end def inspect - "#<#{self.class.name} #{per_node_key.values}>" + "#<#{self.class.name} #{startup_nodes.values}>" end def read_timeout @@ -86,29 +74,14 @@ def new_client(**kwargs) @client_implementation.new(self, concurrency: @concurrency, **kwargs) end - def per_node_key - @node_configs.to_h do |config| - node_key = ::RedisClient::Cluster::NodeKey.build_from_host_port(config[:host], config[:port]) - config = @client_config.merge(config) - config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty? - [node_key, config] - end - end - def use_replica? @replica end - def update_node(addrs) - return if @mutex.locked? - - @mutex.synchronize { @node_configs = build_node_configs(addrs) } - end - - def add_node(host, port) - return if @mutex.locked? - - @mutex.synchronize { @node_configs << { host: host, port: port } } + def client_config_for_node(node_key) + config = ::RedisClient::Cluster::NodeKey.hashify(node_key) + config[:port] = ensure_integer(config[:port]) + augment_client_config(config) end private @@ -176,11 +149,23 @@ def ensure_integer(value) end def merge_generic_config(client_config, node_configs) - return client_config if node_configs.empty? 
+ cfg = node_configs.first || {} + client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) } + .merge(cfg.slice(*MERGE_CONFIG_KEYS)) + end + + def build_startup_nodes(configs) + configs.to_h do |config| + node_key = ::RedisClient::Cluster::NodeKey.build_from_host_port(config[:host], config[:port]) + config = augment_client_config(config) + [node_key, config] + end + end - cfg = node_configs.first - MERGE_CONFIG_KEYS.each { |k| client_config[k] = cfg[k] if cfg.key?(k) } - client_config + def augment_client_config(config) + config = @client_config.merge(config) + config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty? + config end end end diff --git a/test/command_capture_middleware.rb b/test/command_capture_middleware.rb index 362e187e..1bd9417b 100644 --- a/test/command_capture_middleware.rb +++ b/test/command_capture_middleware.rb @@ -7,9 +7,37 @@ def inspect end end + # The CommandBuffer is what should be set as the :captured_commands custom option. + # It needs to be threadsafe, because redis-cluster-client performs some redis operations on + # multiple nodes in parallel, and in e.g. jruby it's not safe to concurrently manipulate the same array. + class CommandBuffer + def initialize + @array = [] + @mutex = Mutex.new + end + + def to_a + @mutex.synchronize do + @array.dup + end + end + + def <<(command) + @mutex.synchronize do + @array << command + end + end + + def clear + @mutex.synchronize do + @array.clear + end + end + end + def call(command, redis_config) redis_config.custom[:captured_commands] << CapturedCommand.new( - server_url: redis_config.server_url, + server_url: CommandCaptureMiddleware.normalize_captured_url(redis_config.server_url), command: command, pipelined: false ) @@ -19,11 +47,17 @@ def call(command, redis_config) def call_pipelined(commands, redis_config) commands.map do |command| redis_config.custom[:captured_commands] << CapturedCommand.new( - server_url: redis_config.server_url, + server_url: CommandCaptureMiddleware.normalize_captured_url(redis_config.server_url), command: command, pipelined: true ) end super end + + def self.normalize_captured_url(url) + URI.parse(url).tap do |u| + u.path = '' + end.to_s + end end diff --git a/test/redis_client/cluster/node/test_latency_replica.rb b/test/redis_client/cluster/node/test_latency_replica.rb index 66987523..bff4f55d 100644 --- a/test/redis_client/cluster/node/test_latency_replica.rb +++ b/test/redis_client/cluster/node/test_latency_replica.rb @@ -7,49 +7,41 @@ class RedisClient class Cluster class Node class TestLatencyReplica < TestingWrapper + TESTING_TOPOLOGY_OPTIONS = { replica: true, replica_affinity: :latency }.freeze include TestingTopologyMixin def test_clients_with_redis_client - got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) + got = @test_node.clients + got.each { |client| assert_instance_of(::RedisClient, client) } + assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client - test_topology = ::RedisClient::Cluster::Node::LatencyReplica.new( - @replications, - @options, - { timeout: 3, size: 2 }, - @concurrent_worker, - **TEST_GENERIC_OPTIONS - ) - - got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) - ensure - 
test_topology&.clients&.each_value(&:close) + test_node = make_node(pool: { timeout: 3, size: 2 }) + got = test_node.clients + got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients - got = @test_topology.primary_clients - got.each_value do |client| + got = @test_node.primary_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_replica_clients - got = @test_topology.replica_clients - got.each_value do |client| + got = @test_node.replica_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning - got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got = @test_node.clients_for_scanning + got.each { |client| assert_instance_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end @@ -73,12 +65,6 @@ def test_any_replica_node_key got = @test_topology.any_replica_node_key assert_includes(@replications.values.flatten, got) end - - private - - def topology_class - ::RedisClient::Cluster::Node::LatencyReplica - end end end end diff --git a/test/redis_client/cluster/node/test_primary_only.rb b/test/redis_client/cluster/node/test_primary_only.rb index ec60afdd..27188319 100644 --- a/test/redis_client/cluster/node/test_primary_only.rb +++ b/test/redis_client/cluster/node/test_primary_only.rb @@ -7,53 +7,45 @@ class RedisClient class Cluster class Node class TestPrimaryOnly < TestingWrapper + TESTING_TOPOLOGY_OPTIONS = { replica: false }.freeze include TestingTopologyMixin def test_clients_with_redis_client - got = @test_topology.clients - got.each_value do |client| + got = @test_node.clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_clients_with_pooled_redis_client - test_topology = ::RedisClient::Cluster::Node::PrimaryOnly.new( - @replications, - @options, - { timeout: 3, size: 2 }, - @concurrent_worker, - **TEST_GENERIC_OPTIONS - ) - - got = test_topology.clients - got.each_value do |client| + test_node = make_node(pool: { timeout: 3, size: 2 }) + got = test_node.clients + got.each do |client| assert_instance_of(::RedisClient::Pooled, client) assert_equal('master', client.call('ROLE').first) end - ensure - test_topology&.clients&.each_value(&:close) end def test_primary_clients - got = @test_topology.primary_clients - got.each_value do |client| + got = @test_node.primary_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_replica_clients - got = @test_topology.replica_clients - got.each_value do |client| + got = @test_node.replica_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_clients_for_scanning - got = @test_topology.clients_for_scanning - got.each_value do |client| + got = @test_node.clients_for_scanning + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end @@ -74,12 +66,6 @@ def test_any_replica_node_key got = @test_topology.any_replica_node_key assert_includes(@replications.keys, got) end - - private - - def topology_class - ::RedisClient::Cluster::Node::PrimaryOnly - end end 
end end diff --git a/test/redis_client/cluster/node/test_random_replica.rb b/test/redis_client/cluster/node/test_random_replica.rb index 1116bf7a..73bc226f 100644 --- a/test/redis_client/cluster/node/test_random_replica.rb +++ b/test/redis_client/cluster/node/test_random_replica.rb @@ -7,49 +7,41 @@ class RedisClient class Cluster class Node class TestRandomReplica < TestingWrapper + TESTING_TOPOLOGY_OPTIONS = { replica: true, replica_affinity: :random }.freeze include TestingTopologyMixin def test_clients_with_redis_client - got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) + got = @test_node.clients + got.each { |client| assert_instance_of(::RedisClient, client) } + assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client - test_topology = ::RedisClient::Cluster::Node::RandomReplica.new( - @replications, - @options, - { timeout: 3, size: 2 }, - @concurrent_worker, - **TEST_GENERIC_OPTIONS - ) - - got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) - ensure - test_topology&.clients&.each_value(&:close) + test_node = make_node(pool: { timeout: 3, size: 2 }) + got = test_node.clients + got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients - got = @test_topology.primary_clients - got.each_value do |client| + got = @test_node.primary_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_replica_clients - got = @test_topology.replica_clients - got.each_value do |client| + got = @test_node.replica_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning - got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got = @test_node.clients_for_scanning + got.each { |client| assert_instance_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end @@ -73,12 +65,6 @@ def test_any_replica_node_key got = @test_topology.any_replica_node_key assert_includes(@replications.values.flatten, got) end - - private - - def topology_class - ::RedisClient::Cluster::Node::RandomReplica - end end end end diff --git a/test/redis_client/cluster/node/test_random_replica_or_primary.rb b/test/redis_client/cluster/node/test_random_replica_or_primary.rb index 33319d20..ea0ad2b7 100644 --- a/test/redis_client/cluster/node/test_random_replica_or_primary.rb +++ b/test/redis_client/cluster/node/test_random_replica_or_primary.rb @@ -7,49 +7,41 @@ class RedisClient class Cluster class Node class TestRandomReplicaWithPrimary < TestingWrapper + TESTING_TOPOLOGY_OPTIONS = { replica: true, replica_affinity: :random_with_primary }.freeze include TestingTopologyMixin def test_clients_with_redis_client - got = @test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) + got = @test_node.clients + got.each { |client| assert_instance_of(::RedisClient, client) } + assert_equal(%w[master slave], got.map 
{ |v| v.call('ROLE').first }.uniq.sort) end def test_clients_with_pooled_redis_client - test_topology = ::RedisClient::Cluster::Node::RandomReplicaOrPrimary.new( - @replications, - @options, - { timeout: 3, size: 2 }, - @concurrent_worker, - **TEST_GENERIC_OPTIONS - ) - - got = test_topology.clients - got.each_value { |client| assert_instance_of(::RedisClient::Pooled, client) } - assert_equal(%w[master slave], got.map { |_, v| v.call('ROLE').first }.uniq.sort) - ensure - test_topology&.clients&.each_value(&:close) + test_node = make_node(pool: { timeout: 3, size: 2 }) + got = test_node.clients + got.each { |client| assert_instance_of(::RedisClient::Pooled, client) } + assert_equal(%w[master slave], got.map { |v| v.call('ROLE').first }.uniq.sort) end def test_primary_clients - got = @test_topology.primary_clients - got.each_value do |client| + got = @test_node.primary_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('master', client.call('ROLE').first) end end def test_replica_clients - got = @test_topology.replica_clients - got.each_value do |client| + got = @test_node.replica_clients + got.each do |client| assert_instance_of(::RedisClient, client) assert_equal('slave', client.call('ROLE').first) end end def test_clients_for_scanning - got = @test_topology.clients_for_scanning - got.each_value { |client| assert_instance_of(::RedisClient, client) } + got = @test_node.clients_for_scanning + got.each { |client| assert_instance_of(::RedisClient, client) } assert_equal(TEST_SHARD_SIZE, got.size) end @@ -73,12 +65,6 @@ def test_any_replica_node_key got = @test_topology.any_replica_node_key assert_includes(@replications.values.flatten, got) end - - private - - def topology_class - ::RedisClient::Cluster::Node::RandomReplicaOrPrimary - end end end end diff --git a/test/redis_client/cluster/node/testing_topology_mixin.rb b/test/redis_client/cluster/node/testing_topology_mixin.rb index f7d2a42b..4acd7a66 100644 --- a/test/redis_client/cluster/node/testing_topology_mixin.rb +++ b/test/redis_client/cluster/node/testing_topology_mixin.rb @@ -4,31 +4,29 @@ class RedisClient class Cluster class Node module TestingTopologyMixin - def setup - test_config = ::RedisClient::ClusterConfig.new( + def make_node(pool: nil, **kwargs) + config = ::RedisClient::ClusterConfig.new(**{ nodes: TEST_NODE_URIS, fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS - ) - @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create - test_node_info_list = ::RedisClient::Cluster::Node.load_info(test_config.per_node_key, @concurrent_worker) - if TEST_FIXED_HOSTNAME - test_node_info_list.each do |info| - _, port = ::RedisClient::Cluster::NodeKey.split(info.node_key) - info.node_key = ::RedisClient::Cluster::NodeKey.build_from_host_port(TEST_FIXED_HOSTNAME, port) - end + **TEST_GENERIC_OPTIONS, + **self.class::TESTING_TOPOLOGY_OPTIONS + }.merge(kwargs)) + concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create + ::RedisClient::Cluster::Node.new(concurrent_worker, pool: pool, config: config).tap do |node| + node.reload! 
+ @test_nodes ||= [] + @test_nodes << node end - node_addrs = test_node_info_list.map { |info| ::RedisClient::Cluster::NodeKey.hashify(info.node_key) } - test_config.update_node(node_addrs) - @options = test_config.per_node_key - test_node = ::RedisClient::Cluster::Node.new(@options, @concurrent_worker, node_info_list: test_node_info_list) - @replications = test_node.instance_variable_get(:@replications) - test_node&.each(&:close) - @test_topology = topology_class.new(@replications, @options, nil, @concurrent_worker, **TEST_GENERIC_OPTIONS) + end + + def setup + @test_node = make_node + @test_topology = @test_node.instance_variable_get(:@topology) + @replications = @test_node.instance_variable_get(:@replications) end def teardown - @test_topology&.clients&.each_value(&:close) + @test_nodes&.each { |n| n.each(&:close) } end end end diff --git a/test/redis_client/cluster/test_node.rb b/test/redis_client/cluster/test_node.rb index d0819279..dc50188a 100644 --- a/test/redis_client/cluster/test_node.rb +++ b/test/redis_client/cluster/test_node.rb @@ -23,61 +23,27 @@ def test_connection_prelude # rubocop:disable Metrics/ClassLength class TestNode < TestingWrapper def setup - @test_config = ::RedisClient::ClusterConfig.new( - nodes: TEST_NODE_URIS, - fixed_hostname: TEST_FIXED_HOSTNAME, - **TEST_GENERIC_OPTIONS - ) - @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create - @test_node_info_list = ::RedisClient::Cluster::Node.load_info(@test_config.per_node_key, @concurrent_worker) - if TEST_FIXED_HOSTNAME - @test_node_info_list.each do |info| - _, port = ::RedisClient::Cluster::NodeKey.split(info.node_key) - info.node_key = ::RedisClient::Cluster::NodeKey.build_from_host_port(TEST_FIXED_HOSTNAME, port) - end - end - node_addrs = @test_node_info_list.map { |info| ::RedisClient::Cluster::NodeKey.hashify(info.node_key) } - @test_config.update_node(node_addrs) - @test_node = ::RedisClient::Cluster::Node.new( - @test_config.per_node_key, - @concurrent_worker, - node_info_list: @test_node_info_list - ) - @test_node_with_scale_read = ::RedisClient::Cluster::Node.new( - @test_config.per_node_key, - @concurrent_worker, - node_info_list: @test_node_info_list, - with_replica: true - ) + @test_node = make_node.tap(&:reload!) + @test_node_with_scale_read = make_node(replica: true).tap(&:reload!) 
+ @test_node_info_list = @test_node.instance_variable_get(:@node_info) end def teardown - @test_node&.each(&:close) - @test_node_with_scale_read&.each(&:close) + @test_nodes&.each { |n| n&.each(&:close) } end - def test_load_info - [ - { - params: { options: TEST_NODE_OPTIONS, kwargs: TEST_GENERIC_OPTIONS }, - want: { size: TEST_NUMBER_OF_NODES } - }, - { - params: { options: { '127.0.0.1:11211' => { host: '127.0.0.1', port: 11_211 } }, kwargs: TEST_GENERIC_OPTIONS }, - want: { error: ::RedisClient::Cluster::InitialSetupError } - }, - { - params: { options: {}, kwargs: TEST_GENERIC_OPTIONS }, - want: { error: ::RedisClient::Cluster::InitialSetupError } - } - ].each_with_index do |c, idx| - msg = "Case: #{idx}" - got = -> { ::RedisClient::Cluster::Node.load_info(c[:params][:options], @concurrent_worker, **c[:params][:kwargs]) } - if c[:want].key?(:error) - assert_raises(c[:want][:error], msg, &got) - else - assert_equal(c[:want][:size], got.call.size, msg) - end + def make_node(capture_buffer: CommandCaptureMiddleware::CommandBuffer.new, pool: nil, **kwargs) + config = ::RedisClient::ClusterConfig.new(**{ + nodes: TEST_NODE_URIS, + fixed_hostname: TEST_FIXED_HOSTNAME, + middlewares: [CommandCaptureMiddleware], + custom: { captured_commands: capture_buffer }, + **TEST_GENERIC_OPTIONS + }.merge(kwargs)) + concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create + ::RedisClient::Cluster::Node.new(concurrent_worker, pool: pool, config: config).tap do |node| + @test_nodes ||= [] + @test_nodes << node end end @@ -111,7 +77,7 @@ def test_parse_cluster_node_reply_continuous_slots primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: '1', link_state: 'connected', slots: [[0, 5460]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -145,7 +111,7 @@ def test_parse_cluster_node_reply_discrete_slots primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: '1', link_state: 'connected', slots: [[0, 3000], [3002, 5460], [15_001, 15_001]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -179,7 +145,7 @@ def test_parse_cluster_node_reply_discrete_slots_and_resharding primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: '1', link_state: 'connected', slots: [[0, 3000], [3002, 5460], [15_001, 15_001]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -213,7 +179,7 @@ def test_parse_cluster_node_reply_with_hostname primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: '1', link_state: 'connected', slots: [[0, 5460]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -247,7 +213,7 @@ def test_parse_cluster_node_reply_with_hostname_and_auxiliaries primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: '1', link_state: 'connected', slots: [[0, 5460]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -281,7 +247,7 @@ def test_parse_cluster_node_reply_with_auxiliaries primary_id: '-', ping_sent: '0', pong_recv: '0', config_epoch: 
'1', link_state: 'connected', slots: [[0, 5460]] } ] - got = ::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info) + got = @test_node.send(:parse_cluster_node_reply, info) assert_equal(want, got.map(&:to_h)) end @@ -295,7 +261,7 @@ def test_parse_cluster_node_reply_failure_link_state e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001 myself,master - 0 0 1 disconnected 0-5460 INFO - assert_empty(::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info)) + assert_empty(@test_node.send(:parse_cluster_node_reply, info)) end def test_parse_cluster_node_reply_failure_flags @@ -308,7 +274,7 @@ def test_parse_cluster_node_reply_failure_flags e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001 myself,fail,master - 0 0 1 connected 0-5460 INFO - assert_empty(::RedisClient::Cluster::Node.send(:parse_cluster_node_reply, info)) + assert_empty(@test_node.send(:parse_cluster_node_reply, info)) end def test_inspect @@ -382,12 +348,25 @@ def test_send_ping assert_equal(want, got, 'Case: scale read') end - def test_clients_for_scanning # rubocop:disable Metrics/CyclomaticComplexity - want = @test_node_info_list.select(&:primary?).map(&:node_key).sort - got = @test_node.clients_for_scanning.map { |client| "#{client.config.host}:#{client.config.port}" } + def test_clients_for_scanning # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + test_config = @test_node.instance_variable_get(:@config) + want = @test_node_info_list.select(&:primary?) + .map(&:node_key) + # need to call client_config_for_node so that if we're using fixed_hostname in this test, + # we get the actual hostname we're connecting to, not the one returned by the cluster API + .map { |key| test_config.client_config_for_node(key) } + .map { |cfg| "#{cfg[:host]}:#{cfg[:port]}" } + .sort + got = @test_node.clients_for_scanning.map { |client| "#{client.config.host}:#{client.config.port}" }.sort assert_equal(want, got, 'Case: primary only') - want = @test_node_info_list.select(&:replica?).map(&:node_key) + want = @test_node_info_list.select(&:replica?) + .map(&:node_key) + # As per above, we need to get the real hostname, not that reported by Redis, + # if fixed_hostname is set. + .map { |key| test_config.client_config_for_node(key) } + .map { |cfg| "#{cfg[:host]}:#{cfg[:port]}" } + .sort got = @test_node_with_scale_read.clients_for_scanning.map { |client| "#{client.config.host}:#{client.config.port}" } got.each { |e| assert_includes(want, e, 'Case: scale read') } end @@ -611,6 +590,74 @@ def test_try_map end end end + + def test_reload + capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + test_node = make_node(replica: true, capture_buffer: capture_buffer) + + capture_buffer.clear + test_node.reload! + + # It should have reloaded by calling CLUSTER NODES on three of the startup nodes + cluster_node_cmds = capture_buffer.to_a.select { |c| c.command == %w[CLUSTER NODES] } + assert_equal RedisClient::Cluster::Node::MAX_STARTUP_SAMPLE, cluster_node_cmds.size + + # It should have connected to all of the clients. + assert_equal TEST_NUMBER_OF_NODES, test_node.to_a.size + + # If we reload again, it should NOT change the redis client instances we have. + original_client_ids = test_node.to_a.map(&:object_id).to_set + test_node.reload! 
+ new_client_ids = test_node.to_a.map(&:object_id).to_set + assert_equal original_client_ids, new_client_ids + end + + def test_reload_with_original_config + bootstrap_node = TEST_NODE_URIS.first + capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + test_node = make_node( + nodes: [bootstrap_node], + replica: true, + connect_with_original_config: true, + capture_buffer: capture_buffer + ) + + test_node.reload! + # After reloading the first time, our Node object knows about all hosts, despite only starting with one: + assert_equal TEST_NUMBER_OF_NODES, test_node.to_a.size + + # When we reload, it will only call CLUSTER NODES against a single node, the bootstrap node. + capture_buffer.clear + test_node.reload! + + cluster_node_cmds = capture_buffer.to_a.select { |c| c.command == %w[CLUSTER NODES] } + assert_equal 1, cluster_node_cmds.size + assert_equal bootstrap_node, cluster_node_cmds.first.server_url + end + + def test_reload_concurrently + capture_buffer = CommandCaptureMiddleware::CommandBuffer.new + test_node = make_node(replica: true, pool: { size: 2 }, capture_buffer: capture_buffer) + + # Simulate refetch_node_info_list taking a long time + test_node.singleton_class.prepend(Module.new do + def refetch_node_info_list(...) + r = super + sleep 2 + r + end + end) + + capture_buffer.clear + t1 = Thread.new { test_node.reload! } + t2 = Thread.new { test_node.reload! } + [t1, t2].each(&:join) + + # We should only have reloaded once, which is to say, we only called CLUSTER NODES command MAX_STARTUP_SAMPLE + # times + cluster_node_cmds = capture_buffer.to_a.select { |c| c.command == %w[CLUSTER NODES] } + assert_equal RedisClient::Cluster::Node::MAX_STARTUP_SAMPLE, cluster_node_cmds.size + end end # rubocop:enable Metrics/ClassLength end diff --git a/test/redis_client/test_cluster.rb b/test/redis_client/test_cluster.rb index c4922b22..59fc2de5 100644 --- a/test/redis_client/test_cluster.rb +++ b/test/redis_client/test_cluster.rb @@ -6,7 +6,7 @@ class RedisClient class TestCluster module Mixin def setup - @captured_commands = [] + @captured_commands = CommandCaptureMiddleware::CommandBuffer.new @client = new_test_client @client.call('FLUSHDB') wait_for_replication @@ -461,6 +461,30 @@ def test_compatibility_with_redis_gem assert_raises(NoMethodError) { @client.densaugeo('1m') } end + def test_circuit_breakers + cli = ::RedisClient.cluster( + nodes: TEST_NODE_URIS, + fixed_hostname: TEST_FIXED_HOSTNAME, + # This option is important - need to make sure that the reloads happen on different connections + # to the timeouts, so that they don't count against the circuit breaker (they'll have their own breakers). + connect_with_original_config: true, + **TEST_GENERIC_OPTIONS.merge( + circuit_breaker: { + # Also important - the retry_count on resharding errors is set to 3, so we have to allow at lest + # that many errors to avoid tripping the breaker in the first call. 
+              error_threshold: 4,
+              error_timeout: 60,
+              success_threshold: 10
+            }
+          )
+        ).new_client
+
+        assert_raises(::RedisClient::ReadTimeoutError) { cli.blocking_call(0.1, 'BRPOP', 'foo', 0) }
+        assert_raises(::RedisClient::CircuitBreaker::OpenCircuitError) { cli.blocking_call(0.1, 'BRPOP', 'foo', 0) }
+
+        cli&.close
+      end
+
       private
 
       def wait_for_replication
diff --git a/test/redis_client/test_cluster_config.rb b/test/redis_client/test_cluster_config.rb
index fd4610cb..a745d8c1 100644
--- a/test/redis_client/test_cluster_config.rb
+++ b/test/redis_client/test_cluster_config.rb
@@ -5,23 +5,6 @@
 
 class RedisClient
   class TestClusterConfig < TestingWrapper
-    def test_dup
-      orig = ::RedisClient::ClusterConfig.new
-      copy = orig.dup
-      refute_equal(orig.object_id, copy.object_id)
-
-      ::RedisClient::ClusterConfig.instance_method(:initialize).parameters.each do |type, name|
-        case type
-        when :key
-          want = orig.instance_variable_get("@#{name}".to_sym)
-          got = copy.instance_variable_get("@#{name}".to_sym)
-          next if got.nil?
-
-          assert_equal(want, got, "Case: #{type}=#{name}")
-        end
-      end
-    end
-
     def test_inspect
       want = '#<RedisClient::ClusterConfig [{:host=>"127.0.0.1", :port=>6379}]>'
       got = ::RedisClient::ClusterConfig.new.inspect
@@ -50,7 +33,7 @@ def test_new_client
       )
     end
 
-    def test_per_node_key
+    def test_startup_nodes
       [
         {
           config: ::RedisClient::ClusterConfig.new,
@@ -75,9 +58,22 @@ def test_per_node_key
           want: {
             '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, timeout: 1 }
           }
+        },
+        {
+          config: ::RedisClient::ClusterConfig.new(nodes: ['redis://1.2.3.4:1234', 'rediss://5.6.7.8:5678']),
+          want: {
+            '1.2.3.4:1234' => { host: '1.2.3.4', port: 1234 },
+            '5.6.7.8:5678' => { host: '5.6.7.8', port: 5678, ssl: true }
+          }
+        },
+        {
+          config: ::RedisClient::ClusterConfig.new(custom: { foo: 'bar' }),
+          want: {
+            '127.0.0.1:6379' => { host: '127.0.0.1', port: 6379, custom: { foo: 'bar' } }
+          }
         }
       ].each_with_index do |c, idx|
-        assert_equal(c[:want], c[:config].per_node_key, "Case: #{idx}")
+        assert_equal(c[:want], c[:config].startup_nodes, "Case: #{idx}")
       end
     end
 
@@ -103,20 +99,6 @@ def test_replica_affinity
       end
     end
 
-    def test_update_node
-      config = ::RedisClient::ClusterConfig.new(nodes: %w[redis://127.0.0.1:6379])
-      assert_equal([{ host: '127.0.0.1', port: 6379 }], config.instance_variable_get(:@node_configs))
-      config.update_node(%w[redis://127.0.0.2:6380])
-      assert_equal([{ host: '127.0.0.2', port: 6380 }], config.instance_variable_get(:@node_configs))
-    end
-
-    def test_add_node
-      config = ::RedisClient::ClusterConfig.new(nodes: %w[redis://127.0.0.1:6379])
-      assert_equal([{ host: '127.0.0.1', port: 6379 }], config.instance_variable_get(:@node_configs))
-      config.add_node('127.0.0.2', 6380)
-      assert_equal([{ host: '127.0.0.1', port: 6379 }, { host: '127.0.0.2', port: 6380 }], config.instance_variable_get(:@node_configs))
-    end
-
     def test_command_builder
       assert_equal(::RedisClient::CommandBuilder, ::RedisClient::ClusterConfig.new.command_builder)
     end
@@ -194,5 +176,19 @@ def test_merge_generic_config
         assert_equal(c[:want], got, msg)
       end
     end
+
+    def test_client_config_for_node
+      config = ::RedisClient::ClusterConfig.new(
+        nodes: ['redis://username:password@1.2.3.4:1234', 'rediss://5.6.7.8:5678'],
+        custom: { foo: 'bar' }
+      )
+      assert_equal({
+        host: '9.9.9.9',
+        port: 9999,
+        username: 'username',
+        password: 'password',
+        custom: { foo: 'bar' }
+      }, config.client_config_for_node('9.9.9.9:9999'))
+    end
   end
 end
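
Usage sketch: with this patch, the per-node RedisClient instances owned by RedisClient::Cluster::Node now survive topology reloads instead of being rebuilt, so RedisClient's built-in circuit breaker can accumulate failures and actually open. The snippet below is a minimal illustration modelled on the test_circuit_breakers test added above; the seed node URL, key name, and breaker thresholds are placeholder values, not part of the patch.

    # Assumes the redis-cluster-client gem is loaded and a cluster is reachable
    # at the placeholder seed node below.
    cli = RedisClient.cluster(
      nodes: %w[redis://127.0.0.1:6379],
      # As in the test above: let reloads use their own connections so that
      # reload traffic does not count against the per-node circuit breakers.
      connect_with_original_config: true,
      circuit_breaker: {
        error_threshold: 4,
        error_timeout: 60,
        success_threshold: 10
      }
    ).new_client

    # Once enough commands against a node have failed, further calls to that node
    # fail fast with RedisClient::CircuitBreaker::OpenCircuitError (the behaviour
    # asserted by test_circuit_breakers) rather than waiting on another timeout.
    cli.blocking_call(0.1, 'BRPOP', 'example-key', 0)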