Skip to content

Commit

Permalink
support AsyncRedisCluster::redis, so that we can send command that ha…
Browse files Browse the repository at this point in the history
…s no key to cluster in async mode
  • Loading branch information
sewenew committed Apr 29, 2023
1 parent f1c02ee commit 4368319
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 8 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2597,8 +2597,15 @@ auto mget_res = async_cluster.mget<std::vector<OptionalString>>({"{hashtag}key1"
unordered_map<string, string> m = {{"a", "b"}, {"c", "d"}};
Future<void> hmset_res = async_redis.hmset("hash", m.begin(), m.end());
// Create an AsyncRedis object with hash-tag, so that we can send commands that has no key.
// It connects to Redis instance that holds the given key, i.e. hash-tag.
auto r = async_cluster.redis("hash-tag");
Future<string> ping_res = r.command<string>("ping");
```
**NOTE**: By default, when you use `AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection = true)` to create an `AsyncRedis` object, instead of picking a connection from the underlying connection pool, it creates a new connection to the corresponding Redis server. So this is NOT a cheap operation, and you should try to reuse this newly created `AsyncRedis` object as much as possible. If you pass `false` as the second parameter, you can create a `AsyncRedis` object without creating a new connection. However, in this case, you should be very careful, otherwise, you might get bad performance or even dead lock. Please carefully check the related [pipeline section](#very-important-notes) before using this feature. Also the returned `AsyncRedis` object is NOT thread-safe, and if it throws exception, you need to destroy it, and create a new one with the `AsyncRedisCluster::redis` method.
#### Async Subscriber
**NOTE**: I'm not quite satisfied with the interface of `AsyncSubscriber`. If you have a better idea, feel free to open an issue for discussion.
Expand Down
2 changes: 2 additions & 0 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,8 @@ class GuardedAsyncConnection {
std::shared_ptr<AsyncConnection> _connection;
};

using GuardedAsyncConnectionSPtr = std::shared_ptr<GuardedAsyncConnection>;

namespace detail {

// We seperate this function from ClusterEvent to avoid
Expand Down
5 changes: 4 additions & 1 deletion src/sw/redis++/async_connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ AsyncConnectionPool& AsyncConnectionPool::operator=(AsyncConnectionPool &&that)
}

AsyncConnectionPool::~AsyncConnectionPool() {
assert(_loop);
if (!_loop) {
// This pool has been moved.
return;
}

// TODO: what if the connection has been borrowed but not returned?
// Or we dont' need to worry about that, since it's destructing and
Expand Down
4 changes: 4 additions & 0 deletions src/sw/redis++/async_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ AsyncRedis::AsyncRedis(const std::shared_ptr<AsyncSentinel> &sentinel,
opts);
}

AsyncRedis::AsyncRedis(const GuardedAsyncConnectionSPtr &connection) : _connection(connection) {
assert(_connection);
}

AsyncSubscriber AsyncRedis::subscriber() {
// TODO: maybe we don't need to check this,
// since there's no Transaction or Pipeline for AsyncRedis
Expand Down
41 changes: 34 additions & 7 deletions src/sw/redis++/async_redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,10 @@ class AsyncRedis {
}

private:
friend class AsyncRedisCluster;

explicit AsyncRedis(const GuardedAsyncConnectionSPtr &connection);

explicit AsyncRedis(const Uri &uri);

template <typename Result, typename Formatter, typename ...Args>
Expand All @@ -1029,10 +1033,20 @@ class AsyncRedis {
Future<Result> _command_with_parser(Formatter formatter, Args &&...args) {
auto formatted_cmd = formatter(std::forward<Args>(args)...);

assert(_pool);
SafeAsyncConnection connection(*_pool);
if (_connection) {
// Single connection mode.
auto &connection = _connection->connection();
if (connection.broken()) {
throw Error("connection is broken");
}

return connection.send<Result, ResultParser>(std::move(formatted_cmd));
} else {
assert(_pool);
SafeAsyncConnection connection(*_pool);

return connection.connection().send<Result, ResultParser>(std::move(formatted_cmd));
return connection.connection().send<Result, ResultParser>(std::move(formatted_cmd));
}
}

template <typename Result, typename Callback, std::size_t ...Is, typename ...Args>
Expand Down Expand Up @@ -1065,16 +1079,29 @@ class AsyncRedis {
void _callback_command_with_parser(Callback &&cb, Formatter formatter, Args &&...args) {
auto formatted_cmd = formatter(std::forward<Args>(args)...);

assert(_pool);
SafeAsyncConnection connection(*_pool);
if (_connection) {
// Single connection mode.
auto &connection = _connection->connection();
if (connection.broken()) {
throw Error("connection is broken");
}

connection.connection().send<Result, ResultParser, Callback>(
std::move(formatted_cmd), std::forward<Callback>(cb));
connection.send<Result, ResultParser, Callback>(
std::move(formatted_cmd), std::forward<Callback>(cb));
} else {
assert(_pool);
SafeAsyncConnection connection(*_pool);

connection.connection().send<Result, ResultParser, Callback>(
std::move(formatted_cmd), std::forward<Callback>(cb));
}
}

EventLoopSPtr _loop;

AsyncConnectionPoolSPtr _pool;

GuardedAsyncConnectionSPtr _connection;
};

}
Expand Down
12 changes: 12 additions & 0 deletions src/sw/redis++/async_redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role);
}

AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool.
pool = std::make_shared<AsyncConnectionPool>(pool->clone());
}

return AsyncRedis(std::make_shared<GuardedAsyncConnection>(pool));
}

AsyncSubscriber AsyncRedisCluster::subscriber() {
assert(_pool);

Expand Down
3 changes: 3 additions & 0 deletions src/sw/redis++/async_redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "sw/redis++/utils.h"
#include "sw/redis++/async_connection.h"
#include "sw/redis++/async_connection_pool.h"
#include "sw/redis++/async_redis.h"
#include "sw/redis++/async_shards_pool.h"
#include "sw/redis++/async_subscriber.h"
#include "sw/redis++/event_loop.h"
Expand Down Expand Up @@ -48,6 +49,8 @@ class AsyncRedisCluster {

~AsyncRedisCluster() = default;

AsyncRedis redis(const StringView &hash_tag, bool new_connection = true);

AsyncSubscriber subscriber();

template <typename Result, typename ...Args>
Expand Down

0 comments on commit 4368319

Please sign in to comment.