Skip to content

Commit

Permalink
[redis_proxy] Add support for SELECT and KEYS (#37706)
Browse files Browse the repository at this point in the history
This RP mainly improves the compatibility of redis proxy, improvement as
follows:

* Add support for the select command, referring to the implementation of
Apache KVRocks and Microsoft Garnet, simply returns OK.
* Add keys command support, send keys commands to each shard on the
backend, and then merge the results.
* For unsupport commands, the return value is the same as Redis, because
some clients judge this return value, for example, [Java lettuce
client](https://github.com/redis/lettuce/blob/14de5b88f922581f996c3b3311cb253eb36b32db/src/main/java/io/lettuce/core/RedisHandshake.java#L328)

---------

Signed-off-by: duanhongyi <[email protected]>
  • Loading branch information
duanhongyi authored Dec 22, 2024
1 parent 66cc217 commit 712b736
Show file tree
Hide file tree
Showing 15 changed files with 809 additions and 46 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ removed_config_or_runtime:
Removed runtime flag ``envoy.reloadable_features.exclude_host_in_eds_status_draining``.
new_features:
- area: redis
change: |
Added support for keys and select.
- area: wasm
change: |
Added the wasm vm reload support to reload wasm vm when the wasm vm is failed with runtime errors. See
Expand Down
4 changes: 3 additions & 1 deletion docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ For details on each command's usage see the official
EXISTS, Generic
EXPIRE, Generic
EXPIREAT, Generic
KEYS, String
PERSIST, Generic
PEXPIRE, Generic
PEXPIREAT, Generic
PTTL, Generic
RESTORE, Generic
SELECT, Generic
TOUCH, Generic
TTL, Generic
TYPE, Generic
Expand Down Expand Up @@ -300,7 +302,7 @@ Envoy can also generate its own errors in response to the client.
the connection."
invalid request, "Command was rejected by the first stage of the command splitter due to
datatype or length."
unsupported command, "The command was not recognized by Envoy and therefore cannot be serviced
ERR unknown command, "The command was not recognized by Envoy and therefore cannot be serviced
because it cannot be hashed to a backend server."
finished with n errors, "Fragmented commands which sum the response (e.g. DEL) will return the
total number of errors received if any were received."
Expand Down
21 changes: 19 additions & 2 deletions source/extensions/clusters/redis/redis_cluster_lb.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/extensions/clusters/redis/redis_cluster_lb.h"

#include <string>

namespace Envoy {
namespace Extensions {
namespace Clusters {
Expand Down Expand Up @@ -138,8 +140,17 @@ Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBa
return nullptr;
}

auto shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
RedisShardSharedPtr shard;
if (dynamic_cast<const RedisSpecifyShardContextImpl*>(context)) {
if (hash.value() < shard_vector_->size()) {
shard = shard_vector_->at(hash.value());
} else {
return nullptr;
}
} else {
shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
}

auto redis_context = dynamic_cast<RedisLoadBalancerContext*>(context);
if (redis_context && redis_context->isReadCommand()) {
Expand Down Expand Up @@ -213,6 +224,12 @@ absl::string_view RedisLoadBalancerContextImpl::hashtag(absl::string_view v, boo

return v.substr(start + 1, end - start - 1);
}
RedisSpecifyShardContextImpl::RedisSpecifyShardContextImpl(
uint64_t shard_index, const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy)
: RedisLoadBalancerContextImpl(std::to_string(shard_index), true, true, request, read_policy),
shard_index_(shard_index) {}

} // namespace Redis
} // namespace Clusters
} // namespace Extensions
Expand Down
20 changes: 20 additions & 0 deletions source/extensions/clusters/redis/redis_cluster_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext,
const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_;
};

class RedisSpecifyShardContextImpl : public RedisLoadBalancerContextImpl {
public:
/**
* The redis specify Shard load balancer context for Redis requests.
* @param shard_index specify the shard index for the Redis request.
* @param request specify the Redis request.
* @param read_policy specify the read policy.
*/
RedisSpecifyShardContextImpl(uint64_t shard_index,
const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);

// Upstream::LoadBalancerContextBase
absl::optional<uint64_t> computeHashKey() override { return shard_index_; }

private:
const absl::optional<uint64_t> shard_index_;
};

class ClusterSlotUpdateCallBack {
public:
virtual ~ClusterSlotUpdateCallBack() = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ struct SupportedCommands {
*/
static const std::string& mset() { CONSTRUCT_ON_FIRST_USE(std::string, "mset"); }

/**
* @return keys command
*/
static const std::string& keys() { CONSTRUCT_ON_FIRST_USE(std::string, "keys"); }

/**
* @return ping command
*/
Expand All @@ -94,6 +99,11 @@ struct SupportedCommands {
*/
static const std::string& quit() { CONSTRUCT_ON_FIRST_USE(std::string, "quit"); }

/**
* @return select command
*/
static const std::string& select() { CONSTRUCT_ON_FIRST_USE(std::string, "select"); }

/**
* @return commands which alters the state of redis
*/
Expand All @@ -112,6 +122,14 @@ struct SupportedCommands {
static bool isReadCommand(const std::string& command) {
return !writeCommands().contains(command);
}

static bool isSupportedCommand(const std::string& command) {
return (simpleCommands().contains(command) || evalCommands().contains(command) ||
hashMultipleSumResultCommands().contains(command) ||
transactionCommands().contains(command) || auth() == command || echo() == command ||
mget() == command || mset() == command || keys() == command || ping() == command ||
time() == command || quit() == command || select() == command);
}
};

} // namespace Redis
Expand Down
136 changes: 129 additions & 7 deletions source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/extensions/filters/network/redis_proxy/command_splitter_impl.h"

#include <cstdint>

#include "source/common/common/logger.h"
#include "source/extensions/filters/network/common/redis/supported_commands.h"

Expand Down Expand Up @@ -75,6 +77,35 @@ makeFragmentedRequest(const RouteSharedPtr& route, const std::string& command,
return handler;
}

/**
* Make request and maybe mirror the request based on the mirror policies of the route.
* @param route supplies the route matched with the request.
* @param command supplies the command of the request.
* @param key supplies the key of the request.
* @param incoming_request supplies the request.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
Common::Redis::Client::PoolRequest*
makeFragmentedRequestToShard(const RouteSharedPtr& route, const std::string& command,
uint16_t shard_index, const Common::Redis::RespValue& incoming_request,
ConnPool::PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) {
auto handler = route->upstream(command)->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), callbacks, transaction);
if (handler) {
for (auto& mirror_policy : route->mirrorPolicies()) {
if (mirror_policy->shouldMirror(command)) {
mirror_policy->upstream()->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), null_pool_callbacks, transaction);
}
}
}
return handler;
}

// Send a string response downstream.
void localResponse(SplitCallbacks& callbacks, std::string response) {
Common::Redis::RespValuePtr res(new Common::Redis::RespValue());
Expand Down Expand Up @@ -385,6 +416,80 @@ void MSETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
}
}

SplitRequestPtr KeysRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info) {
if (incoming_request->asArray().size() != 2) {
onWrongNumberOfArguments(callbacks, *incoming_request);
command_stats.error_.inc();
return nullptr;
}
const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
uint32_t shard_size =
route ? route->upstream(incoming_request->asArray()[0].asString())->shardSize() : 0;
if (shard_size == 0) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
}

std::unique_ptr<KeysRequest> request_ptr{
new KeysRequest(callbacks, command_stats, time_source, delay_command_latency)};
request_ptr->num_pending_responses_ = shard_size;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);

request_ptr->pending_response_ = std::make_unique<Common::Redis::RespValue>();
request_ptr->pending_response_->type(Common::Redis::RespType::Array);

Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
for (uint32_t shard_index = 0; shard_index < shard_size; shard_index++) {
request_ptr->pending_requests_.emplace_back(*request_ptr, shard_index);
PendingRequest& pending_request = request_ptr->pending_requests_.back();

ENVOY_LOG(debug, "keys request shard index {}: {}", shard_index, base_request->toString());
pending_request.handle_ =
makeFragmentedRequestToShard(route, base_request->asArray()[0].asString(), shard_index,
*base_request, pending_request, callbacks.transaction());

if (!pending_request.handle_) {
pending_request.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
}
}

if (request_ptr->num_pending_responses_ > 0) {
return request_ptr;
}

return nullptr;
}

void KeysRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) {
pending_requests_[index].handle_ = nullptr;
switch (value->type()) {
case Common::Redis::RespType::Array: {
pending_response_->asArray().insert(pending_response_->asArray().end(),
value->asArray().begin(), value->asArray().end());
break;
}
default: {
error_count_++;
break;
}
}

ASSERT(num_pending_responses_ > 0);
if (--num_pending_responses_ == 0) {
updateStats(error_count_ == 0);
if (error_count_ == 0) {
callbacks_.onResponse(std::move(pending_response_));
} else {
callbacks_.onResponse(Common::Redis::Utility::makeError(
fmt::format("finished with {} error(s)", error_count_)));
}
}
}

SplitRequestPtr
SplitKeysSumResultRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
Expand Down Expand Up @@ -593,7 +698,7 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
Common::Redis::FaultManagerPtr&& fault_manager)
: router_(std::move(router)), simple_command_handler_(*router_),
eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_),
split_keys_sum_result_handler_(*router_),
keys_handler_(*router_), split_keys_sum_result_handler_(*router_),
transaction_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS(
POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
time_source_(time_source), fault_manager_(std::move(fault_manager)) {
Expand All @@ -616,6 +721,9 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::mset(), latency_in_micros,
mset_handler_);

addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::keys(), latency_in_micros,
keys_handler_);

for (const std::string& command : Common::Redis::SupportedCommands::transactionCommands()) {
addHandler(scope, stat_prefix, command, latency_in_micros, transaction_handler_);
}
Expand All @@ -637,6 +745,15 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
}

std::string command_name = absl::AsciiStrToLower(request->asArray()[0].asString());
// Compatible with redis behavior, if there is an unsupported command, return immediately,
// this action must be performed before verifying auth, some redis clients rely on this behavior.
if (!Common::Redis::SupportedCommands::isSupportedCommand(command_name)) {
stats_.unsupported_command_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format(
"ERR unknown command '{}', with args beginning with: {}", request->asArray()[0].asString(),
request->asArray().size() > 1 ? request->asArray()[1].asString() : "")));
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::auth()) {
if (request->asArray().size() < 2) {
Expand Down Expand Up @@ -704,6 +821,16 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::select()) {
// Respond to OK locally.
if (request->asArray().size() != 2) {
onInvalidRequest(callbacks);
return nullptr;
}
localResponse(callbacks, "OK");
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::quit()) {
callbacks.onQuit();
return nullptr;
Expand All @@ -718,12 +845,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,

// Get the handler for the downstream request
auto handler = handler_lookup_table_.find(command_name.c_str());
if (handler == nullptr) {
stats_.unsupported_command_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(
fmt::format("unsupported command '{}'", request->asArray()[0].asString())));
return nullptr;
}
ASSERT(handler != nullptr);

// If we are within a transaction, forward all requests to the transaction handler (i.e. handler
// of "multi" command).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ class MGETRequest : public FragmentedRequest {
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* KeysRequest sends the command to all Redis server. The response from each Redis (which
* must be an array) is merged and returned to the user. If there is any error or failure in
* processing the fragmented commands, an error will be returned.
*/
class KeysRequest : public FragmentedRequest {
public:
static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info);

private:
KeysRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source,
bool delay_command_latency)
: FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency) {}
// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* SplitKeysSumResultRequest takes each key from the command and sends the same incoming command
* with each key to the appropriate Redis server. The response from each Redis (which must be an
Expand Down Expand Up @@ -390,6 +410,7 @@ class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
CommandHandlerFactory<EvalRequest> eval_command_handler_;
CommandHandlerFactory<MGETRequest> mget_handler_;
CommandHandlerFactory<MSETRequest> mset_handler_;
CommandHandlerFactory<KeysRequest> keys_handler_;
CommandHandlerFactory<SplitKeysSumResultRequest> split_keys_sum_result_handler_;
CommandHandlerFactory<TransactionRequest> transaction_handler_;
TrieLookupTable<HandlerDataPtr> handler_lookup_table_;
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/network/redis_proxy/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Instance {
public:
virtual ~Instance() = default;

virtual uint16_t shardSize() PURE;
/**
* Makes a redis request.
* @param hash_key supplies the key to use for consistent hashing.
Expand All @@ -64,6 +65,18 @@ class Instance {
virtual Common::Redis::Client::PoolRequest*
makeRequest(const std::string& hash_key, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
/**
* Makes a redis request.
* @param shard_index supplies the key to use for consistent hashing.
* @param request supplies the request to make.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual Common::Redis::Client::PoolRequest*
makeRequestToShard(uint16_t shard_index, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
};

using InstanceSharedPtr = std::shared_ptr<Instance>;
Expand Down
Loading

0 comments on commit 712b736

Please sign in to comment.