Skip to content

Commit

Permalink
perf: make dedicated actions definition static (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
supercaracal authored Dec 31, 2024
1 parent c84f6fd commit 128c747
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 59 deletions.
1 change: 0 additions & 1 deletion lib/redis_client/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def initialize(config = nil, pool: nil, concurrency: nil, **kwargs)
@config = config.nil? ? ClusterConfig.new(**kwargs) : config
@concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {}))
@command_builder = @config.command_builder

@pool = pool
@kwargs = kwargs
@router = nil
Expand Down
107 changes: 49 additions & 58 deletions lib/redis_client/cluster/router.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,57 @@ class Cluster
class Router
ZERO_CURSOR_FOR_SCAN = '0'
TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry
DEDICATED_ACTIONS = lambda do # rubocop:disable Metrics/BlockLength
action = Struct.new('RedisCommandRoutingAction', :method_name, :reply_transformer, keyword_init: true)
pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc
multiple_key_action = action.new(method_name: :send_multiple_keys_command)
all_node_first_action = action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first)
primary_first_action = action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first)
not_supported_action = action.new(method_name: :fail_not_supported_command)
keyless_action = action.new(method_name: :fail_keyless_command)
{
'ping' => action.new(method_name: :send_ping_command, reply_transformer: pick_first),
'wait' => action.new(method_name: :send_wait_command),
'keys' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }),
'dbsize' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }),
'scan' => action.new(method_name: :send_scan_command),
'lastsave' => action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }),
'role' => action.new(method_name: :send_command_to_all_nodes),
'config' => action.new(method_name: :send_config_command),
'client' => action.new(method_name: :send_client_command),
'cluster' => action.new(method_name: :send_cluster_command),
'memory' => action.new(method_name: :send_memory_command),
'script' => action.new(method_name: :send_script_command),
'pubsub' => action.new(method_name: :send_pubsub_command),
'watch' => action.new(method_name: :send_watch_command),
'mget' => multiple_key_action,
'mset' => multiple_key_action,
'del' => multiple_key_action,
'acl' => all_node_first_action,
'auth' => all_node_first_action,
'bgrewriteaof' => all_node_first_action,
'bgsave' => all_node_first_action,
'quit' => all_node_first_action,
'save' => all_node_first_action,
'flushall' => primary_first_action,
'flushdb' => primary_first_action,
'readonly' => not_supported_action,
'readwrite' => not_supported_action,
'shutdown' => not_supported_action,
'discard' => keyless_action,
'exec' => keyless_action,
'multi' => keyless_action,
'unwatch' => keyless_action
}.each_with_object({}) do |(k, v), acc|
acc[k] = v
acc[k.upcase] = v
end
end.call.freeze

private_constant :ZERO_CURSOR_FOR_SCAN, :TSF
private_constant :ZERO_CURSOR_FOR_SCAN, :TSF, :DEDICATED_ACTIONS

attr_reader :config

Action = Struct.new(
'RedisCommandRoutingAction',
:method_name,
:reply_transformer,
keyword_init: true
)

def initialize(config, concurrent_worker, pool: nil, **kwargs)
@config = config
@concurrent_worker = concurrent_worker
Expand All @@ -38,16 +77,15 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs)
@node.reload!
@command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout)
@command_builder = @config.command_builder
@dedicated_actions = build_dedicated_actions
rescue ::RedisClient::Cluster::InitialSetupError => e
e.with_config(config)
raise
end

def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
return assign_node_and_send_command(method, command, args, &block) unless @dedicated_actions.key?(command.first)
return assign_node_and_send_command(method, command, args, &block) unless DEDICATED_ACTIONS.key?(command.first)

action = @dedicated_actions[command.first]
action = DEDICATED_ACTIONS[command.first]
return send(action.method_name, method, command, args, &block) if action.reply_transformer.nil?

reply = send(action.method_name, method, command, args)
Expand Down Expand Up @@ -257,53 +295,6 @@ def close

private

def build_dedicated_actions # rubocop:disable Metrics/AbcSize
pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc
multiple_key_action = Action.new(method_name: :send_multiple_keys_command)
all_node_first_action = Action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first)
primary_first_action = Action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first)
not_supported_action = Action.new(method_name: :fail_not_supported_command)
keyless_action = Action.new(method_name: :fail_keyless_command)
actions = {
'ping' => Action.new(method_name: :send_ping_command, reply_transformer: pick_first),
'wait' => Action.new(method_name: :send_wait_command),
'keys' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }),
'dbsize' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }),
'scan' => Action.new(method_name: :send_scan_command),
'lastsave' => Action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }),
'role' => Action.new(method_name: :send_command_to_all_nodes),
'config' => Action.new(method_name: :send_config_command),
'client' => Action.new(method_name: :send_client_command),
'cluster' => Action.new(method_name: :send_cluster_command),
'memory' => Action.new(method_name: :send_memory_command),
'script' => Action.new(method_name: :send_script_command),
'pubsub' => Action.new(method_name: :send_pubsub_command),
'watch' => Action.new(method_name: :send_watch_command),
'mget' => multiple_key_action,
'mset' => multiple_key_action,
'del' => multiple_key_action,
'acl' => all_node_first_action,
'auth' => all_node_first_action,
'bgrewriteaof' => all_node_first_action,
'bgsave' => all_node_first_action,
'quit' => all_node_first_action,
'save' => all_node_first_action,
'flushall' => primary_first_action,
'flushdb' => primary_first_action,
'readonly' => not_supported_action,
'readwrite' => not_supported_action,
'shutdown' => not_supported_action,
'discard' => keyless_action,
'exec' => keyless_action,
'multi' => keyless_action,
'unwatch' => keyless_action
}.freeze
actions.each_with_object({}) do |(k, v), acc|
acc[k] = v
acc[k.upcase] = v
end.freeze
end

def send_command_to_all_nodes(method, command, args, &block)
@node.call_all(method, command, args, &block)
end
Expand Down

0 comments on commit 128c747

Please sign in to comment.