diff --git a/lib/redis_client/cluster/pipeline.rb b/lib/redis_client/cluster/pipeline.rb index 4254db2..a5a77f2 100644 --- a/lib/redis_client/cluster/pipeline.rb +++ b/lib/redis_client/cluster/pipeline.rb @@ -283,9 +283,9 @@ def redirect_command(node, pipeline, inner_index) args = timeout.nil? ? [] : [timeout] if block.nil? - @router.try_send(node, method, command, args) + @router.send_command_to_node(node, method, command, args) else - @router.try_send(node, method, command, args, &block) + @router.send_command_to_node(node, method, command, args, &block) end end diff --git a/lib/redis_client/cluster/router.rb b/lib/redis_client/cluster/router.rb index c299536..fa0c15a 100644 --- a/lib/redis_client/cluster/router.rb +++ b/lib/redis_client/cluster/router.rb @@ -22,6 +22,13 @@ class Router attr_reader :config + Action = Struct.new( + 'RedisCommandRoutingAction', + :action_method_name, + :after_action_proc, + keyword_init: true + ) + def initialize(config, concurrent_worker, pool: nil, **kwargs) @config = config @concurrent_worker = concurrent_worker @@ -31,88 +38,20 @@ def initialize(config, concurrent_worker, pool: nil, **kwargs) @node.reload! @command = ::RedisClient::Cluster::Command.load(@node.replica_clients.shuffle, slow_command_timeout: config.slow_command_timeout) @command_builder = @config.command_builder + @dedicated_actions = build_dedicated_actions rescue ::RedisClient::Cluster::InitialSetupError => e e.with_config(config) raise end - def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/MethodLength - cmd_name = command.first - - if cmd_name.casecmp('get').zero? - node = assign_node(command) - try_send(node, method, command, args, &block) - elsif cmd_name.casecmp('mget').zero? - send_multiple_keys_command(command.first, method, command, args, &block) - elsif cmd_name.casecmp('set').zero? - node = assign_node(command) - try_send(node, method, command, args, &block) - elsif cmd_name.casecmp('mset').zero? - send_multiple_keys_command(command.first, method, command, args, &block) - elsif cmd_name.casecmp('del').zero? - send_multiple_keys_command(command.first, method, command, args, &block) - elsif cmd_name.casecmp('ping').zero? - @node.send_ping(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('wait').zero? - send_wait_command(method, command, args, &block) - elsif cmd_name.casecmp('keys').zero? - @node.call_replicas(method, command, args).flatten.sort_by(&:to_s).then(&TSF.call(block)) - elsif cmd_name.casecmp('dbsize').zero? - @node.call_replicas(method, command, args).select { |e| e.is_a?(Integer) }.sum.then(&TSF.call(block)) - elsif cmd_name.casecmp('scan').zero? - scan(command, seed: 1) - elsif cmd_name.casecmp('lastsave').zero? - @node.call_all(method, command, args).sort_by(&:to_i).then(&TSF.call(block)) - elsif cmd_name.casecmp('role').zero? - @node.call_all(method, command, args, &block) - elsif cmd_name.casecmp('config').zero? - send_config_command(method, command, args, &block) - elsif cmd_name.casecmp('client').zero? - send_client_command(method, command, args, &block) - elsif cmd_name.casecmp('cluster').zero? - send_cluster_command(method, command, args, &block) - elsif cmd_name.casecmp('memory').zero? - send_memory_command(method, command, args, &block) - elsif cmd_name.casecmp('script').zero? - send_script_command(method, command, args, &block) - elsif cmd_name.casecmp('pubsub').zero? - send_pubsub_command(method, command, args, &block) - elsif cmd_name.casecmp('watch').zero? - send_watch_command(command, &block) - elsif cmd_name.casecmp('acl').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('auth').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('bgrewriteaof').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('bgsave').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('quit').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('save').zero? - @node.call_all(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('flushall').zero? - @node.call_primaries(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('flushdb').zero? - @node.call_primaries(method, command, args).first.then(&TSF.call(block)) - elsif cmd_name.casecmp('readonly').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('readwrite').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('shutdown').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('discard').zero? - raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('exec').zero? - raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('multi').zero? - raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config) - elsif cmd_name.casecmp('unwatch').zero? - raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config) - else - node = assign_node(command) - try_send(node, method, command, args, &block) - end + def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + return assign_node_and_send_command(method, command, args, &block) unless @dedicated_actions.key?(command.first) + + action = @dedicated_actions[command.first] + return send(action.action_method_name, method, command, args, &block) if action.after_action_proc.nil? + + reply = send(action.action_method_name, method, command, args) + action.after_action_proc.call(reply).then(&TSF.call(block)) rescue ::RedisClient::CircuitBreaker::OpenCircuitError raise rescue ::RedisClient::Cluster::Node::ReloadNeeded @@ -138,7 +77,12 @@ def send_command(method, command, *args, &block) # rubocop:disable Metrics/AbcSi end # @see https://redis.io/docs/reference/cluster-spec/#redirection-and-resharding Redirection and resharding - def try_send(node, method, command, args, retry_count: 3, &block) + def assign_node_and_send_command(method, command, args, retry_count: 3, &block) + node = assign_node(command) + send_command_to_node(node, method, command, args, retry_count: retry_count, &block) + end + + def send_command_to_node(node, method, command, args, retry_count: 3, &block) handle_redirection(node, command, retry_count: retry_count) do |on_node| if args.empty? # prevent memory allocation for variable-length args @@ -313,6 +257,81 @@ def close private + def build_dedicated_actions # rubocop:disable Metrics/AbcSize + pick_first = ->(reply) { reply.first } # rubocop:disable Style/SymbolProc + multiple_key_action = Action.new(action_method_name: :send_multiple_keys_command) + all_node_first_action = Action.new(action_method_name: :send_command_to_all_nodes, after_action_proc: pick_first) + primary_first_action = Action.new(action_method_name: :send_command_to_primaries, after_action_proc: pick_first) + not_supported_action = Action.new(action_method_name: :fail_not_supported_command) + keyless_action = Action.new(action_method_name: :fail_keyless_command) + actions = { + 'ping' => Action.new(action_method_name: :send_ping_command, after_action_proc: pick_first), + 'wait' => Action.new(action_method_name: :send_wait_command), + 'keys' => Action.new(action_method_name: :send_command_to_replicas, after_action_proc: ->(reply) { reply.flatten.sort_by(&:to_s) }), + 'dbsize' => Action.new(action_method_name: :send_command_to_replicas, after_action_proc: ->(reply) { reply.select { |e| e.is_a?(Integer) }.sum }), + 'scan' => Action.new(action_method_name: :send_scan_command), + 'lastsave' => Action.new(action_method_name: :send_command_to_all_nodes, after_action_proc: ->(reply) { reply.sort_by(&:to_i) }), + 'role' => Action.new(action_method_name: :send_command_to_all_nodes), + 'config' => Action.new(action_method_name: :send_config_command), + 'client' => Action.new(action_method_name: :send_client_command), + 'cluster' => Action.new(action_method_name: :send_cluster_command), + 'memory' => Action.new(action_method_name: :send_memory_command), + 'script' => Action.new(action_method_name: :send_script_command), + 'pubsub' => Action.new(action_method_name: :send_pubsub_command), + 'watch' => Action.new(action_method_name: :send_watch_command), + 'mget' => multiple_key_action, + 'mset' => multiple_key_action, + 'del' => multiple_key_action, + 'acl' => all_node_first_action, + 'auth' => all_node_first_action, + 'bgrewriteaof' => all_node_first_action, + 'bgsave' => all_node_first_action, + 'quit' => all_node_first_action, + 'save' => all_node_first_action, + 'flushall' => primary_first_action, + 'flushdb' => primary_first_action, + 'readonly' => not_supported_action, + 'readwrite' => not_supported_action, + 'shutdown' => not_supported_action, + 'discard' => keyless_action, + 'exec' => keyless_action, + 'multi' => keyless_action, + 'unwatch' => keyless_action + }.freeze + actions.each_with_object({}) do |(k, v), acc| + acc[k] = v + acc[k.upcase] = v + end.freeze + end + + def send_command_to_all_nodes(method, command, args, &block) + @node.call_all(method, command, args, &block) + end + + def send_command_to_primaries(method, command, args, &block) + @node.call_primaries(method, command, args, &block) + end + + def send_command_to_replicas(method, command, args, &block) + @node.call_replicas(method, command, args, &block) + end + + def send_ping_command(method, command, args, &block) + @node.send_ping(method, command, args, &block) + end + + def send_scan_command(_method, command, _args, &_block) + scan(command, seed: 1) + end + + def fail_not_supported_command(_method, command, _args, &_block) + raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(command.first).with_config(@config) + end + + def fail_keyless_command(_method, command, _args, &_block) + raise ::RedisClient::Cluster::AmbiguousNodeError.from_command(command.first).with_config(@config) + end + def send_wait_command(method, command, args, retry_count: 1, &block) # rubocop:disable Metrics/AbcSize @node.call_primaries(method, command, args).select { |r| r.is_a?(Integer) }.sum.then(&TSF.call(block)) rescue ::RedisClient::Cluster::ErrorCollection => e @@ -363,23 +382,23 @@ def send_client_command(method, command, args, &block) # rubocop:disable Metrics def send_cluster_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity if command[1].casecmp('addslots').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('delslots').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('failover').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('forget').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('meet').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('replicate').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('reset').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('set-config-epoch').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('setslot').zero? - raise ::RedisClient::Cluster::OrchestrationCommandNotSupported.from_command(['cluster', command[1]]).with_config(@config) + fail_not_supported_command(method, command, args, &block) elsif command[1].casecmp('saveconfig').zero? @node.call_all(method, command, args).first.then(&TSF.call(block)) elsif command[1].casecmp('getkeysinslot').zero? @@ -428,7 +447,7 @@ def send_pubsub_command(method, command, args, &block) # rubocop:disable Metrics end end - def send_watch_command(command) + def send_watch_command(_method, command, _args, &_block) unless block_given? msg = 'A block required. And you need to use the block argument as a client for the transaction.' raise ::RedisClient::Cluster::Transaction::ConsistencyError.new(msg).with_config(@config) @@ -443,8 +462,9 @@ def send_watch_command(command) end end - def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity + def send_multiple_keys_command(method, command, args, &block) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity # This implementation is prioritized performance rather than readability or so. + cmd = command.first if cmd.casecmp('mget').zero? single_key_cmd = 'get' keys_step = 1 @@ -458,7 +478,7 @@ def send_multiple_keys_command(cmd, method, command, args, &block) # rubocop:dis raise NotImplementedError, cmd end - return try_send(assign_node(command), method, command, args, &block) if command.size <= keys_step + 1 || ::RedisClient::Cluster::KeySlotConverter.hash_tag_included?(command[1]) + return assign_node_and_send_command(method, command, args, &block) if command.size <= keys_step + 1 || ::RedisClient::Cluster::KeySlotConverter.hash_tag_included?(command[1]) seed = @config.use_replica? && @config.replica_affinity == :random ? nil : Random.new_seed pipeline = ::RedisClient::Cluster::Pipeline.new(self, @command_builder, @concurrent_worker, exception: true, seed: seed)