diff --git a/Cargo.toml b/Cargo.toml index 47af03b38..a689aa9a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -58,7 +58,7 @@ serde_yaml = "0.9.19" [lib] path="src/lib.rs" -name = "zenohc" +name = "zenohcd" crate-type = ["cdylib", "staticlib"] doctest = false diff --git a/examples/z_get_liveliness.c b/examples/z_get_liveliness.c new file mode 100644 index 000000000..c667cb03a --- /dev/null +++ b/examples/z_get_liveliness.c @@ -0,0 +1,67 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include + +#include "zenoh.h" + +int main(int argc, char **argv) { + char *expr = "group1/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Sending liveliness query '%s'...\n", expr); + z_owned_reply_channel_t channel = zc_reply_fifo_new(16); + zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL); + z_owned_reply_t reply = z_reply_null(); + for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Alive token ('%s')\n", z_loan(keystr)); + z_drop(z_move(keystr)); + } else { + printf("Received an error\n"); + } + } + z_drop(z_move(reply)); + z_drop(z_move(channel)); + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_liveliness.c b/examples/z_liveliness.c new file mode 100644 index 000000000..b21cc40bc --- /dev/null +++ b/examples/z_liveliness.c @@ -0,0 +1,76 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +int main(int argc, char **argv) { + char *expr = "group1/zenoh-rs"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Declaring liveliness token '%s'...\n", expr); + zc_owned_liveliness_token_t token = zc_liveliness_declare_token(z_loan(s), keyexpr, NULL); + if (!z_check(token)) { + printf("Unable to create liveliness token!\n"); + exit(-1); + } + + printf("Enter 'd' to undeclare liveliness token, 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } else if (c == 'd') { + printf("Undeclaring liveliness token...\n"); + z_drop(z_move(token)); + } + } + + z_drop(z_move(token)); + z_close(z_move(s)); + return 0; +} diff --git a/examples/z_put.c b/examples/z_put.c index 372fbba57..f74f06110 100644 --- a/examples/z_put.c +++ b/examples/z_put.c @@ -23,6 +23,9 @@ int main(int argc, char **argv) { if (argc > 1) keyexpr = argv[1]; if (argc > 2) value = argv[2]; + z_owned_bytes_map_t attachements = z_bytes_map_new(); + z_bytes_map_insert_by_copy(&attachements, z_bytes_new("hello"), z_bytes_new("there")); + z_owned_config_t config = z_config_default(); if (argc > 3) { if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { @@ -44,11 +47,13 @@ int main(int argc, char **argv) { printf("Putting Data ('%s': '%s')...\n", keyexpr, value); z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + options.attachements = z_bytes_map_as_attachement(&attachements); int res = z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options); if (res < 0) { printf("Put failed...\n"); } z_close(z_move(s)); + z_drop(z_move(attachements)); return 0; } diff --git a/examples/z_sub.c b/examples/z_sub.c index 61ae2246e..c5bce39eb 100644 --- a/examples/z_sub.c +++ b/examples/z_sub.c @@ -11,6 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // +#include #include #if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) #include @@ -26,6 +27,10 @@ void data_handler(const z_sample_t *sample, void *arg) { z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr), (int)sample->payload.len, sample->payload.start); + if (z_check(sample->attachements)) { + z_owned_bytes_map_t map = z_bytes_map_from_attachement(sample->attachements); + z_bytes_t there = z_bytes_map_get(&map, z_bytes_new("hello")); + } z_drop(z_move(keystr)); } diff --git a/examples/z_sub_liveliness.c b/examples/z_sub_liveliness.c new file mode 100644 index 000000000..f67ac5a68 --- /dev/null +++ b/examples/z_sub_liveliness.c @@ -0,0 +1,86 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) +#include +#define sleep(x) Sleep(x * 1000) +#else +#include +#endif +#include "zenoh.h" + +void data_handler(const z_sample_t *sample, void *arg) { + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + switch (sample->kind) { + case Z_SAMPLE_KIND_PUT: + printf(">> [LivelinessSubscriber] New alive token ('%s')\n", z_loan(keystr)); + break; + case Z_SAMPLE_KIND_DELETE: + printf(">> [LivelinessSubscriber] Dropped token ('%s')\n", z_loan(keystr)); + break; + } + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + char *expr = "group1/**"; + if (argc > 1) { + expr = argv[1]; + } + + z_keyexpr_t keyexpr = z_keyexpr(expr); + if (!z_check(keyexpr)) { + printf("%s is not a valid key expression\n", expr); + exit(-1); + } + + z_owned_config_t config = z_config_default(); + if (argc > 2) { + if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { + printf( + "Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " + "JSON-serialized list of strings\n", + argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); + exit(-1); + } + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + + printf("Declaring liveliness subscriber on '%s'...\n", expr); + z_owned_closure_sample_t callback = z_closure(data_handler); + z_owned_subscriber_t sub = zc_liveliness_declare_subscriber(z_loan(s), keyexpr, z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare liveliness subscriber.\n"); + exit(-1); + } + + printf("Enter 'q' to quit...\n"); + char c = 0; + while (c != 'q') { + c = getchar(); + if (c == -1) { + sleep(1); + } + } + + z_undeclare_subscriber(z_move(sub)); + z_close(z_move(s)); + return 0; +} diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 763c267aa..1509401ba 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -140,12 +140,68 @@ typedef enum z_sample_kind_t { Z_SAMPLE_KIND_DELETE = 1, } z_sample_kind_t; /** - * An array of bytes. + * A contiguous view of bytes owned by some other entity. + * + * `start` being `null` is considered a gravestone value, + * and empty slices are represented using a possibly dangling pointer for `start`. */ typedef struct z_bytes_t { size_t len; const uint8_t *start; } z_bytes_t; +/** + * The body of a loop over an attachment's key-value pairs. + * + * `key` and `value` are loaned to the body for the duration of a single call. + * `context` is passed transparently through the iteration driver. + * + * Returning `0` is treated as `continue`. + * Returning any other value is treated as `break`. + */ +typedef int8_t (*z_attachement_iter_body_t)(struct z_bytes_t key, + struct z_bytes_t value, + void *context); +/** + * The driver of a loop over an attachement's key-value pairs. + * + * This function is expected to call `loop_body` once for each key-value pair + * within `iterator`, passing `context`, and returning any non-zero value immediately (breaking iteration). + */ +typedef int8_t (*z_attachement_iter_driver_t)(void *iterator, + z_attachement_iter_body_t loop_body, + void *context); +/** + * The v-table for an attachement. + */ +typedef struct z_attachement_vtable_t { + /** + * See `z_attachement_iteration_driver_t`'s documentation. + */ + z_attachement_iter_driver_t iteration_driver; + /** + * Returns the number of key-value pairs within the attachement. + */ + uintptr_t (*len)(const void*); +} z_attachement_vtable_t; +/** + * A v-table based map of byte slice to byte slice. + * + * `vtable == NULL` marks the gravestone value, as this type is often optional. + * Users are encouraged to use `z_attachement_null` and `z_attachement_check` to interact. + */ +typedef struct z_attachement_t { + void *data; + const struct z_attachement_vtable_t *vtable; +} z_attachement_t; +/** + * A map of maybe-owned vector of bytes to owned vector of bytes. + * + * In Zenoh C, this map is backed by Rust's standard HashMap, with a DoS-resistant hasher + */ +typedef struct z_owned_bytes_map_t { + uint64_t _0[2]; + uintptr_t _1[4]; +} z_owned_bytes_map_t; /** * Represents a Zenoh ID. * @@ -331,6 +387,7 @@ typedef struct z_sample_t { const void *_zc_buf; enum z_sample_kind_t kind; struct z_timestamp_t timestamp; + struct z_attachement_t attachements; } z_sample_t; /** * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. @@ -620,6 +677,7 @@ typedef struct z_put_options_t { struct z_encoding_t encoding; enum z_congestion_control_t congestion_control; enum z_priority_t priority; + struct z_attachement_t attachements; } z_put_options_t; /** * Represents the set of options that can be applied to a query reply, @@ -667,6 +725,34 @@ typedef struct z_owned_scouting_config_t { typedef struct z_subscriber_t { const struct z_owned_subscriber_t *_0; } z_subscriber_t; +/** + * The options for `zc_liveliness_declare_token` + */ +typedef struct zc_owned_liveliness_declaration_options_t { + uint8_t _inner; +} zc_owned_liveliness_declaration_options_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_declare_subscriber_options_t { + uint8_t _inner; +} zc_owned_liveliness_declare_subscriber_options_t; +/** + * A liveliness token that can be used to provide the network with information about connectivity to its + * declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key + * expressions. + * + * A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. + */ +typedef struct zc_owned_liveliness_token_t { + uintptr_t _inner[4]; +} zc_owned_liveliness_token_t; +/** + * The options for `zc_liveliness_declare_subscriber` + */ +typedef struct zc_owned_liveliness_get_options_t { + uint32_t timeout_ms; +} zc_owned_liveliness_get_options_t; /** * An owned payload, backed by a reference counted owner. * @@ -703,10 +789,124 @@ ZENOHC_API extern const char *Z_CONFIG_MULTICAST_IPV4_ADDRESS_KEY; ZENOHC_API extern const char *Z_CONFIG_SCOUTING_TIMEOUT_KEY; ZENOHC_API extern const char *Z_CONFIG_SCOUTING_DELAY_KEY; ZENOHC_API extern const char *Z_CONFIG_ADD_TIMESTAMP_KEY; +/** + * Returns the gravestone value for `z_attachement_t`. + */ +ZENOHC_API bool z_attachement_check(const struct z_attachement_t *this_); +/** + * Iterate over `this`'s key-value pairs, breaking if `body` returns a non-zero + * value for a key-value pair, and returning the latest return value. + * + * `context` is passed to `body` to allow stateful closures. + * + * This function takes no ownership whatsoever. + */ +ZENOHC_API +int8_t z_attachement_iterate(struct z_attachement_t this_, + z_attachement_iter_body_t body, + void *context); +/** + * Returns the number of key-value pairs in `this`. + */ +ZENOHC_API uintptr_t z_attachement_len(struct z_attachement_t this_); +/** + * Returns the gravestone value for `z_attachement_t`. + */ +ZENOHC_API struct z_attachement_t z_attachement_null(void); /** * Returns ``true`` if `b` is initialized. */ ZENOHC_API bool z_bytes_check(const struct z_bytes_t *b); +/** + * Aliases `this` into a generic `z_attachement_t`, allowing it to be passed to corresponding APIs. + */ +ZENOHC_API +struct z_attachement_t z_bytes_map_as_attachement(const struct z_owned_bytes_map_t *this_); +/** + * Returns `true` if the map is not in its gravestone state + */ +ZENOHC_API bool z_bytes_map_check(const struct z_owned_bytes_map_t *this_); +/** + * Destroys the map, resetting `this` to its gravestone value. + * + * This function is double-free safe, passing a pointer to the gravestone value will have no effect. + */ +ZENOHC_API void z_bytes_map_drop(struct z_owned_bytes_map_t *this_); +/** + * Constructs a map from the provided attachement, copying keys and values. + * + * If `this` is at gravestone value, the returned value will also be at gravestone value. + */ +ZENOHC_API struct z_owned_bytes_map_t z_bytes_map_from_attachement(struct z_attachement_t this_); +/** + * Constructs a map from the provided attachement, aliasing the attachement's keys and values. + * + * If `this` is at gravestone value, the returned value will also be at gravestone value. + */ +ZENOHC_API +struct z_owned_bytes_map_t z_bytes_map_from_attachement_aliasing(struct z_attachement_t this_); +/** + * Returns the value associated with `key`, returning a gravestone value if: + * - `this` or `key` is in gravestone state. + * - `this` has no value associated to `key` + */ +ZENOHC_API +struct z_bytes_t z_bytes_map_get(const struct z_owned_bytes_map_t *this_, + struct z_bytes_t key); +/** + * Associates `value` to `key` in the map, aliasing them. + * + * Note that once `key` is aliased, reinserting at the same key may alias the previous instance, or the new instance of `key`. + * + * Calling this with `NULL` or the gravestone value is undefined behaviour. + */ +ZENOHC_API +void z_bytes_map_insert_by_alias(const struct z_owned_bytes_map_t *this_, + struct z_bytes_t key, + struct z_bytes_t value); +/** + * Associates `value` to `key` in the map, copying them to obtain ownership: `key` and `value` are not aliased past the function's return. + * + * Calling this with `NULL` or the gravestone value is undefined behaviour. + */ +ZENOHC_API +void z_bytes_map_insert_by_copy(const struct z_owned_bytes_map_t *this_, + struct z_bytes_t key, + struct z_bytes_t value); +/** + * Iterates over the key-value pairs in the map. + * + * `body` will be called once per pair, with `ctx` as its last argument. + * If `body` returns a non-zero value, the iteration will stop immediately and the value will be returned. + * Otherwise, this will return 0 once all pairs have been visited. + * `body` is not given ownership of the key nor value, which alias the pairs in the map. + * It is safe to keep these aliases until existing keys are modified/removed, or the map is destroyed. + * Note that this map is unordered. + * + * Calling this with `NULL` or the gravestone value is undefined behaviour. + */ +ZENOHC_API +int8_t z_bytes_map_iter(const struct z_owned_bytes_map_t *this_, + z_attachement_iter_body_t body, + void *ctx); +/** + * Constructs a new map. + */ +ZENOHC_API struct z_owned_bytes_map_t z_bytes_map_new(void); +/** + * Constructs the gravestone value for `z_owned_bytes_map_t` + */ +ZENOHC_API struct z_owned_bytes_map_t z_bytes_map_null(void); +/** + * Returns a view of `str` using `strlen`. + * + * `str == NULL` will cause this to return `z_bytes_null()` + */ +ZENOHC_API struct z_bytes_t z_bytes_new(const char *str); +/** + * Returns the gravestone value for `z_bytes_t` + */ +ZENOHC_API struct z_bytes_t z_bytes_null(void); /** * Closes a zenoh session. This drops and invalidates `session` for double-drop safety. * @@ -1679,6 +1879,99 @@ ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice(const char *name, uintptr_t ZENOHC_API struct z_keyexpr_t zc_keyexpr_from_slice_unchecked(const char *start, uintptr_t len); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_declaration_options_check(const struct zc_owned_liveliness_declaration_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_declaration_options_drop(struct zc_owned_liveliness_declaration_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declaration_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declaration_options_t zc_liveliness_declaration_options_null(void); +/** + * Declares a subscriber on liveliness tokens that intersect `key`. + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +struct z_owned_subscriber_t zc_liveliness_declare_subscriber(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_sample_t *callback, + const struct zc_owned_liveliness_declare_subscriber_options_t *_options); +/** + * Constructs and declares a liveliness token on the network. + * + * Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity + * is achieved, and a DELETE sample if it's lost. + * + * Passing `NULL` as options is valid and equivalent to a pointer to the default options. + */ +ZENOHC_API +struct zc_owned_liveliness_token_t zc_liveliness_declare_token(struct z_session_t session, + struct z_keyexpr_t key, + const struct zc_owned_liveliness_declaration_options_t *_options); +/** + * Queries liveliness tokens currently on the network with a key expression intersecting with `key`. + * + * Note that the same "value stealing" tricks apply as with a normal `z_get` + * + * Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. + */ +ZENOHC_API +int8_t zc_liveliness_get(struct z_session_t session, + struct z_keyexpr_t key, + struct z_owned_closure_reply_t *callback, + const struct zc_owned_liveliness_get_options_t *options); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_get_options_check(const struct zc_owned_liveliness_get_options_t *_opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_default(void); +/** + * Destroys the options. + */ +ZENOHC_API void zc_liveliness_get_options_drop(struct zc_owned_liveliness_get_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_get_options_t` + */ +ZENOHC_API struct zc_owned_liveliness_get_options_t zc_liveliness_get_options_null(void); +/** + * Returns `true` if the options are valid. + */ +ZENOHC_API +bool zc_liveliness_subscriber_options_check(const struct zc_owned_liveliness_declare_subscriber_options_t *_opts); +/** + * Destroys the options. + */ +ZENOHC_API +void zc_liveliness_subscriber_options_drop(struct zc_owned_liveliness_declare_subscriber_options_t *opts); +/** + * The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` + */ +ZENOHC_API +struct zc_owned_liveliness_declare_subscriber_options_t zc_liveliness_subscriber_options_null(void); +/** + * Returns `true` unless the token is at its gravestone value. + */ +ZENOHC_API bool zc_liveliness_token_check(const struct zc_owned_liveliness_token_t *token); +/** + * The gravestone value for liveliness tokens. + */ +ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void); +/** + * Destroys a liveliness token, notifying subscribers of its destruction. + */ +ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token); /** * Returns `false` if `payload` is the gravestone value. */ diff --git a/include/zenoh_macros.h b/include/zenoh_macros.h index d817fb6ad..bf4db996c 100644 --- a/include/zenoh_macros.h +++ b/include/zenoh_macros.h @@ -37,7 +37,9 @@ z_owned_reply_channel_t * : z_reply_channel_drop, \ zc_owned_payload_t * : zc_payload_drop, \ zc_owned_shmbuf_t * : zc_shmbuf_drop, \ - zc_owned_shm_manager_t * : zc_shm_manager_drop \ + zc_owned_shm_manager_t * : zc_shm_manager_drop, \ + zc_owned_liveliness_token_t * : zc_liveliness_undeclare_token, \ + z_owned_bytes_map_t * : z_bytes_map_drop \ )(x) #define z_null(x) (*x = \ @@ -62,27 +64,33 @@ z_owned_reply_channel_t * : z_reply_channel_null, \ zc_owned_payload_t * : zc_payload_null, \ zc_owned_shmbuf_t * : zc_shmbuf_null, \ - zc_owned_shm_manager_t * : zc_shm_manager_null \ + zc_owned_shm_manager_t * : zc_shm_manager_null, \ + zc_owned_liveliness_token_t * : zc_liveliness_token_null, \ + z_owned_bytes_map_t * : z_bytes_map_null, \ + z_attachement_t : z_attachement_null \ )()) #define z_check(x) \ - _Generic((x), z_owned_session_t : z_session_check, \ - z_owned_publisher_t : z_publisher_check, \ - z_owned_keyexpr_t : z_keyexpr_check, \ - z_keyexpr_t : z_keyexpr_is_initialized, \ - z_owned_config_t : z_config_check, \ - z_owned_scouting_config_t : z_scouting_config_check, \ - z_bytes_t : z_bytes_check, \ - z_owned_subscriber_t : z_subscriber_check, \ - z_owned_pull_subscriber_t : z_pull_subscriber_check, \ - z_owned_queryable_t : z_queryable_check, \ - z_owned_encoding_t : z_encoding_check, \ - z_owned_reply_t : z_reply_check, \ - z_owned_hello_t : z_hello_check, \ - z_owned_str_t : z_str_check, \ - zc_owned_payload_t : zc_payload_check, \ - zc_owned_shmbuf_t : zc_shmbuf_check, \ - zc_owned_shm_manager_t : zc_shm_manager_check \ + _Generic((x), z_owned_session_t : z_session_check, \ + z_owned_publisher_t : z_publisher_check, \ + z_owned_keyexpr_t : z_keyexpr_check, \ + z_keyexpr_t : z_keyexpr_is_initialized, \ + z_owned_config_t : z_config_check, \ + z_owned_scouting_config_t : z_scouting_config_check, \ + z_bytes_t : z_bytes_check, \ + z_owned_subscriber_t : z_subscriber_check, \ + z_owned_pull_subscriber_t : z_pull_subscriber_check, \ + z_owned_queryable_t : z_queryable_check, \ + z_owned_encoding_t : z_encoding_check, \ + z_owned_reply_t : z_reply_check, \ + z_owned_hello_t : z_hello_check, \ + z_owned_str_t : z_str_check, \ + zc_owned_payload_t : zc_payload_check, \ + zc_owned_shmbuf_t : zc_shmbuf_check, \ + zc_owned_shm_manager_t : zc_shm_manager_check, \ + zc_owned_liveliness_token_t : zc_liveliness_token_check,\ + z_owned_bytes_map_t : z_bytes_map_check, \ + z_attachement_t : z_attachement_check \ )(&x) #define z_call(x, ...) \ diff --git a/src/attachements.rs b/src/attachements.rs new file mode 100644 index 000000000..05bdc75ee --- /dev/null +++ b/src/attachements.rs @@ -0,0 +1,292 @@ +use std::{borrow::Cow, cell::UnsafeCell, collections::HashMap}; + +use libc::c_void; + +use crate::{z_bytes_null, z_bytes_t}; + +/// The body of a loop over an attachment's key-value pairs. +/// +/// `key` and `value` are loaned to the body for the duration of a single call. +/// `context` is passed transparently through the iteration driver. +/// +/// Returning `0` is treated as `continue`. +/// Returning any other value is treated as `break`. +pub type z_attachement_iter_body_t = + extern "C" fn(key: z_bytes_t, value: z_bytes_t, context: *mut c_void) -> i8; + +/// The driver of a loop over an attachement's key-value pairs. +/// +/// This function is expected to call `loop_body` once for each key-value pair +/// within `iterator`, passing `context`, and returning any non-zero value immediately (breaking iteration). +pub type z_attachement_iter_driver_t = extern "C" fn( + iterator: *mut c_void, + loop_body: z_attachement_iter_body_t, + context: *mut c_void, +) -> i8; + +/// The v-table for an attachement. +#[repr(C)] +pub struct z_attachement_vtable_t { + /// See `z_attachement_iteration_driver_t`'s documentation. + iteration_driver: z_attachement_iter_driver_t, + /// Returns the number of key-value pairs within the attachement. + len: extern "C" fn(*const c_void) -> usize, +} + +/// A v-table based map of byte slice to byte slice. +/// +/// `vtable == NULL` marks the gravestone value, as this type is often optional. +/// Users are encouraged to use `z_attachement_null` and `z_attachement_check` to interact. +#[repr(C)] +pub struct z_attachement_t { + data: *mut c_void, + vtable: Option<&'static z_attachement_vtable_t>, +} + +/// Returns the gravestone value for `z_attachement_t`. +#[no_mangle] +pub extern "C" fn z_attachement_check(this: &z_attachement_t) -> bool { + this.vtable.is_some() +} + +/// Returns the gravestone value for `z_attachement_t`. +#[no_mangle] +pub extern "C" fn z_attachement_null() -> z_attachement_t { + z_attachement_t { + data: core::ptr::null_mut(), + vtable: None, + } +} + +/// Iterate over `this`'s key-value pairs, breaking if `body` returns a non-zero +/// value for a key-value pair, and returning the latest return value. +/// +/// `context` is passed to `body` to allow stateful closures. +/// +/// This function takes no ownership whatsoever. +#[no_mangle] +pub extern "C" fn z_attachement_iterate( + this: z_attachement_t, + body: z_attachement_iter_body_t, + context: *mut c_void, +) -> i8 { + (this.vtable.unwrap().iteration_driver)(this.data, body, context) +} + +/// Returns the number of key-value pairs in `this`. +#[no_mangle] +pub extern "C" fn z_attachement_len(this: z_attachement_t) -> usize { + (this.vtable.unwrap().len)(this.data) +} + +/// A map of maybe-owned vector of bytes to owned vector of bytes. +/// +/// In Zenoh C, this map is backed by Rust's standard HashMap, with a DoS-resistant hasher +#[repr(C)] +pub struct z_owned_bytes_map_t { + _0: [u64; 2], + _1: [usize; 4], +} +impl core::ops::Deref for z_owned_bytes_map_t { + type Target = Option, Cow<'static, [u8]>>>>; + fn deref(&self) -> &Self::Target { + unsafe { core::mem::transmute(self) } + } +} + +/// Constructs a new map. +#[no_mangle] +pub extern "C" fn z_bytes_map_new() -> z_owned_bytes_map_t { + unsafe { core::mem::transmute(Some(HashMap::, Cow<[u8]>>::new())) } +} + +/// Constructs the gravestone value for `z_owned_bytes_map_t` +#[no_mangle] +pub extern "C" fn z_bytes_map_null() -> z_owned_bytes_map_t { + unsafe { core::mem::transmute(None::, Cow<[u8]>>>) } +} + +/// Returns `true` if the map is not in its gravestone state +#[no_mangle] +pub extern "C" fn z_bytes_map_check(this: &z_owned_bytes_map_t) -> bool { + this.is_some() +} +/// Destroys the map, resetting `this` to its gravestone value. +/// +/// This function is double-free safe, passing a pointer to the gravestone value will have no effect. +#[no_mangle] +pub extern "C" fn z_bytes_map_drop(this: &mut z_owned_bytes_map_t) { + let this = core::mem::replace(this, z_bytes_map_null()); + if z_bytes_map_check(&this) { + core::mem::drop(unsafe { core::mem::transmute::<_, HashMap, Cow<[u8]>>>(this) }) + } +} + +/// Returns the value associated with `key`, returning a gravestone value if: +/// - `this` or `key` is in gravestone state. +/// - `this` has no value associated to `key` +#[no_mangle] +pub extern "C" fn z_bytes_map_get(this: &z_owned_bytes_map_t, key: z_bytes_t) -> z_bytes_t { + let (Some(this), Some(key)) = (this.as_ref(), key.as_slice()) else { + return z_bytes_null(); + }; + if let Some(value) = unsafe { &*this.get() }.get(key) { + value.as_ref().into() + } else { + z_bytes_null() + } +} + +/// Associates `value` to `key` in the map, copying them to obtain ownership: `key` and `value` are not aliased past the function's return. +/// +/// Calling this with `NULL` or the gravestone value is undefined behaviour. +#[no_mangle] +pub extern "C" fn z_bytes_map_insert_by_copy( + this: &z_owned_bytes_map_t, + key: z_bytes_t, + value: z_bytes_t, +) { + if let (Some(this), Some(key), Some(value)) = (this.as_ref(), key.as_slice(), value.as_slice()) + { + unsafe { &mut *this.get() } + .insert(Cow::Owned(key.to_owned()), Cow::Owned(value.to_owned())); + } +} + +/// Associates `value` to `key` in the map, aliasing them. +/// +/// Note that once `key` is aliased, reinserting at the same key may alias the previous instance, or the new instance of `key`. +/// +/// Calling this with `NULL` or the gravestone value is undefined behaviour. +#[no_mangle] +pub extern "C" fn z_bytes_map_insert_by_alias( + this: &z_owned_bytes_map_t, + key: z_bytes_t, + value: z_bytes_t, +) { + if let (Some(this), Some(key), Some(value)) = (this.as_ref(), key.as_slice(), value.as_slice()) + { + unsafe { + (*this.get()).insert( + Cow::Borrowed(core::mem::transmute(key)), + Cow::Borrowed(core::mem::transmute(value)), + ) + }; + } +} + +/// Returns the number of key-value pairs in the map. +/// +/// Calling this with `NULL` or the gravestone value is undefined behaviour. +#[no_mangle] +extern "C" fn z_bytes_map_len(this: &z_owned_bytes_map_t) -> usize { + this.as_ref() + .map_or(0, |this| unsafe { &*this.get() }.len()) +} + +/// Iterates over the key-value pairs in the map. +/// +/// `body` will be called once per pair, with `ctx` as its last argument. +/// If `body` returns a non-zero value, the iteration will stop immediately and the value will be returned. +/// Otherwise, this will return 0 once all pairs have been visited. +/// `body` is not given ownership of the key nor value, which alias the pairs in the map. +/// It is safe to keep these aliases until existing keys are modified/removed, or the map is destroyed. +/// Note that this map is unordered. +/// +/// Calling this with `NULL` or the gravestone value is undefined behaviour. +#[no_mangle] +pub extern "C" fn z_bytes_map_iter( + this: &z_owned_bytes_map_t, + body: z_attachement_iter_body_t, + ctx: *mut c_void, +) -> i8 { + if let Some(this) = this.as_ref() { + for (key, value) in unsafe { &*this.get() }.iter() { + let result = body(key.as_ref().into(), value.as_ref().into(), ctx); + if result != 0 { + return result; + } + } + } + 0 +} + +const Z_BYTES_MAP_VTABLE: z_attachement_vtable_t = z_attachement_vtable_t { + len: unsafe { core::mem::transmute(z_bytes_map_len as extern "C" fn(_) -> usize) }, + iteration_driver: unsafe { + core::mem::transmute(z_bytes_map_iter as extern "C" fn(_, _, _) -> i8) + }, +}; + +/// Aliases `this` into a generic `z_attachement_t`, allowing it to be passed to corresponding APIs. +#[no_mangle] +pub extern "C" fn z_bytes_map_as_attachement(this: &z_owned_bytes_map_t) -> z_attachement_t { + if z_bytes_map_check(this) { + z_attachement_t { + data: this as *const z_owned_bytes_map_t as *mut _, + vtable: Some(&Z_BYTES_MAP_VTABLE), + } + } else { + z_attachement_t { + data: core::ptr::null_mut(), + vtable: None, + } + } +} + +extern "C" fn bytes_map_from_attachement_iterator( + key: z_bytes_t, + value: z_bytes_t, + ctx: *mut c_void, +) -> i8 { + let map = unsafe { &*ctx.cast::() }; + z_bytes_map_insert_by_copy(map, key, value); + 0 +} +extern "C" fn bytes_map_from_attachement_iterator_by_alias( + key: z_bytes_t, + value: z_bytes_t, + ctx: *mut c_void, +) -> i8 { + let map = unsafe { &*ctx.cast::() }; + z_bytes_map_insert_by_alias(map, key, value); + 0 +} + +/// Constructs a map from the provided attachement, copying keys and values. +/// +/// If `this` is at gravestone value, the returned value will also be at gravestone value. +#[no_mangle] +pub extern "C" fn z_bytes_map_from_attachement(this: z_attachement_t) -> z_owned_bytes_map_t { + if z_attachement_check(&this) { + let mut map = z_bytes_map_new(); + z_attachement_iterate( + this, + bytes_map_from_attachement_iterator, + &mut map as *mut _ as *mut _, + ); + map + } else { + z_bytes_map_null() + } +} + +/// Constructs a map from the provided attachement, aliasing the attachement's keys and values. +/// +/// If `this` is at gravestone value, the returned value will also be at gravestone value. +#[no_mangle] +pub extern "C" fn z_bytes_map_from_attachement_aliasing( + this: z_attachement_t, +) -> z_owned_bytes_map_t { + if z_attachement_check(&this) { + let mut map = z_bytes_map_new(); + z_attachement_iterate( + this, + bytes_map_from_attachement_iterator_by_alias, + &mut map as *mut _ as *mut _, + ); + map + } else { + z_bytes_map_null() + } +} diff --git a/src/collections.rs b/src/collections.rs index 76c02d403..77224c2c8 100644 --- a/src/collections.rs +++ b/src/collections.rs @@ -11,10 +11,13 @@ // Contributors: // ZettaScale Zenoh team, // -use libc::size_t; +use libc::{c_char, size_t}; use zenoh::prelude::ZenohId; -/// An array of bytes. +/// A contiguous view of bytes owned by some other entity. +/// +/// `start` being `null` is considered a gravestone value, +/// and empty slices are represented using a possibly dangling pointer for `start`. #[repr(C)] #[derive(Clone, Copy, Debug)] pub struct z_bytes_t { @@ -23,6 +26,12 @@ pub struct z_bytes_t { } impl z_bytes_t { + pub fn as_slice(&self) -> Option<&[u8]> { + if self.start.is_null() { + return None; + } + Some(unsafe { core::slice::from_raw_parts(self.start, self.len) }) + } pub fn empty() -> Self { z_bytes_t { start: std::ptr::null(), @@ -43,6 +52,32 @@ pub extern "C" fn z_bytes_check(b: &z_bytes_t) -> bool { !b.start.is_null() } +/// Returns the gravestone value for `z_bytes_t` +#[no_mangle] +pub extern "C" fn z_bytes_null() -> z_bytes_t { + z_bytes_t { + len: 0, + start: core::ptr::null(), + } +} + +/// Returns a view of `str` using `strlen`. +/// +/// `str == NULL` will cause this to return `z_bytes_null()` +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_bytes_new(str: *const c_char) -> z_bytes_t { + if str.is_null() { + z_bytes_null() + } else { + let len = unsafe { libc::strlen(str) }; + z_bytes_t { + len, + start: str.cast(), + } + } +} + /// Frees `b` and invalidates it for double-drop safety. #[allow(clippy::missing_safety_doc)] pub(crate) unsafe fn z_bytes_drop(b: &mut z_bytes_t) { diff --git a/src/commons.rs b/src/commons.rs index 908dfac76..db2526b76 100644 --- a/src/commons.rs +++ b/src/commons.rs @@ -12,6 +12,8 @@ // ZettaScale Zenoh team, // +use crate::attachements::z_attachement_null; +use crate::attachements::z_attachement_t; use crate::collections::*; use crate::keyexpr::*; use crate::z_id_t; @@ -202,6 +204,7 @@ pub struct z_sample_t<'a> { pub _zc_buf: &'a c_void, pub kind: z_sample_kind_t, pub timestamp: z_timestamp_t, + pub attachements: z_attachement_t, } impl<'a> z_sample_t<'a> { @@ -216,6 +219,7 @@ impl<'a> z_sample_t<'a> { _zc_buf: unsafe { std::mem::transmute(owner) }, kind: sample.kind.into(), timestamp: sample.timestamp.as_ref().into(), + attachements: z_attachement_null(), } } } diff --git a/src/lib.rs b/src/lib.rs index c24a536a5..c07f1e63f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,9 @@ // Contributors: // ZettaScale Zenoh team, // + +#![allow(non_camel_case_types)] + mod collections; pub use crate::collections::*; mod config; @@ -39,6 +42,9 @@ mod publisher; pub use crate::publisher::*; mod closures; pub use closures::*; +mod liveliness; +pub use liveliness::*; +pub mod attachements; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/liveliness.rs b/src/liveliness.rs new file mode 100644 index 000000000..ecf695010 --- /dev/null +++ b/src/liveliness.rs @@ -0,0 +1,246 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + liveliness::{Liveliness, LivelinessToken}, + prelude::{SessionDeclarations, SplitBuffer}, +}; +use zenoh_util::core::{zresult::ErrNo, SyncResolve}; + +use crate::{ + z_closure_reply_call, z_closure_sample_call, z_keyexpr_t, z_owned_closure_reply_t, + z_owned_closure_sample_t, z_owned_subscriber_t, z_sample_t, z_session_t, +}; + +/// A liveliness token that can be used to provide the network with information about connectivity to its +/// declarer: when constructed, a PUT sample will be received by liveliness subscribers on intersecting key +/// expressions. +/// +/// A DELETE on the token's key expression will be received by subscribers if the token is destroyed, or if connectivity between the subscriber and the token's creator is lost. +#[repr(C)] +pub struct zc_owned_liveliness_token_t { + _inner: [usize; 4], +} + +/// The gravestone value for liveliness tokens. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_null() -> zc_owned_liveliness_token_t { + zc_owned_liveliness_token_t { _inner: [0; 4] } +} + +/// Returns `true` unless the token is at its gravestone value. +#[no_mangle] +pub extern "C" fn zc_liveliness_token_check(token: &zc_owned_liveliness_token_t) -> bool { + token._inner.iter().any(|v| *v != 0) +} +/// The options for `zc_liveliness_declare_token` +#[repr(C)] +pub struct zc_owned_liveliness_declaration_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declaration_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_null( +) -> zc_owned_liveliness_declaration_options_t { + zc_owned_liveliness_declaration_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_check( + _opts: &zc_owned_liveliness_declaration_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declaration_options_drop( + opts: &mut zc_owned_liveliness_declaration_options_t, +) { + *opts = zc_liveliness_declaration_options_null() +} +impl From> for zc_owned_liveliness_token_t { + fn from(value: LivelinessToken<'static>) -> Self { + unsafe { core::mem::transmute(value) } + } +} +impl From for Option> { + fn from(value: zc_owned_liveliness_token_t) -> Self { + if value._inner.iter().all(|v| *v == 0) { + None + } else { + Some(unsafe { core::mem::transmute(value) }) + } + } +} +/// Constructs and declares a liveliness token on the network. +/// +/// Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity +/// is achieved, and a DELETE sample if it's lost. +/// +/// Passing `NULL` as options is valid and equivalent to a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_token( + session: z_session_t, + key: z_keyexpr_t, + _options: Option<&zc_owned_liveliness_declaration_options_t>, +) -> zc_owned_liveliness_token_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return zc_liveliness_token_null(); + }; + match session.liveliness().declare_token(key).res() { + Ok(token) => unsafe { core::mem::transmute(token) }, + Err(e) => { + log::error!("Failed to declare liveliness token: {e}"); + zc_liveliness_token_null() + } + } +} + +/// Destroys a liveliness token, notifying subscribers of its destruction. +#[no_mangle] +pub extern "C" fn zc_liveliness_undeclare_token(token: &mut zc_owned_liveliness_token_t) { + let Some(token): Option = + core::mem::replace(token, zc_liveliness_token_null()).into() + else { + return; + }; + if let Err(e) = token.undeclare().res() { + log::error!("Failed to undeclare token: {e}"); + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_declare_subscriber_options_t { + _inner: u8, +} +/// The gravestone value for `zc_owned_liveliness_declare_subscriber_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_null( +) -> zc_owned_liveliness_declare_subscriber_options_t { + zc_owned_liveliness_declare_subscriber_options_t { _inner: 0 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_check( + _opts: &zc_owned_liveliness_declare_subscriber_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_subscriber_options_drop( + opts: &mut zc_owned_liveliness_declare_subscriber_options_t, +) { + *opts = zc_liveliness_subscriber_options_null() +} + +/// Declares a subscriber on liveliness tokens that intersect `key`. +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_declare_subscriber( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_sample_t, + _options: Option<&zc_owned_liveliness_declare_subscriber_options_t>, +) -> z_owned_subscriber_t { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return z_owned_subscriber_t::null(); + }; + let callback = core::mem::replace(callback, z_owned_closure_sample_t::empty()); + match session + .liveliness() + .declare_subscriber(key) + .callback(move |sample| { + let payload = sample.payload.contiguous(); + let owner = match payload { + std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v), + _ => sample.payload.clone(), + }; + let sample = z_sample_t::new(&sample, &owner); + z_closure_sample_call(&callback, &sample) + }) + .res() + { + Ok(token) => z_owned_subscriber_t::new(token), + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + z_owned_subscriber_t::null() + } + } +} + +/// The options for `zc_liveliness_declare_subscriber` +#[repr(C)] +pub struct zc_owned_liveliness_get_options_t { + timeout_ms: u32, +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_null() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 0 } +} +/// The gravestone value for `zc_owned_liveliness_get_options_t` +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_default() -> zc_owned_liveliness_get_options_t { + zc_owned_liveliness_get_options_t { timeout_ms: 10000 } +} +/// Returns `true` if the options are valid. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_check( + _opts: &zc_owned_liveliness_get_options_t, +) -> bool { + true +} +/// Destroys the options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get_options_drop(opts: &mut zc_owned_liveliness_get_options_t) { + *opts = zc_liveliness_get_options_null() +} + +/// Queries liveliness tokens currently on the network with a key expression intersecting with `key`. +/// +/// Note that the same "value stealing" tricks apply as with a normal `z_get` +/// +/// Passing `NULL` as options is valid and equivalent to passing a pointer to the default options. +#[no_mangle] +pub extern "C" fn zc_liveliness_get( + session: z_session_t, + key: z_keyexpr_t, + callback: &mut z_owned_closure_reply_t, + options: Option<&zc_owned_liveliness_get_options_t>, +) -> i8 { + let Some(session) = session.upgrade() else { + log::error!("Failed to declare liveliness token: provided session was invalid"); + return i8::MIN; + }; + let callback = core::mem::replace(callback, z_owned_closure_reply_t::empty()); + let liveliness: Liveliness<'static> = session.liveliness(); + let mut builder = liveliness + .get(key) + .callback(move |response| z_closure_reply_call(&callback, &mut response.into())); + if let Some(options) = options { + builder = builder.timeout(core::time::Duration::from_millis(options.timeout_ms as u64)) + } + match builder.res() { + Ok(()) => 0, + Err(e) => { + log::error!("Failed to subscribe to liveliness: {e}"); + e.errno().get() + } + } +} diff --git a/src/put.rs b/src/put.rs index 3efd007b0..bb8699bac 100644 --- a/src/put.rs +++ b/src/put.rs @@ -1,3 +1,5 @@ +use crate::attachements::z_attachement_null; +use crate::attachements::z_attachement_t; // // Copyright (c) 2017, 2022 ZettaScale Technology. // @@ -112,6 +114,7 @@ pub struct z_put_options_t { pub encoding: z_encoding_t, pub congestion_control: z_congestion_control_t, pub priority: z_priority_t, + pub attachements: z_attachement_t, } /// Constructs the default value for :c:type:`z_put_options_t`. @@ -122,6 +125,7 @@ pub unsafe extern "C" fn z_put_options_default() -> z_put_options_t { encoding: z_encoding_default(), congestion_control: CongestionControl::default().into(), priority: Priority::default().into(), + attachements: z_attachement_null(), } }