Skip to content

Commit

Permalink
Add liveliness bindings (#189)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Nov 15, 2023
1 parent 0ddcdf6 commit 193fe45
Show file tree
Hide file tree
Showing 7 changed files with 623 additions and 19 deletions.
67 changes: 67 additions & 0 deletions examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>

#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Alive token ('%s')\n", z_loan(keystr));
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
}
}
z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
76 changes: 76 additions & 0 deletions examples/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/zenoh-rs";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness token '%s'...\n", expr);
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL);
if (!z_check(token)) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
} else if (c == 'd') {
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));
}
}

z_drop(z_move(token));
z_close(z_move(s));
return 0;
}
86 changes: 86 additions & 0 deletions examples/z_sub_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
switch (sample->kind) {
case Z_SAMPLE_KIND_PUT:
printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr));
break;
case Z_SAMPLE_KIND_DELETE:
printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr));
break;
}
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness subscriber on '%s'...\n", expr);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare liveliness subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_undeclare_subscriber(z_move(sub));
z_close(z_move(s));
return 0;
}
121 changes: 121 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,34 @@ typedef struct z_owned_scouting_config_t {
typedef struct z_subscriber_t {
const struct z_owned_subscriber_t *_0;
} z_subscriber_t;
/**
* The options for `zc_liveliness_declare_token`
*/
typedef struct zc_owned_liveliness_declaration_options_t {
uint8_t _inner;
} zc_owned_liveliness_declaration_options_t;
/**
* The options for `zc_liveliness_declare_subscriber`
*/
typedef struct zc_owned_liveliness_declare_subscriber_options_t {
uint8_t _inner;
} zc_owned_liveliness_declare_subscriber_options_t;
/**
* A liveliness token that can be used to provide the network with information about connectivity to its
* declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key
* expressions.
*
* A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost.
*/
typedef struct zc_owned_liveliness_token_t {
uintptr_t _inner[4];
} zc_owned_liveliness_token_t;
/**
* The options for `zc_liveliness_declare_subscriber`
*/
typedef struct zc_owned_liveliness_get_options_t {
uint32_t timeout_ms;
} zc_owned_liveliness_get_options_t;
/**
* An owned payload, backed by a reference counted owner.
*
Expand Down Expand Up @@ -1679,6 +1707,99 @@ ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice(const char *name, uintptr_t
ZENOHC_API
struct z_keyexpr_t zc_keyexpr_from_slice_unchecked(const char *start,
uintptr_t len);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_declaration_options_check(const struct zc_owned_liveliness_declaration_options_t *_opts);
/**
* Destroys the options.
*/
ZENOHC_API
void zc_liveliness_declaration_options_drop(struct zc_owned_liveliness_declaration_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_declaration_options_t`
*/
ZENOHC_API
struct zc_owned_liveliness_declaration_options_t zc_liveliness_declaration_options_null(void);
/**
* Declares a subscriber on liveliness tokens that intersect `key`.
*
* Passing `NULL` as options is valid and equivalent to passing a pointer to the default options.
*/
ZENOHC_API
struct z_owned_subscriber_t zc_liveliness_declare_subscriber(struct z_session_t session,
struct z_keyexpr_t key,
struct z_owned_closure_sample_t *callback,
const struct zc_owned_liveliness_declare_subscriber_options_t *_options);
/**
* Constructs and declares a liveliness token on the network.
*
* Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity
* is achieved, and a DELETE sample if it's lost.
*
* Passing `NULL` as options is valid and equivalent to a pointer to the default options.
*/
ZENOHC_API
struct zc_owned_liveliness_token_t zc_liveliness_declare_token(struct z_session_t session,
struct z_keyexpr_t key,
const struct zc_owned_liveliness_declaration_options_t *_options);
/**
* Queries liveliness tokens currently on the network with a key expression intersecting with `key`.
*
* Note that the same "value stealing" tricks apply as with a normal `z_get`
*
* Passing `NULL` as options is valid and equivalent to passing a pointer to the default options.
*/
ZENOHC_API
int8_t zc_liveliness_get(struct z_session_t session,
struct z_keyexpr_t key,
struct z_owned_closure_reply_t *callback,
const struct zc_owned_liveliness_get_options_t *options);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_get_options_check(const struct zc_owned_liveliness_get_options_t *_opts);
/**
* The gravestone value for `zc_owned_liveliness_get_options_t`
*/
ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_default(void);
/**
* Destroys the options.
*/
ZENOHC_API void zc_liveliness_get_options_drop(struct zc_owned_liveliness_get_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_get_options_t`
*/
ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_null(void);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_subscriber_options_check(const struct zc_owned_liveliness_declare_subscriber_options_t *_opts);
/**
* Destroys the options.
*/
ZENOHC_API
void zc_liveliness_subscriber_options_drop(struct zc_owned_liveliness_declare_subscriber_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t`
*/
ZENOHC_API
struct zc_owned_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_null(void);
/**
* Returns `true` unless the token is at its gravestone value.
*/
ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *token);
/**
* The gravestone value for liveliness tokens.
*/
ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void);
/**
* Destroys a liveliness token, notifying subscribers of its destruction.
*/
ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token);
/**
* Returns `false` if `payload` is the gravestone value.
*/
Expand Down
Loading

0 comments on commit 193fe45

Please sign in to comment.