From adef6dc728a351c0e5bae96585c7c9852d6a011b Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 31 Dec 2024 10:30:44 +0900 Subject: [PATCH 1/2] perf: make dedicated actions static --- lib/redis_client/cluster.rb | 1 - lib/redis_client/cluster/router.rb | 99 +++++++++++++++--------------- 2 files changed, 48 insertions(+), 52 deletions(-) diff --git a/lib/redis_client/cluster.rb b/lib/redis_client/cluster.rb index 55cd483..71a0ac4 100644 --- a/lib/redis_client/cluster.rb +++ b/lib/redis_client/cluster.rb @@ -19,7 +19,6 @@ def initialize(config = nil, pool: nil, concurrency: nil, **kwargs) @config = config.nil? ? ClusterConfig.new(**kwargs) : config @concurrent_worker = ::RedisClient::Cluster::ConcurrentWorker.create(**(concurrency || {})) @command_builder = @config.command_builder - @pool = pool @kwargs = kwargs @router = nil diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index f3efe47..9606ae8 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -17,8 +17,53 @@ class Cluster class Router ZERO_CURSOR_FOR_SCAN = '0' TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry + DEDICATED_ACTIONS = lambda do # rubocop:disable Metrics/BlockLength + pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc + multiple_key_action = Action.new(method_name: :send_multiple_keys_command) + all_node_first_action = Action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) + primary_first_action = Action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) + not_supported_action = Action.new(method_name: :fail_not_supported_command) + keyless_action = Action.new(method_name: :fail_keyless_command) + { + 'ping' => Action.new(method_name: :send_ping_command, reply_transformer: pick_first), + 'wait' => Action.new(method_name: :send_wait_command), + 'keys' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), + 'dbsize' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), + 'scan' => Action.new(method_name: :send_scan_command), + 'lastsave' => Action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), + 'role' => Action.new(method_name: :send_command_to_all_nodes), + 'config' => Action.new(method_name: :send_config_command), + 'client' => Action.new(method_name: :send_client_command), + 'cluster' => Action.new(method_name: :send_cluster_command), + 'memory' => Action.new(method_name: :send_memory_command), + 'script' => Action.new(method_name: :send_script_command), + 'pubsub' => Action.new(method_name: :send_pubsub_command), + 'watch' => Action.new(method_name: :send_watch_command), + 'mget' => multiple_key_action, + 'mset' => multiple_key_action, + 'del' => multiple_key_action, + 'acl' => all_node_first_action, + 'auth' => all_node_first_action, + 'bgrewriteaof' => all_node_first_action, + 'bgsave' => all_node_first_action, + 'quit' => all_node_first_action, + 'save' => all_node_first_action, + 'flushall' => primary_first_action, + 'flushdb' => primary_first_action, + 'readonly' => not_supported_action, + 'readwrite' => not_supported_action, + 'shutdown' => not_supported_action, + 'discard' => keyless_action, + 'exec' => keyless_action, + 'multi' => keyless_action, + 'unwatch' => keyless_action + }.each_with_object({}) do |(k, v), acc| + acc[k] = v + acc[k.upcase] = v + end + end.call.freeze - private_constant :ZERO_CURSOR_FOR_SCAN, :TSF + private_constant :ZERO_CURSOR_FOR_SCAN, :TSF, :DEDICATED_ACTIONS attr_reader :config @@ -38,16 +83,15 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @node.reload! @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder - @dedicated_actions = build_dedicated_actions rescue ::RedisClient::Cluster::InitialSetupError => e e.with_config(config) raise end def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity - return assign_node_and_send_command(method, command, args, &block) unless @dedicated_actions.key?(command.first) + return assign_node_and_send_command(method, command, args, &block) unless DEDICATED_ACTIONS.key?(command.first) - action = @dedicated_actions[command.first] + action = DEDICATED_ACTIONS[command.first] return send(action.method_name, method, command, args, &block) if action.reply_transformer.nil? reply = send(action.method_name, method, command, args) @@ -257,53 +301,6 @@ def close private - def build_dedicated_actions # rubocop:disable Metrics/AbcSize - pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc - multiple_key_action = Action.new(method_name: :send_multiple_keys_command) - all_node_first_action = Action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) - primary_first_action = Action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) - not_supported_action = Action.new(method_name: :fail_not_supported_command) - keyless_action = Action.new(method_name: :fail_keyless_command) - actions = { - 'ping' => Action.new(method_name: :send_ping_command, reply_transformer: pick_first), - 'wait' => Action.new(method_name: :send_wait_command), - 'keys' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), - 'dbsize' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), - 'scan' => Action.new(method_name: :send_scan_command), - 'lastsave' => Action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), - 'role' => Action.new(method_name: :send_command_to_all_nodes), - 'config' => Action.new(method_name: :send_config_command), - 'client' => Action.new(method_name: :send_client_command), - 'cluster' => Action.new(method_name: :send_cluster_command), - 'memory' => Action.new(method_name: :send_memory_command), - 'script' => Action.new(method_name: :send_script_command), - 'pubsub' => Action.new(method_name: :send_pubsub_command), - 'watch' => Action.new(method_name: :send_watch_command), - 'mget' => multiple_key_action, - 'mset' => multiple_key_action, - 'del' => multiple_key_action, - 'acl' => all_node_first_action, - 'auth' => all_node_first_action, - 'bgrewriteaof' => all_node_first_action, - 'bgsave' => all_node_first_action, - 'quit' => all_node_first_action, - 'save' => all_node_first_action, - 'flushall' => primary_first_action, - 'flushdb' => primary_first_action, - 'readonly' => not_supported_action, - 'readwrite' => not_supported_action, - 'shutdown' => not_supported_action, - 'discard' => keyless_action, - 'exec' => keyless_action, - 'multi' => keyless_action, - 'unwatch' => keyless_action - }.freeze - actions.each_with_object({}) do |(k, v), acc| - acc[k] = v - acc[k.upcase] = v - end.freeze - end - def send_command_to_all_nodes(method, command, args, &block) @node.call_all(method, command, args, &block) end From bae50ba6f7fb15e92f801605f602d9c851ef28c7 Mon Sep 17 00:00:00 2001 From: Taishi Kasuga Date: Tue, 31 Dec 2024 10:39:02 +0900 Subject: [PATCH 2/2] fix --- lib/redis_client/cluster/router.rb | 46 +++++++++++++----------------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index 9606ae8..e32924c 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -18,27 +18,28 @@ class Router ZERO_CURSOR_FOR_SCAN = '0' TSF = ->(f, x) { f.nil? ? x : f.call(x) }.curry DEDICATED_ACTIONS = lambda do # rubocop:disable Metrics/BlockLength + action = Struct.new('RedisCommandRoutingAction', :method_name, :reply_transformer, keyword_init: true) pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc - multiple_key_action = Action.new(method_name: :send_multiple_keys_command) - all_node_first_action = Action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) - primary_first_action = Action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) - not_supported_action = Action.new(method_name: :fail_not_supported_command) - keyless_action = Action.new(method_name: :fail_keyless_command) + multiple_key_action = action.new(method_name: :send_multiple_keys_command) + all_node_first_action = action.new(method_name: :send_command_to_all_nodes, reply_transformer: pick_first) + primary_first_action = action.new(method_name: :send_command_to_primaries, reply_transformer: pick_first) + not_supported_action = action.new(method_name: :fail_not_supported_command) + keyless_action = action.new(method_name: :fail_keyless_command) { - 'ping' => Action.new(method_name: :send_ping_command, reply_transformer: pick_first), - 'wait' => Action.new(method_name: :send_wait_command), - 'keys' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), - 'dbsize' => Action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), - 'scan' => Action.new(method_name: :send_scan_command), - 'lastsave' => Action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), - 'role' => Action.new(method_name: :send_command_to_all_nodes), - 'config' => Action.new(method_name: :send_config_command), - 'client' => Action.new(method_name: :send_client_command), - 'cluster' => Action.new(method_name: :send_cluster_command), - 'memory' => Action.new(method_name: :send_memory_command), - 'script' => Action.new(method_name: :send_script_command), - 'pubsub' => Action.new(method_name: :send_pubsub_command), - 'watch' => Action.new(method_name: :send_watch_command), + 'ping' => action.new(method_name: :send_ping_command, reply_transformer: pick_first), + 'wait' => action.new(method_name: :send_wait_command), + 'keys' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.flatten.sort_by(&:to_s) }), + 'dbsize' => action.new(method_name: :send_command_to_replicas, reply_transformer: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), + 'scan' => action.new(method_name: :send_scan_command), + 'lastsave' => action.new(method_name: :send_command_to_all_nodes, reply_transformer: ->(reply) { reply.sort_by(&:to_i) }), + 'role' => action.new(method_name: :send_command_to_all_nodes), + 'config' => action.new(method_name: :send_config_command), + 'client' => action.new(method_name: :send_client_command), + 'cluster' => action.new(method_name: :send_cluster_command), + 'memory' => action.new(method_name: :send_memory_command), + 'script' => action.new(method_name: :send_script_command), + 'pubsub' => action.new(method_name: :send_pubsub_command), + 'watch' => action.new(method_name: :send_watch_command), 'mget' => multiple_key_action, 'mset' => multiple_key_action, 'del' => multiple_key_action, @@ -67,13 +68,6 @@ class Router attr_reader :config - Action = Struct.new( - 'RedisCommandRoutingAction', - :method_name, - :reply_transformer, - keyword_init: true - ) - def initialize(config, concurrent_worker, pool: nil, **kwargs) @config = config @concurrent_worker = concurrent_worker