diff --git a/README.md b/README.md index 105ad2f..396509b 100644 --- a/README.md +++ b/README.md @@ -459,6 +459,21 @@ aliased to `redisConnectCallback`: void(const redisAsyncContext *ac, int status); ``` +Alternatively, if `hiredis` >= v1.1.0 is used, you set a connect callback +that will be passed a non-const `redisAsyncContext*` on invocation (e.g. +to be able to set a push callback on it). + +```c +int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc, + redisConnectCallbackNC *fn); +``` + +The callback function should have the following prototype, +aliased to `redisConnectCallbackNC`: +```c +void(redisAsyncContext *ac, int status); +``` + On a connection attempt, the `status` argument is set to `REDIS_OK` when the connection was successful. The file description of the connection socket can be retrieved diff --git a/examples/src/CMakeLists.txt b/examples/src/CMakeLists.txt index b1ede39..b27bd33 100644 --- a/examples/src/CMakeLists.txt +++ b/examples/src/CMakeLists.txt @@ -17,6 +17,11 @@ target_link_libraries(example_async hiredis_cluster::hiredis_cluster ${EVENT_LIBRARY}) +add_executable(clientside_caching_async clientside_caching_async.c) +target_link_libraries(clientside_caching_async + hiredis_cluster::hiredis_cluster + ${EVENT_LIBRARY}) + # Executable: tls if(ENABLE_SSL) find_package(hiredis_ssl REQUIRED) diff --git a/examples/src/clientside_caching_async.c b/examples/src/clientside_caching_async.c new file mode 100644 index 0000000..345b399 --- /dev/null +++ b/examples/src/clientside_caching_async.c @@ -0,0 +1,167 @@ +/* + * Simple example how to enable client tracking to implement client side caching. + * Tracking can be enabled via a registered connect callback and invalidation + * messages are received via the registered push callback. + * The disconnect callback should also be used as an indication of invalidation. + */ +#include +#include + +#include +#include +#include +#include + +#define CLUSTER_NODE "127.0.0.1:7000" +#define KEY "key:1" + +void pushCallback(redisAsyncContext *ac, void *r); +void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata); +void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata); +void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata); +void modifyKey(const char *key, const char *value); + +/* The connect callback enables RESP3 and client tracking. + The non-const connect callback is used since we want to + set the push callback in the hiredis context. */ +void connectCallbackNC(redisAsyncContext *ac, int status) { + assert(status == REDIS_OK); + redisAsyncSetPushCallback(ac, pushCallback); + redisAsyncCommand(ac, NULL, NULL, "HELLO 3"); + redisAsyncCommand(ac, NULL, NULL, "CLIENT TRACKING ON"); + printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); +} + +/* The event callback issues a 'SET' command when the client is ready to accept + commands. A reply is expected via a call to 'setCallback()' */ +void eventCallback(const redisClusterContext *cc, int event, void *privdata) { + (void)cc; + redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata; + + /* We send our commands when the client is ready to accept commands. */ + if (event == HIRCLUSTER_EVENT_READY) { + printf("Client is ready to accept commands\n"); + + int status = + redisClusterAsyncCommand(acc, setCallback, NULL, "SET %s 1", KEY); + assert(status == REDIS_OK); + } +} + +/* Message callback for 'SET' commands. Issues a 'GET' command and a reply is + expected as a call to 'getCallback1()' */ +void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata) { + (void)privdata; + redisReply *reply = (redisReply *)r; + assert(reply != NULL); + printf("Callback for 'SET', reply: %s\n", reply->str); + + int status = + redisClusterAsyncCommand(acc, getCallback1, NULL, "GET %s", KEY); + assert(status == REDIS_OK); +} + +/* Message callback for the first 'GET' command. Modifies the key to + trigger Redis to send a key invalidation message and then sends another + 'GET' command. The invalidation message is received via the registered + push callback. */ +void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata) { + (void)privdata; + redisReply *reply = (redisReply *)r; + assert(reply != NULL); + + printf("Callback for first 'GET', reply: %s\n", reply->str); + + /* Modify the key from another client which will invalidate a cached value. + Redis will send an invalidation message via a push message. */ + modifyKey(KEY, "99"); + + int status = + redisClusterAsyncCommand(acc, getCallback2, NULL, "GET %s", KEY); + assert(status == REDIS_OK); +} + +/* Push message callback handling invalidation messages. */ +void pushCallback(redisAsyncContext *ac, void *r) { + redisReply *reply = r; + if (!(reply->type == REDIS_REPLY_PUSH && reply->elements == 2 && + reply->element[0]->type == REDIS_REPLY_STRING && + !strncmp(reply->element[0]->str, "invalidate", 10) && + reply->element[1]->type == REDIS_REPLY_ARRAY)) { + /* Not an 'invalidate' message. Ignore. */ + return; + } + redisReply *payload = reply->element[1]; + size_t i; + for (i = 0; i < payload->elements; i++) { + redisReply *key = payload->element[i]; + if (key->type == REDIS_REPLY_STRING) + printf("Invalidate key '%.*s'\n", (int)key->len, key->str); + else if (key->type == REDIS_REPLY_NIL) + printf("Invalidate all\n"); + } +} + +/* Message callback for 'GET' commands. Exits program. */ +void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata) { + (void)privdata; + redisReply *reply = (redisReply *)r; + assert(reply != NULL); + + printf("Callback for second 'GET', reply: %s\n", reply->str); + + /* Exit the eventloop after a couple of sent commands. */ + redisClusterAsyncDisconnect(acc); +} + +/* A disconnect callback should invalidate all cached keys. */ +void disconnectCallback(const redisAsyncContext *ac, int status) { + assert(status == REDIS_OK); + printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); + + printf("Invalidate all\n"); +} + +/* Helper to modify keys using a separate client. */ +void modifyKey(const char *key, const char *value) { + printf("Modify key: '%s'\n", key); + redisClusterContext *cc = redisClusterContextInit(); + int status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE); + assert(status == REDIS_OK); + status = redisClusterConnect2(cc); + assert(status == REDIS_OK); + + redisReply *reply = redisClusterCommand(cc, "SET %s %s", key, value); + assert(reply != NULL); + freeReplyObject(reply); + + redisClusterFree(cc); +} + +int main(int argc, char **argv) { + redisClusterAsyncContext *acc = redisClusterAsyncContextInit(); + assert(acc); + + int status; + status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC); + assert(status == REDIS_OK); + status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback); + assert(status == REDIS_OK); + status = redisClusterSetEventCallback(acc->cc, eventCallback, acc); + assert(status == REDIS_OK); + status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE); + assert(status == REDIS_OK); + + struct event_base *base = event_base_new(); + status = redisClusterLibeventAttach(acc, base); + assert(status == REDIS_OK); + + status = redisClusterAsyncConnect2(acc); + assert(status == REDIS_OK); + + event_base_dispatch(base); + + redisClusterAsyncFree(acc); + event_base_free(base); + return 0; +} diff --git a/examples/using_cmake_separate/build.sh b/examples/using_cmake_separate/build.sh index 75ea24a..dbf254a 100755 --- a/examples/using_cmake_separate/build.sh +++ b/examples/using_cmake_separate/build.sh @@ -9,7 +9,7 @@ script_dir=$(realpath "${0%/*}") repo_dir=$(git rev-parse --show-toplevel) # Download hiredis -hiredis_version=1.0.2 +hiredis_version=1.1.0 curl -L https://github.com/redis/hiredis/archive/v${hiredis_version}.tar.gz | tar -xz -C ${script_dir} # Build and install downloaded hiredis using CMake diff --git a/hircluster.c b/hircluster.c index 69b812f..2326b0a 100644 --- a/hircluster.c +++ b/hircluster.c @@ -3715,6 +3715,11 @@ redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, if (acc->onConnect) { redisAsyncSetConnectCallback(ac, acc->onConnect); } +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB + else if (acc->onConnectNC) { + redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC); + } +#endif if (acc->onDisconnect) { redisAsyncSetDisconnectCallback(ac, acc->onDisconnect); @@ -3775,12 +3780,26 @@ int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) { int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn) { - if (acc->onConnect == NULL) { - acc->onConnect = fn; - return REDIS_OK; + if (acc->onConnect != NULL) + return REDIS_ERR; +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB + if (acc->onConnectNC != NULL) + return REDIS_ERR; +#endif + acc->onConnect = fn; + return REDIS_OK; +} + +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB +int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc, + redisConnectCallbackNC *fn) { + if (acc->onConnectNC != NULL || acc->onConnect != NULL) { + return REDIS_ERR; } - return REDIS_ERR; + acc->onConnectNC = fn; + return REDIS_OK; } +#endif int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn) { diff --git a/hircluster.h b/hircluster.h index b79dd3f..fd92f83 100644 --- a/hircluster.h +++ b/hircluster.h @@ -67,6 +67,13 @@ #define HIRCLUSTER_EVENT_READY 2 #define HIRCLUSTER_EVENT_FREE_CONTEXT 3 +/* The non-const connect callback API is not available when: + * - using hiredis prior v.1.1.0; or + * - built on Windows since hiredis_cluster.def can't have conditional definitions. */ +#if !(HIREDIS_MAJOR >= 1 && HIREDIS_MINOR >= 1) || _WIN32 +#define HIRCLUSTER_NO_NONCONST_CONNECT_CB +#endif + #ifdef __cplusplus extern "C" { #endif @@ -159,6 +166,9 @@ typedef struct redisClusterAsyncContext { /* Called when the first write event was received. */ redisConnectCallback *onConnect; +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB + redisConnectCallbackNC *onConnectNC; +#endif } redisClusterAsyncContext; @@ -286,6 +296,10 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc); int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn); +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB +int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc, + redisConnectCallbackNC *fn); +#endif int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn); diff --git a/tests/ct_async.c b/tests/ct_async.c index e42254b..008e568 100644 --- a/tests/ct_async.c +++ b/tests/ct_async.c @@ -27,6 +27,16 @@ void connectCallback(const redisAsyncContext *ac, int status) { printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); } +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB +void connectCallbackNC(redisAsyncContext *ac, int status) { + UNUSED(ac); + UNUSED(status); + /* The testcase expects a failure during registration of this + non-const connect callback and it should never be called. */ + assert(0); +} +#endif + void disconnectCallback(const redisAsyncContext *ac, int status) { ASSERT_MSG(status == REDIS_OK, ac->errstr); printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); @@ -66,6 +76,14 @@ int main(void) { int status; status = redisClusterAsyncSetConnectCallback(acc, connectCallback); assert(status == REDIS_OK); + status = redisClusterAsyncSetConnectCallback(acc, connectCallback); + assert(status == REDIS_ERR); /* Re-registration not accepted */ + +#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB + status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC); + assert(status == REDIS_ERR); /* Re-registration not accepted */ +#endif + status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback); assert(status == REDIS_OK); status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);