Fix underlying RedisClient circuit breakers never firing (#309)
KJ Tsanaktsidis authored Jan 2, 2024
1 parent 38311a8 commit 09484ea
Showing 18 changed files with 547 additions and 489 deletions.
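For context: redis-client (the underlying client library) supports a per-config circuit breaker, and the comments in this diff indicate the breakers never fired because reloads rebuilt node clients from scratch, discarding accumulated breaker state; this change reuses clients across topology updates instead. A rough sketch of enabling the breaker through redis-cluster-client follows (the require path, RedisClient.cluster usage, and circuit_breaker keys are assumptions based on the gems' documentation; the values are illustrative):

require 'redis_cluster_client'

cluster = RedisClient.cluster(
  nodes: %w[redis://127.0.0.1:6379],
  circuit_breaker: {
    error_threshold: 3,   # open the breaker after 3 errors...
    error_timeout: 1,     # ...and probe the node again after 1 second
    success_threshold: 2  # close it again after 2 consecutive successes
  }
).new_client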
267 changes: 160 additions & 107 deletions lib/redis_client/cluster/node.rb
@@ -22,10 +22,10 @@ class Node
SLOT_SIZE = 16_384
MIN_SLOT = 0
MAX_SLOT = SLOT_SIZE - 1
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
DEAD_FLAGS = %w[fail? fail handshake noaddr noflags].freeze
ROLE_FLAGS = %w[master slave].freeze
EMPTY_ARRAY = [].freeze
EMPTY_HASH = {}.freeze

ReloadNeeded = Class.new(::RedisClient::Error)

@@ -92,119 +92,19 @@ def build_connection_prelude
end
end

class << self
def load_info(options, concurrent_worker, slow_command_timeout: -1, **kwargs) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
raise ::RedisClient::Cluster::InitialSetupError, [] if options.nil? || options.empty?

startup_size = options.size > MAX_STARTUP_SAMPLE ? MAX_STARTUP_SAMPLE : options.size
startup_options = options.to_a.sample(startup_size).to_h
startup_nodes = ::RedisClient::Cluster::Node.new(startup_options, concurrent_worker, **kwargs)
work_group = concurrent_worker.new_group(size: startup_size)

startup_nodes.each_with_index do |raw_client, i|
work_group.push(i, raw_client) do |client|
regular_timeout = client.read_timeout
client.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout
reply = client.call('CLUSTER', 'NODES')
client.read_timeout = regular_timeout
parse_cluster_node_reply(reply)
rescue StandardError => e
e
ensure
client&.close
end
end

node_info_list = errors = nil

work_group.each do |i, v|
case v
when StandardError
errors ||= Array.new(startup_size)
errors[i] = v
else
node_info_list ||= Array.new(startup_size)
node_info_list[i] = v
end
end

work_group.close

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?

grouped = node_info_list.compact.group_by do |info_list|
info_list.sort_by!(&:id)
info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a|
a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch
end
end

grouped.max_by { |_, v| v.size }[1].first.freeze
end

private

# @see https://redis.io/commands/cluster-nodes/
# @see https://github.com/redis/redis/blob/78960ad57b8a5e6af743d789ed8fd767e37d42b8/src/cluster.c#L4660-L4683
def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
reply.each_line("\n", chomp: true).filter_map do |line|
fields = line.split
flags = fields[2].split(',')
next unless fields[7] == 'connected' && (flags & DEAD_FLAGS).empty?

slots = if fields[8].nil?
EMPTY_ARRAY
else
fields[8..].reject { |str| str.start_with?('[') }
.map { |str| str.split('-').map { |s| Integer(s) } }
.map { |a| a.size == 1 ? a << a.first : a }
.map(&:sort)
end

::RedisClient::Cluster::Node::Info.new(
id: fields[0],
node_key: parse_node_key(fields[1]),
role: (flags & ROLE_FLAGS).first,
primary_id: fields[3],
ping_sent: fields[4],
pong_recv: fields[5],
config_epoch: fields[6],
link_state: fields[7],
slots: slots
)
end
end

# Since the redirection node_key depends on the `cluster-preferred-endpoint-type` config,
# node_key should use the hostname if one is present in the CLUSTER NODES output.
#
# See https://redis.io/commands/cluster-nodes/ for details on the output format.
# node_address matches the format: <ip:port@cport[,hostname[,auxiliary_field=value]*]>
def parse_node_key(node_address)
ip_chunk, hostname, _auxiliaries = node_address.split(',')
ip_port_string = ip_chunk.split('@').first
return ip_port_string if hostname.nil? || hostname.empty?

port = ip_port_string.split(':')[1]
"#{hostname}:#{port}"
end
end

def initialize(
options,
concurrent_worker,
node_info_list: [],
with_replica: false,
replica_affinity: :random,
config:,
pool: nil,
**kwargs
)

@concurrent_worker = concurrent_worker
@slots = build_slot_node_mappings(node_info_list)
@replications = build_replication_mappings(node_info_list)
klass = make_topology_class(with_replica, replica_affinity)
@topology = klass.new(@replications, options, pool, @concurrent_worker, **kwargs)
@slots = build_slot_node_mappings(EMPTY_ARRAY)
@replications = build_replication_mappings(EMPTY_ARRAY)
klass = make_topology_class(config.use_replica?, config.replica_affinity)
@topology = klass.new(pool, @concurrent_worker, **kwargs)
@config = config
@mutex = Mutex.new
end

@@ -255,6 +155,14 @@ def clients_for_scanning(seed: nil)
@topology.clients_for_scanning(seed: seed).values.sort_by { |c| "#{c.config.host}-#{c.config.port}" }
end

def clients
@topology.clients.values
end

def primary_clients
@topology.primary_clients.values
end

def replica_clients
@topology.replica_clients.values
end
@@ -292,6 +200,20 @@ def update_slot(slot, node_key)
end
end

def reload!
with_reload_lock do
with_startup_clients(MAX_STARTUP_SAMPLE) do |startup_clients|
@node_info = refetch_node_info_list(startup_clients)
@node_configs = @node_info.to_h do |node_info|
[node_info.node_key, @config.client_config_for_node(node_info.node_key)]
end
@slots = build_slot_node_mappings(@node_info)
@replications = build_replication_mappings(@node_info)
@topology.process_topology_update!(@replications, @node_configs)
end
end
end

private

def make_topology_class(with_replica, replica_affinity)
@@ -378,6 +300,137 @@ def try_map(clients, &block) # rubocop:disable Metrics/AbcSize, Metrics/Cyclomat

[results, errors]
end

def refetch_node_info_list(startup_clients) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
startup_size = startup_clients.size
work_group = @concurrent_worker.new_group(size: startup_size)

startup_clients.each_with_index do |raw_client, i|
work_group.push(i, raw_client) do |client|
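# CLUSTER NODES can be slow on large clusters; if slow_command_timeout is configured,
# use it as the read timeout for this call only, then restore the regular timeout.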
regular_timeout = client.read_timeout
client.read_timeout = @config.slow_command_timeout > 0.0 ? @config.slow_command_timeout : regular_timeout
reply = client.call('CLUSTER', 'NODES')
client.read_timeout = regular_timeout
parse_cluster_node_reply(reply)
rescue StandardError => e
e
ensure
client&.close
end
end

node_info_list = errors = nil

work_group.each do |i, v|
case v
when StandardError
errors ||= Array.new(startup_size)
errors[i] = v
else
node_info_list ||= Array.new(startup_size)
node_info_list[i] = v
end
end

work_group.close

raise ::RedisClient::Cluster::InitialSetupError, errors if node_info_list.nil?

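# Group identical snapshots of the cluster and keep the view reported by the most
# startup nodes, so a single stale or partitioned node can't poison the topology.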
grouped = node_info_list.compact.group_by do |info_list|
info_list.sort_by!(&:id)
info_list.each_with_object(String.new(capacity: 128 * info_list.size)) do |e, a|
a << e.id << e.node_key << e.role << e.primary_id << e.config_epoch
end
end

grouped.max_by { |_, v| v.size }[1].first
end

def parse_cluster_node_reply(reply) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
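# Each reply line looks roughly like (illustrative sample, not from this commit):
#   07c37dfe... 127.0.0.1:30001@31001 myself,master - 0 1426238316232 2 connected 0-5460
# Slot ranges such as '0-5460' parse to [0, 5460], a bare slot '7000' becomes [7000, 7000],
# and importing/migrating markers like '[5461->-...]' are skipped.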
reply.each_line("\n", chomp: true).filter_map do |line|
fields = line.split
flags = fields[2].split(',')
next unless fields[7] == 'connected' && (flags & DEAD_FLAGS).empty?

slots = if fields[8].nil?
EMPTY_ARRAY
else
fields[8..].reject { |str| str.start_with?('[') }
.map { |str| str.split('-').map { |s| Integer(s) } }
.map { |a| a.size == 1 ? a << a.first : a }
.map(&:sort)
end

::RedisClient::Cluster::Node::Info.new(
id: fields[0],
node_key: parse_node_key(fields[1]),
role: (flags & ROLE_FLAGS).first,
primary_id: fields[3],
ping_sent: fields[4],
pong_recv: fields[5],
config_epoch: fields[6],
link_state: fields[7],
slots: slots
)
end
end

# Since the redirection node_key depends on the `cluster-preferred-endpoint-type` config,
# node_key should use the hostname if one is present in the CLUSTER NODES output.
#
# See https://redis.io/commands/cluster-nodes/ for details on the output format.
# node_address matches the format: <ip:port@cport[,hostname[,auxiliary_field=value]*]>
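# Illustrative (hypothetical) examples:
#   parse_node_key('10.0.0.1:6379@16379')                     #=> "10.0.0.1:6379"
#   parse_node_key('10.0.0.1:6379@16379,cache-1.example.com') #=> "cache-1.example.com:6379"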
def parse_node_key(node_address)
ip_chunk, hostname, _auxiliaries = node_address.split(',')
ip_port_string = ip_chunk.split('@').first
return ip_port_string if hostname.nil? || hostname.empty?

port = ip_port_string.split(':')[1]
"#{hostname}:#{port}"
end

def with_startup_clients(count) # rubocop:disable Metrics/AbcSize
if @config.connect_with_original_config
# If connect_with_original_config is set, that means we need to build actual client objects
# and close them, so that we e.g. re-resolve a DNS entry with the cluster nodes in it.
begin
# Memoize the startup clients, so we maintain RedisClient's internal circuit breaker state
# if one is configured.
@startup_clients ||= @config.startup_nodes.values.sample(count).map do |node_config|
::RedisClient::Cluster::Node::Config.new(**node_config).new_client
end
yield @startup_clients
ensure
# Close the startup clients when we're done, so we don't keep pointless open
# connections to the cluster.
@startup_clients&.each(&:close)
end
else
# (re-)connect using nodes we already know about.
# If this is the first time we're connecting to the cluster, we need to seed the topology with the
# startup clients though.
@topology.process_topology_update!({}, @config.startup_nodes) if @topology.clients.empty?
yield @topology.clients.values.sample(count)
end
end

def with_reload_lock
# What should happen with concurrent calls to #reload!? This is a realistic possibility if the cluster goes
# into a CLUSTERDOWN state, and we're using a pooled backend: every thread will independently discover this
# and call reload!.
# For now, if a reload is in progress, wait for it to complete, and consider that the same as us having
# performed the reload ourselves.
# In the future we should probably add a circuit breaker to #reload! itself, and stop trying if the cluster
# is obviously not working.
wait_start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@mutex.synchronize do
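# Another thread finished a reload after we started waiting on the lock,
# so its result is fresh enough to count as ours.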
return if @last_reloaded_at && @last_reloaded_at > wait_start

r = yield
@last_reloaded_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
r
end
end
end
end
end
60 changes: 60 additions & 0 deletions lib/redis_client/cluster/node/base_topology.rb
@@ -0,0 +1,60 @@
# frozen_string_literal: true

class RedisClient
class Cluster
class Node
class BaseTopology
IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
attr_reader :clients, :primary_clients, :replica_clients

def initialize(pool, concurrent_worker, **kwargs)
@pool = pool
@clients = {}
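# kwargs are the generic options shared by every node; per-node connection details
# (url/host/port/path) are stripped here because each node supplies its own.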
@client_options = kwargs.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) }
@concurrent_worker = concurrent_worker
@replications = EMPTY_HASH
@primary_node_keys = EMPTY_ARRAY
@replica_node_keys = EMPTY_ARRAY
@primary_clients = EMPTY_ARRAY
@replica_clients = EMPTY_ARRAY
end

def any_primary_node_key(seed: nil)
random = seed.nil? ? Random : Random.new(seed)
@primary_node_keys.sample(random: random)
end

def process_topology_update!(replications, options) # rubocop:disable Metrics/AbcSize
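# `replications` maps primary node keys to arrays of replica node keys;
# `options` maps node keys to per-node client options (shapes inferred from the callers in Node).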
@replications = replications.freeze
@primary_node_keys = @replications.keys.sort.select { |k| options.key?(k) }.freeze
@replica_node_keys = @replications.values.flatten.sort.select { |k| options.key?(k) }.freeze

# Disconnect from nodes that we no longer want, and connect to nodes we're not connected to yet
disconnect_from_unwanted_nodes(options)
connect_to_new_nodes(options)

@primary_clients, @replica_clients = @clients.partition { |k, _| @primary_node_keys.include?(k) }.map(&:to_h)
@primary_clients.freeze
@replica_clients.freeze
end

private

def disconnect_from_unwanted_nodes(options)
(@clients.keys - options.keys).each do |node_key|
@clients.delete(node_key).close
end
end

def connect_to_new_nodes(options)
(options.keys - @clients.keys).each do |node_key|
option = options[node_key].merge(@client_options)
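# Non-primary nodes get scale_read: true, which is presumably what makes replica
# connections issue READONLY in their connection prelude (see Node::Config).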
config = ::RedisClient::Cluster::Node::Config.new(scale_read: !@primary_node_keys.include?(node_key), **option)
client = @pool.nil? ? config.new_client : config.new_pool(**@pool)
@clients[node_key] = client
end
end
end
end
end
end