Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: lessen conditional branches #415

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/redis_client/cluster/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ def redirect_command(node, pipeline, inner_index)
args = timeout.nil? ? [] : [timeout]

if block.nil?
@router.try_send(node, method, command, args)
@router.send_command_to_node(node, method, command, args)
else
@router.try_send(node, method, command, args, &block)
@router.send_command_to_node(node, method, command, args, &block)
end
end

Expand Down
200 changes: 110 additions & 90 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class Router

attr_reader :config

Action = Struct.new(
'RedisCommandRoutingAction',
:action_method_name,
:after_action_proc,
keyword_init: true
)

def initialize(config, concurrent_worker, pool: nil, **kwargs)
@config = config
@concurrent_worker = concurrent_worker
Expand All @@ -31,88 +38,20 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
@node.reload!
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@command_builder = @config.command_builder
@dedicated_actions = build_dedicated_actions
rescue ::RedisClient::Cluster::InitialSetupError => e
e.with_config(config)
raise
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength
cmd_name = command.first

if cmd_name.casecmp('get').zero?
node = assign_node(command)
try_send(node, method, command, args, &block)
elsif cmd_name.casecmp('mget').zero?
send_multiple_keys_command(command.first, method, command, args, &block)
elsif cmd_name.casecmp('set').zero?
node = assign_node(command)
try_send(node, method, command, args, &block)
elsif cmd_name.casecmp('mset').zero?
send_multiple_keys_command(command.first, method, command, args, &block)
elsif cmd_name.casecmp('del').zero?
send_multiple_keys_command(command.first, method, command, args, &block)
elsif cmd_name.casecmp('ping').zero?
@node.send_ping(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('wait').zero?
send_wait_command(method, command, args, &block)
elsif cmd_name.casecmp('keys').zero?
@node.call_replicas(method, command, args).flatten.sort_by(&:to_s).then(&TSF.call(block))
elsif cmd_name.casecmp('dbsize').zero?
@node.call_replicas(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block))
elsif cmd_name.casecmp('scan').zero?
scan(command, seed: 1)
elsif cmd_name.casecmp('lastsave').zero?
@node.call_all(method, command, args).sort_by(&:to_i).then(&TSF.call(block))
elsif cmd_name.casecmp('role').zero?
@node.call_all(method, command, args, &block)
elsif cmd_name.casecmp('config').zero?
send_config_command(method, command, args, &block)
elsif cmd_name.casecmp('client').zero?
send_client_command(method, command, args, &block)
elsif cmd_name.casecmp('cluster').zero?
send_cluster_command(method, command, args, &block)
elsif cmd_name.casecmp('memory').zero?
send_memory_command(method, command, args, &block)
elsif cmd_name.casecmp('script').zero?
send_script_command(method, command, args, &block)
elsif cmd_name.casecmp('pubsub').zero?
send_pubsub_command(method, command, args, &block)
elsif cmd_name.casecmp('watch').zero?
send_watch_command(command, &block)
elsif cmd_name.casecmp('acl').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('auth').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('bgrewriteaof').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('bgsave').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('quit').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('save').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('flushall').zero?
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('flushdb').zero?
@node.call_primaries(method, command, args).first.then(&TSF.call(block))
elsif cmd_name.casecmp('readonly').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('readwrite').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('shutdown').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('discard').zero?
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('exec').zero?
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('multi').zero?
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config)
elsif cmd_name.casecmp('unwatch').zero?
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config)
else
node = assign_node(command)
try_send(node, method, command, args, &block)
end
def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
return assign_node_and_send_command(method, command, args, &block) unless @dedicated_actions.key?(command.first)

action = @dedicated_actions[command.first]
return send(action.action_method_name, method, command, args, &block) if action.after_action_proc.nil?

reply = send(action.action_method_name, method, command, args)
action.after_action_proc.call(reply).then(&TSF.call(block))
rescue ::RedisClient::CircuitBreaker::OpenCircuitError
raise
rescue ::RedisClient::Cluster::Node::ReloadNeeded
Expand All @@ -138,7 +77,12 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi
end

# @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding
def try_send(node, method, command, args, retry_count: 3, &block)
def assign_node_and_send_command(method, command, args, retry_count: 3, &block)
node = assign_node(command)
send_command_to_node(node, method, command, args, retry_count: retry_count, &block)
end

def send_command_to_node(node, method, command, args, retry_count: 3, &block)
handle_redirection(node, command, retry_count: retry_count) do |on_node|
if args.empty?
# prevent memory allocation for variable-length args
Expand Down Expand Up @@ -313,6 +257,81 @@ def close

private

def build_dedicated_actions # rubocop:disable Metrics/AbcSize
pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc
multiple_key_action = Action.new(action_method_name: :send_multiple_keys_command)
all_node_first_action = Action.new(action_method_name: :send_command_to_all_nodes, after_action_proc: pick_first)
primary_first_action = Action.new(action_method_name: :send_command_to_primaries, after_action_proc: pick_first)
not_supported_action = Action.new(action_method_name: :fail_not_supported_command)
keyless_action = Action.new(action_method_name: :fail_keyless_command)
actions = {
'ping' => Action.new(action_method_name: :send_ping_command, after_action_proc: pick_first),
'wait' => Action.new(action_method_name: :send_wait_command),
'keys' => Action.new(action_method_name: :send_command_to_replicas, after_action_proc: ->(reply) { reply.flatten.sort_by(&:to_s) }),
'dbsize' => Action.new(action_method_name: :send_command_to_replicas, after_action_proc: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }),
'scan' => Action.new(action_method_name: :send_scan_command),
'lastsave' => Action.new(action_method_name: :send_command_to_all_nodes, after_action_proc: ->(reply) { reply.sort_by(&:to_i) }),
'role' => Action.new(action_method_name: :send_command_to_all_nodes),
'config' => Action.new(action_method_name: :send_config_command),
'client' => Action.new(action_method_name: :send_client_command),
'cluster' => Action.new(action_method_name: :send_cluster_command),
'memory' => Action.new(action_method_name: :send_memory_command),
'script' => Action.new(action_method_name: :send_script_command),
'pubsub' => Action.new(action_method_name: :send_pubsub_command),
'watch' => Action.new(action_method_name: :send_watch_command),
'mget' => multiple_key_action,
'mset' => multiple_key_action,
'del' => multiple_key_action,
'acl' => all_node_first_action,
'auth' => all_node_first_action,
'bgrewriteaof' => all_node_first_action,
'bgsave' => all_node_first_action,
'quit' => all_node_first_action,
'save' => all_node_first_action,
'flushall' => primary_first_action,
'flushdb' => primary_first_action,
'readonly' => not_supported_action,
'readwrite' => not_supported_action,
'shutdown' => not_supported_action,
'discard' => keyless_action,
'exec' => keyless_action,
'multi' => keyless_action,
'unwatch' => keyless_action
}.freeze
actions.each_with_object({}) do |(k, v), acc|
acc[k] = v
acc[k.upcase] = v
end.freeze
end

def send_command_to_all_nodes(method, command, args, &block)
@node.call_all(method, command, args, &block)
end

def send_command_to_primaries(method, command, args, &block)
@node.call_primaries(method, command, args, &block)
end

def send_command_to_replicas(method, command, args, &block)
@node.call_replicas(method, command, args, &block)
end

def send_ping_command(method, command, args, &block)
@node.send_ping(method, command, args, &block)
end

def send_scan_command(_method, command, _args, &_block)
scan(command, seed: 1)
end

def fail_not_supported_command(_method, command, _args, &_block)
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config)
end

def fail_keyless_command(_method, command, _args, &_block)
raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config)
end

def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize
@node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block))
rescue ::RedisClient::Cluster::ErrorCollection => e
Expand Down Expand Up @@ -363,23 +382,23 @@ def send_client_command(method, command, args, &block) # rubocop:disable Metrics

def send_cluster_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
if command[1].casecmp('addslots').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('delslots').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('failover').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('forget').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('meet').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('replicate').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('reset').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('set-config-epoch').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('setslot').zero?
raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config)
fail_not_supported_command(method, command, args, &block)
elsif command[1].casecmp('saveconfig').zero?
@node.call_all(method, command, args).first.then(&TSF.call(block))
elsif command[1].casecmp('getkeysinslot').zero?
Expand Down Expand Up @@ -428,7 +447,7 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics
end
end

def send_watch_command(command)
def send_watch_command(_method, command, _args, &_block)
unless block_given?
msg = 'A block required. And you need to use the block argument as a client for the transaction.'
raise ::RedisClient::Cluster::Transaction::ConsistencyError.new(msg).with_config(@config)
Expand All @@ -443,8 +462,9 @@ def send_watch_command(command)
end
end

def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def send_multiple_keys_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
# This implementation is prioritized performance rather than readability or so.
cmd = command.first
if cmd.casecmp('mget').zero?
single_key_cmd = 'get'
keys_step = 1
Expand All @@ -458,7 +478,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis
raise NotImplementedError, cmd
end

return try_send(assign_node(command), method, command, args, &block) if command.size <= keys_step + 1 || ::RedisClient::Cluster::KeySlotConverter.hash_tag_included?(command[1])
return assign_node_and_send_command(method, command, args, &block) if command.size <= keys_step + 1 || ::RedisClient::Cluster::KeySlotConverter.hash_tag_included?(command[1])

seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed
pipeline = ::RedisClient::Cluster::Pipeline.new(self, @command_builder, @concurrent_worker, exception: true, seed: seed)
Expand Down
Loading