Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add liveliness bindings #189

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>

#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Alive token ('%s')\n", z_loan(keystr));
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
}
}
z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
76 changes: 76 additions & 0 deletions examples/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/zenoh-rs";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness token '%s'...\n", expr);
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL);
if (!z_check(token)) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
} else if (c == 'd') {
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));
}
}

z_drop(z_move(token));
z_close(z_move(s));
return 0;
}
86 changes: 86 additions & 0 deletions examples/z_sub_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
switch (sample->kind) {
case Z_SAMPLE_KIND_PUT:
printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr));
break;
case Z_SAMPLE_KIND_DELETE:
printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr));
break;
}
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness subscriber on '%s'...\n", expr);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare liveliness subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_undeclare_subscriber(z_move(sub));
z_close(z_move(s));
return 0;
}
121 changes: 121 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,34 @@ typedef struct z_owned_scouting_config_t {
typedef struct z_subscriber_t {
const struct z_owned_subscriber_t *_0;
} z_subscriber_t;
/**
* The options for `zc_liveliness_declare_token`
*/
typedef struct zc_owned_liveliness_declaration_options_t {
uint8_t _inner;
} zc_owned_liveliness_declaration_options_t;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The zenoh-commons.h header is meant to be common across zenoh-c and zenoh-pico.
All functions starting with zc_ are inherently meant to be specific to zenoh-c.
All these function declarations should hence go into zenoh-concrete.h.

@p-avital @milyin I see that other zc_ functions slipped in zenoh-commons.h in the past. I think they should be moved into zenoh-concrete.h as well. Opinions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the original goal was to be able to copy-paste this header into pico to ensure we always have the same signatures, but it didn't end up working out that way.

I think we can make the splitter logic a bit better if we want to stick to the original ideal. Otherwise, we could remove the splitter entirely, or just leave it in that state. Any odd these solutions is fine by me :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

zenoh-pico has evolved quite a bit since we initially made that split and I see how the split could hardly work out in practice. Therefore I'm more in favour of unifying all the includes in a single file rather than introducing artificial splits (although correct in principle).

For what concerns the scope of this specific PR, let's not change anything for the time being... A different PR may follow in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, though note that we'll never be single filed because the zenoh-macros file is still hand-written

/**
* The options for `zc_liveliness_declare_subscriber`
*/
typedef struct zc_owned_liveliness_declare_subscriber_options_t {
uint8_t _inner;
} zc_owned_liveliness_declare_subscriber_options_t;
/**
* A liveliness token that can be used to provide the network with information about connectivity to its
* declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key
* expressions.
*
* A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost.
*/
typedef struct zc_owned_liveliness_token_t {
uintptr_t _inner[4];
} zc_owned_liveliness_token_t;
/**
* The options for `zc_liveliness_declare_subscriber`
*/
typedef struct zc_owned_liveliness_get_options_t {
uint32_t timeout_ms;
} zc_owned_liveliness_get_options_t;
/**
* An owned payload, backed by a reference counted owner.
*
Expand Down Expand Up @@ -1679,6 +1707,99 @@ ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice(const char *name, uintptr_t
ZENOHC_API
struct z_keyexpr_t zc_keyexpr_from_slice_unchecked(const char *start,
uintptr_t len);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_declaration_options_check(const struct zc_owned_liveliness_declaration_options_t *_opts);
/**
* Destroys the options.
*/
ZENOHC_API
void zc_liveliness_declaration_options_drop(struct zc_owned_liveliness_declaration_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_declaration_options_t`
*/
ZENOHC_API
struct zc_owned_liveliness_declaration_options_t zc_liveliness_declaration_options_null(void);
/**
* Declares a subscriber on liveliness tokens that intersect `key`.
*
* Passing `NULL` as options is valid and equivalent to passing a pointer to the default options.
*/
ZENOHC_API
struct z_owned_subscriber_t zc_liveliness_declare_subscriber(struct z_session_t session,
struct z_keyexpr_t key,
struct z_owned_closure_sample_t *callback,
const struct zc_owned_liveliness_declare_subscriber_options_t *_options);
/**
* Constructs and declares a liveliness token on the network.
*
* Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity
* is achieved, and a DELETE sample if it's lost.
*
* Passing `NULL` as options is valid and equivalent to a pointer to the default options.
*/
ZENOHC_API
struct zc_owned_liveliness_token_t zc_liveliness_declare_token(struct z_session_t session,
struct z_keyexpr_t key,
const struct zc_owned_liveliness_declaration_options_t *_options);
/**
* Queries liveliness tokens currently on the network with a key expression intersecting with `key`.
*
* Note that the same "value stealing" tricks apply as with a normal `z_get`
*
* Passing `NULL` as options is valid and equivalent to passing a pointer to the default options.
*/
ZENOHC_API
int8_t zc_liveliness_get(struct z_session_t session,
struct z_keyexpr_t key,
struct z_owned_closure_reply_t *callback,
const struct zc_owned_liveliness_get_options_t *options);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_get_options_check(const struct zc_owned_liveliness_get_options_t *_opts);
/**
* The gravestone value for `zc_owned_liveliness_get_options_t`
*/
ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_default(void);
/**
* Destroys the options.
*/
ZENOHC_API void zc_liveliness_get_options_drop(struct zc_owned_liveliness_get_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_get_options_t`
*/
ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_null(void);
/**
* Returns `true` if the options are valid.
*/
ZENOHC_API
bool zc_liveliness_subscriber_options_check(const struct zc_owned_liveliness_declare_subscriber_options_t *_opts);
/**
* Destroys the options.
*/
ZENOHC_API
void zc_liveliness_subscriber_options_drop(struct zc_owned_liveliness_declare_subscriber_options_t *opts);
/**
* The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t`
*/
ZENOHC_API
struct zc_owned_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_null(void);
/**
* Returns `true` unless the token is at its gravestone value.
*/
ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *token);
/**
* The gravestone value for liveliness tokens.
*/
ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void);
/**
* Destroys a liveliness token, notifying subscribers of its destruction.
*/
ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token);
/**
* Returns `false` if `payload` is the gravestone value.
*/
Expand Down
Loading
Loading