From 5145f293879c9e5e997b4fdd18eb679943a66036 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 14 Nov 2023 18:25:00 +0100 Subject: [PATCH] Add liveliness bindings --- examples/z_get_liveliness.c | 67 ++++++++++ examples/z_liveliness.c | 76 +++++++++++ examples/z_sub_liveliness.c | 86 +++++++++++++ include/zenoh_commons.h | 121 ++++++++++++++++++ include/zenoh_macros.h | 41 +++--- src/lib.rs | 5 + src/liveliness.rs | 246 ++++++++++++++++++++++++++++++++++++ 7 files changed, 623 insertions(+), 19 deletions(-) create mode 100644 examples/z_get_liveliness.c create mode 100644 examples/z_liveliness.c create mode 100644 examples/z_sub_liveliness.c create mode 100644 src/liveliness.rs diff --git a/examples/z_get_liveliness.c b/examples/z_get_liveliness.c new file mode 100644 index 000000000..c667cb03a --- /dev/null +++ b/examples/z_get_liveliness.c @@ -0,0 +1,67 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include + +#include "zenoh.h" + +int main(int argc, char **argv) { + char *expr = "group1/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Sending liveliness query '%s'...\n", expr); + z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Alive token ('%s')\n", z_loan(keystr)); + z_drop(z_move(keystr)); + } else { + printf("Received an error\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_liveliness.c b/examples/z_liveliness.c new file mode 100644 index 000000000..b21cc40bc --- /dev/null +++ b/examples/z_liveliness.c @@ -0,0 +1,76 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +int main(int argc, char **argv) { + char *expr = "group1/zenoh-rs"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Declaring liveliness token '%s'...\n", expr); + zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL); + if (!z_check(token)) { + printf("Unable to create liveliness token!\n"); + exit(-1); + } + + printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } else if (c == 'd') { + printf("Undeclaring liveliness token...\n"); + z_drop(z_move(token)); + } + } + + z_drop(z_move(token)); + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_sub_liveliness.c b/examples/z_sub_liveliness.c new file mode 100644 index 000000000..f67ac5a68 --- /dev/null +++ b/examples/z_sub_liveliness.c @@ -0,0 +1,86 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +void data_handler(const z_sample_t *sample, void *arg) { + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + switch (sample->kind) { + case Z_SAMPLE_KIND_PUT: + printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr)); + break; + case Z_SAMPLE_KIND_DELETE: + printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr)); + break; + } + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + char *expr = "group1/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Declaring liveliness subscriber on '%s'...\n", expr); + z_owned_closure_sample_t callback = z_closure(data_handler); + z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare liveliness subscriber.\n"); + exit(-1); + } + + printf("Enter 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } + } + + z_undeclare_subscriber(z_move(sub)); + z_close(z_move(s)); + return 0; +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 763c267aa..aecd40786 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -667,6 +667,34 @@ typedef struct z_owned_scouting_config_t { typedef struct z_subscriber_t { const struct z_owned_subscriber_t *_0; } z_subscriber_t; +/** + * The options for `zc_liveliness_declare_token` + */ +typedef struct zc_owned_liveliness_declaration_options_t { + uint8_t _inner; +} zc_owned_liveliness_declaration_options_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_declare_subscriber_options_t { + uint8_t _inner; +} zc_owned_liveliness_declare_subscriber_options_t; +/** + * A liveliness token that can be used to provide the network with information about connectivity to its + * declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key + * expressions. + * + * A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. + */ +typedef struct zc_owned_liveliness_token_t { + uintptr_t _inner[4]; +} zc_owned_liveliness_token_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_get_options_t { + uint32_t timeout_ms; +} zc_owned_liveliness_get_options_t; /** * An owned payload, backed by a reference counted owner. * @@ -1679,6 +1707,99 @@ ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice(const char *name, uintptr_t ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice_unchecked(const char *start, uintptr_t len); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_declaration_options_check(const struct zc_owned_liveliness_declaration_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_declaration_options_drop(struct zc_owned_liveliness_declaration_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declaration_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declaration_options_t zc_liveliness_declaration_options_null(void); +/** + * Declares a subscriber on liveliness tokens that intersect `key`. + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +struct z_owned_subscriber_t zc_liveliness_declare_subscriber(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_sample_t *callback, + const struct zc_owned_liveliness_declare_subscriber_options_t *_options); +/** + * Constructs and declares a liveliness token on the network. + * + * Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity + * is achieved, and a DELETE sample if it's lost. + * + * Passing `NULL` as options is valid and equivalent to a pointer to the default options. + */ +ZENOHC_API +struct zc_owned_liveliness_token_t zc_liveliness_declare_token(struct z_session_t session, + struct z_keyexpr_t key, + const struct zc_owned_liveliness_declaration_options_t *_options); +/** + * Queries liveliness tokens currently on the network with a key expression intersecting with `key`. + * + * Note that the same "value stealing" tricks apply as with a normal `z_get` + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +int8_t zc_liveliness_get(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_reply_t *callback, + const struct zc_owned_liveliness_get_options_t *options); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_get_options_check(const struct zc_owned_liveliness_get_options_t *_opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_default(void); +/** + * Destroys the options. + */ +ZENOHC_API void zc_liveliness_get_options_drop(struct zc_owned_liveliness_get_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_null(void); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_subscriber_options_check(const struct zc_owned_liveliness_declare_subscriber_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_subscriber_options_drop(struct zc_owned_liveliness_declare_subscriber_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_null(void); +/** + * Returns `true` unless the token is at its gravestone value. + */ +ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *token); +/** + * The gravestone value for liveliness tokens. + */ +ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); +/** + * Destroys a liveliness token, notifying subscribers of its destruction. + */ +ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); /** * Returns `false` if `payload` is the gravestone value. */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index d817fb6ad..2a2c23e51 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -37,7 +37,8 @@ z_owned_reply_channel_t * : z_reply_channel_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token \ )(x) #define z_null(x) (*x = \ @@ -62,27 +63,29 @@ z_owned_reply_channel_t * : z_reply_channel_null, \ zc_owned_payload_t * : zc_payload_null, \ zc_owned_shmbuf_t * : zc_shmbuf_null, \ - zc_owned_shm_manager_t * : zc_shm_manager_null \ + zc_owned_shm_manager_t * : zc_shm_manager_null, \ + zc_owned_liveliness_token_t * : zc_liveliness_token_null \ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/lib.rs b/src/lib.rs index c24a536a5..cc2a7bb3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,9 @@ // Contributors: // ZettaScale Zenoh team, // + +#![allow(non_camel_case_types)] + mod collections; pub use crate::collections::*; mod config; @@ -39,6 +42,8 @@ mod publisher; pub use crate::publisher::*; mod closures; pub use closures::*; +mod liveliness; +pub use liveliness::*; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/liveliness.rs b/src/liveliness.rs new file mode 100644 index 000000000..ed5602e84 --- /dev/null +++ b/src/liveliness.rs @@ -0,0 +1,246 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + liveliness::{Liveliness, LivelinessToken}, + prelude::{SessionDeclarations, SplitBuffer}, +}; +use zenoh_util::core::{zresult::ErrNo, SyncResolve}; + +use crate::{ + z_closure_reply_call, z_closure_sample_call, z_keyexpr_t, z_owned_closure_reply_t, + z_owned_closure_sample_t, z_owned_subscriber_t, z_sample_t, z_session_t, +}; + +/// A liveliness token that can be used to provide the network with information about connectivity to its +/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key +/// expressions. +/// +/// A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. +#[repr(C)] +pub struct zc_owned_liveliness_token_t { + _inner: [usize; 4], +} + +/// The gravestone value for liveliness tokens. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_null() -> zc_owned_liveliness_token_t { + zc_owned_liveliness_token_t { _inner: [0; 4] } +} + +/// Returns `true` unless the token is at its gravestone value. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_check(token: &zc_owned_liveliness_token_t) -> bool { + token._inner.iter().any(|v| *v != 0) +} +/// The options for `zc_liveliness_declare_token` +#[repr(C)] +pub struct zc_owned_liveliness_declaration_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declaration_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_null( +) -> zc_owned_liveliness_declaration_options_t { + zc_owned_liveliness_declaration_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_check( + _opts: &zc_owned_liveliness_declaration_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_drop( + opts: &mut zc_owned_liveliness_declaration_options_t, +) { + *opts = zc_liveliness_declaration_options_null() +} +impl From> for zc_owned_liveliness_token_t { + fn from(value: LivelinessToken<'static>) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From for Option> { + fn from(value: zc_owned_liveliness_token_t) -> Self { + if value._inner.iter().all(|v| *v == 0) { + None + } else { + Some(unsafe { core::mem::transmute(value) }) + } + } +} +/// Constructs and declares a liveliness token on the network. +/// +/// Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity +/// is achieved, and a DELETE sample if it's lost. +/// +/// Passing `NULL` as options is valid and equivalent to a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_token( + session: z_session_t, + key: z_keyexpr_t, + _options: Option<&zc_owned_liveliness_declaration_options_t>, +) -> zc_owned_liveliness_token_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return zc_liveliness_token_null(); + }; + match session.liveliness().declare_token(key).res() { + Ok(token) => unsafe { core::mem::transmute(token) }, + Err(e) => { + log::error!("Failed to declare liveliness token: {e}"); + zc_liveliness_token_null() + } + } +} + +/// Destroys a liveliness token, notifying subscribers of its destruction. +#[no_mangle] +pub extern "C" fn zc_liveliness_undeclare_token(token: &mut zc_owned_liveliness_token_t) { + let Some(token): Option = + core::mem::replace(token, zc_liveliness_token_null()).into() + else { + return; + }; + if let Err(e) = token.undeclare().res() { + log::error!("Failed to undeclare token: {e}"); + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_declare_subscriber_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_null( +) -> zc_owned_liveliness_declare_subscriber_options_t { + zc_owned_liveliness_declare_subscriber_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_check( + _opts: &zc_owned_liveliness_declare_subscriber_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_drop( + opts: &mut zc_owned_liveliness_declare_subscriber_options_t, +) { + *opts = zc_liveliness_subscriber_options_null() +} + +/// Declares a subscriber on liveliness tokens that intersect `key`. +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_subscriber( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + _options: Option<&zc_owned_liveliness_declare_subscriber_options_t>, +) -> z_owned_subscriber_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return z_owned_subscriber_t::null(); + }; + let callback = core::mem::replace(callback, z_owned_closure_sample_t::empty()); + match session + .liveliness() + .declare_subscriber(key) + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&callback, &sample) + }) + .res() + { + Ok(token) => z_owned_subscriber_t::new(token), + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + z_owned_subscriber_t::null() + } + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_get_options_t { + timeout_ms: u32, +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_null() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 0 } +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_default() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 10000 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_check( + _opts: &zc_owned_liveliness_get_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_drop(opts: &mut zc_owned_liveliness_get_options_t) { + *opts = zc_liveliness_get_options_null() +} + +/// Queries liveliness tokens currently on the network with a key expression intersecting with `key`. +/// +/// Note that the same "value stealing" tricks apply as with a normal `z_get` +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_reply_t, + options: Option<&zc_owned_liveliness_get_options_t>, +) -> i8 { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return i8::MIN; + }; + let callback = core::mem::replace(callback, z_owned_closure_reply_t::empty()); + let liveliness: Liveliness<'static> = session.liveliness(); + let mut builder = liveliness + .get(key) + .callback(move |response| z_closure_reply_call(&callback, &mut response.into())); + if let Some(options) = options { + builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64)) + } + match builder.res() { + Ok(()) => 0, + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + e.errno().get() + } + } +}