Skip to content

Commit

Permalink
Merge pull request #1 from sashacmc/liveliness_sample
Browse files Browse the repository at this point in the history
Add liveliness samples
  • Loading branch information
p-avital authored Nov 9, 2023
2 parents dc51ccb + 4928b11 commit cf215fd
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 19 deletions.
67 changes: 67 additions & 0 deletions examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>

#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Alive token ('%s')\n", z_loan(keystr));
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
}
}
z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
76 changes: 76 additions & 0 deletions examples/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <stdio.h>
#include <string.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

int main(int argc, char **argv) {
char *expr = "group1/zenoh-rs";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness token '%s'...\n", expr);
zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL);
if (!z_check(token)) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
} else if (c == 'd') {
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));
}
}

z_drop(z_move(token));
z_close(z_move(s));
return 0;
}
86 changes: 86 additions & 0 deletions examples/z_sub_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
switch (sample->kind) {
case Z_SAMPLE_KIND_PUT:
printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr));
break;
case Z_SAMPLE_KIND_DELETE:
printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr));
break;
}
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "group1/**";
if (argc > 1) {
expr = argv[1];
}

z_keyexpr_t keyexpr = z_keyexpr(expr);
if (!z_check(keyexpr)) {
printf("%s is not a valid key expression\n", expr);
exit(-1);
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

printf("Declaring liveliness subscriber on '%s'...\n", expr);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare liveliness subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_undeclare_subscriber(z_move(sub));
z_close(z_move(s));
return 0;
}
41 changes: 22 additions & 19 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
z_owned_reply_channel_t * : z_reply_channel_drop, \
zc_owned_payload_t * : zc_payload_drop, \
zc_owned_shmbuf_t * : zc_shmbuf_drop, \
zc_owned_shm_manager_t * : zc_shm_manager_drop \
zc_owned_shm_manager_t * : zc_shm_manager_drop, \
zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token \
)(x)

#define z_null(x) (*x = \
Expand All @@ -62,27 +63,29 @@
z_owned_reply_channel_t * : z_reply_channel_null, \
zc_owned_payload_t * : zc_payload_null, \
zc_owned_shmbuf_t * : zc_shmbuf_null, \
zc_owned_shm_manager_t * : zc_shm_manager_null \
zc_owned_shm_manager_t * : zc_shm_manager_null, \
zc_owned_liveliness_token_t * : zc_liveliness_token_null \
)())

#define z_check(x) \
_Generic((x), z_owned_session_t : z_session_check, \
z_owned_publisher_t : z_publisher_check, \
z_owned_keyexpr_t : z_keyexpr_check, \
z_keyexpr_t : z_keyexpr_is_initialized, \
z_owned_config_t : z_config_check, \
z_owned_scouting_config_t : z_scouting_config_check, \
z_bytes_t : z_bytes_check, \
z_owned_subscriber_t : z_subscriber_check, \
z_owned_pull_subscriber_t : z_pull_subscriber_check, \
z_owned_queryable_t : z_queryable_check, \
z_owned_encoding_t : z_encoding_check, \
z_owned_reply_t : z_reply_check, \
z_owned_hello_t : z_hello_check, \
z_owned_str_t : z_str_check, \
zc_owned_payload_t : zc_payload_check, \
zc_owned_shmbuf_t : zc_shmbuf_check, \
zc_owned_shm_manager_t : zc_shm_manager_check \
_Generic((x), z_owned_session_t : z_session_check, \
z_owned_publisher_t : z_publisher_check, \
z_owned_keyexpr_t : z_keyexpr_check, \
z_keyexpr_t : z_keyexpr_is_initialized, \
z_owned_config_t : z_config_check, \
z_owned_scouting_config_t : z_scouting_config_check, \
z_bytes_t : z_bytes_check, \
z_owned_subscriber_t : z_subscriber_check, \
z_owned_pull_subscriber_t : z_pull_subscriber_check, \
z_owned_queryable_t : z_queryable_check, \
z_owned_encoding_t : z_encoding_check, \
z_owned_reply_t : z_reply_check, \
z_owned_hello_t : z_hello_check, \
z_owned_str_t : z_str_check, \
zc_owned_payload_t : zc_payload_check, \
zc_owned_shmbuf_t : zc_shmbuf_check, \
zc_owned_shm_manager_t : zc_shm_manager_check, \
zc_owned_liveliness_token_t : zc_liveliness_token_check \
)(&x)

#define z_call(x, ...) \
Expand Down

0 comments on commit cf215fd

Please sign in to comment.