Skip to content

Commit

Permalink
Add non-const connect callback (#205)
Browse files Browse the repository at this point in the history
If `hiredis` >= v1.1.0 is used an alternative connect callback can be registered using:

int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
                                          redisConnectCallbackNC *fn);

The callback function should have following prototype, aliased to redisConnectCallbackNC:
   void(redisAsyncContext *ac, int status);

The connect callback will be passed a non-const `redisAsyncContext*` on invocation
which e.g. allows the callback to set a push callback.

The build will check if used platform or the hiredis version supports it,
but the new API can manually be disabled in a build via:
CFLAGS=-DHIRCLUSTER_NO_NONCONST_CONNECT_CB

Additional changes:
* Add test of re-registration of connect callback
* Add simplest client side caching example

Co-authored-by: Viktor Söderqvist <[email protected]>
  • Loading branch information
bjosv and zuiderkwast authored Mar 18, 2024
1 parent a8b8097 commit 07f6da1
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 5 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,21 @@ aliased to `redisConnectCallback`:
void(const redisAsyncContext *ac, int status);
```

Alternatively, if `hiredis` >= v1.1.0 is used, you set a connect callback
that will be passed a non-const `redisAsyncContext*` on invocation (e.g.
to be able to set a push callback on it).

```c
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn);
```
The callback function should have the following prototype,
aliased to `redisConnectCallbackNC`:
```c
void(redisAsyncContext *ac, int status);
```

On a connection attempt, the `status` argument is set to `REDIS_OK`
when the connection was successful.
The file description of the connection socket can be retrieved
Expand Down
5 changes: 5 additions & 0 deletions examples/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ target_link_libraries(example_async
hiredis_cluster::hiredis_cluster
${EVENT_LIBRARY})

add_executable(clientside_caching_async clientside_caching_async.c)
target_link_libraries(clientside_caching_async
hiredis_cluster::hiredis_cluster
${EVENT_LIBRARY})

# Executable: tls
if(ENABLE_SSL)
find_package(hiredis_ssl REQUIRED)
Expand Down
167 changes: 167 additions & 0 deletions examples/src/clientside_caching_async.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Simple example how to enable client tracking to implement client side caching.
* Tracking can be enabled via a registered connect callback and invalidation
* messages are received via the registered push callback.
* The disconnect callback should also be used as an indication of invalidation.
*/
#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CLUSTER_NODE "127.0.0.1:7000"
#define KEY "key:1"

void pushCallback(redisAsyncContext *ac, void *r);
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata);
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata);
void modifyKey(const char *key, const char *value);

/* The connect callback enables RESP3 and client tracking.
The non-const connect callback is used since we want to
set the push callback in the hiredis context. */
void connectCallbackNC(redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
redisAsyncSetPushCallback(ac, pushCallback);
redisAsyncCommand(ac, NULL, NULL, "HELLO 3");
redisAsyncCommand(ac, NULL, NULL, "CLIENT TRACKING ON");
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

/* The event callback issues a 'SET' command when the client is ready to accept
commands. A reply is expected via a call to 'setCallback()' */
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
(void)cc;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;

/* We send our commands when the client is ready to accept commands. */
if (event == HIRCLUSTER_EVENT_READY) {
printf("Client is ready to accept commands\n");

int status =
redisClusterAsyncCommand(acc, setCallback, NULL, "SET %s 1", KEY);
assert(status == REDIS_OK);
}
}

/* Message callback for 'SET' commands. Issues a 'GET' command and a reply is
expected as a call to 'getCallback1()' */
void setCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);
printf("Callback for 'SET', reply: %s\n", reply->str);

int status =
redisClusterAsyncCommand(acc, getCallback1, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}

/* Message callback for the first 'GET' command. Modifies the key to
trigger Redis to send a key invalidation message and then sends another
'GET' command. The invalidation message is received via the registered
push callback. */
void getCallback1(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);

printf("Callback for first 'GET', reply: %s\n", reply->str);

/* Modify the key from another client which will invalidate a cached value.
Redis will send an invalidation message via a push message. */
modifyKey(KEY, "99");

int status =
redisClusterAsyncCommand(acc, getCallback2, NULL, "GET %s", KEY);
assert(status == REDIS_OK);
}

/* Push message callback handling invalidation messages. */
void pushCallback(redisAsyncContext *ac, void *r) {
redisReply *reply = r;
if (!(reply->type == REDIS_REPLY_PUSH && reply->elements == 2 &&
reply->element[0]->type == REDIS_REPLY_STRING &&
!strncmp(reply->element[0]->str, "invalidate", 10) &&
reply->element[1]->type == REDIS_REPLY_ARRAY)) {
/* Not an 'invalidate' message. Ignore. */
return;
}
redisReply *payload = reply->element[1];
size_t i;
for (i = 0; i < payload->elements; i++) {
redisReply *key = payload->element[i];
if (key->type == REDIS_REPLY_STRING)
printf("Invalidate key '%.*s'\n", (int)key->len, key->str);
else if (key->type == REDIS_REPLY_NIL)
printf("Invalidate all\n");
}
}

/* Message callback for 'GET' commands. Exits program. */
void getCallback2(redisClusterAsyncContext *acc, void *r, void *privdata) {
(void)privdata;
redisReply *reply = (redisReply *)r;
assert(reply != NULL);

printf("Callback for second 'GET', reply: %s\n", reply->str);

/* Exit the eventloop after a couple of sent commands. */
redisClusterAsyncDisconnect(acc);
}

/* A disconnect callback should invalidate all cached keys. */
void disconnectCallback(const redisAsyncContext *ac, int status) {
assert(status == REDIS_OK);
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);

printf("Invalidate all\n");
}

/* Helper to modify keys using a separate client. */
void modifyKey(const char *key, const char *value) {
printf("Modify key: '%s'\n", key);
redisClusterContext *cc = redisClusterContextInit();
int status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
assert(status == REDIS_OK);
status = redisClusterConnect2(cc);
assert(status == REDIS_OK);

redisReply *reply = redisClusterCommand(cc, "SET %s %s", key, value);
assert(reply != NULL);
freeReplyObject(reply);

redisClusterFree(cc);
}

int main(int argc, char **argv) {
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);

int status;
status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == REDIS_OK);
status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == REDIS_OK);
status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == REDIS_OK);

struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_OK);

event_base_dispatch(base);

redisClusterAsyncFree(acc);
event_base_free(base);
return 0;
}
2 changes: 1 addition & 1 deletion examples/using_cmake_separate/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ script_dir=$(realpath "${0%/*}")
repo_dir=$(git rev-parse --show-toplevel)

# Download hiredis
hiredis_version=1.0.2
hiredis_version=1.1.0
curl -L https://github.com/redis/hiredis/archive/v${hiredis_version}.tar.gz | tar -xz -C ${script_dir}

# Build and install downloaded hiredis using CMake
Expand Down
27 changes: 23 additions & 4 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3715,6 +3715,11 @@ redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
if (acc->onConnect) {
redisAsyncSetConnectCallback(ac, acc->onConnect);
}
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
else if (acc->onConnectNC) {
redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC);
}
#endif

if (acc->onDisconnect) {
redisAsyncSetDisconnectCallback(ac, acc->onDisconnect);
Expand Down Expand Up @@ -3775,12 +3780,26 @@ int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) {

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn) {
if (acc->onConnect == NULL) {
acc->onConnect = fn;
return REDIS_OK;
if (acc->onConnect != NULL)
return REDIS_ERR;
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
if (acc->onConnectNC != NULL)
return REDIS_ERR;
#endif
acc->onConnect = fn;
return REDIS_OK;
}

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn) {
if (acc->onConnectNC != NULL || acc->onConnect != NULL) {
return REDIS_ERR;
}
return REDIS_ERR;
acc->onConnectNC = fn;
return REDIS_OK;
}
#endif

int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
redisDisconnectCallback *fn) {
Expand Down
14 changes: 14 additions & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@
#define HIRCLUSTER_EVENT_READY 2
#define HIRCLUSTER_EVENT_FREE_CONTEXT 3

/* The non-const connect callback API is not available when:
* - using hiredis prior v.1.1.0; or
* - built on Windows since hiredis_cluster.def can't have conditional definitions. */
#if !(HIREDIS_MAJOR >= 1 && HIREDIS_MINOR >= 1) || _WIN32
#define HIRCLUSTER_NO_NONCONST_CONNECT_CB
#endif

#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -159,6 +166,9 @@ typedef struct redisClusterAsyncContext {

/* Called when the first write event was received. */
redisConnectCallback *onConnect;
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
redisConnectCallbackNC *onConnectNC;
#endif

} redisClusterAsyncContext;

Expand Down Expand Up @@ -286,6 +296,10 @@ void redisClusterAsyncFree(redisClusterAsyncContext *acc);

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn);
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
redisConnectCallbackNC *fn);
#endif
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
redisDisconnectCallback *fn);

Expand Down
18 changes: 18 additions & 0 deletions tests/ct_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ void connectCallback(const redisAsyncContext *ac, int status) {
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
void connectCallbackNC(redisAsyncContext *ac, int status) {
UNUSED(ac);
UNUSED(status);
/* The testcase expects a failure during registration of this
non-const connect callback and it should never be called. */
assert(0);
}
#endif

void disconnectCallback(const redisAsyncContext *ac, int status) {
ASSERT_MSG(status == REDIS_OK, ac->errstr);
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
Expand Down Expand Up @@ -66,6 +76,14 @@ int main(void) {
int status;
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_OK);
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_ERR); /* Re-registration not accepted */

#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
status = redisClusterAsyncSetConnectCallbackNC(acc, connectCallbackNC);
assert(status == REDIS_ERR); /* Re-registration not accepted */
#endif

status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
Expand Down

0 comments on commit 07f6da1

Please sign in to comment.