From 079228fc8f8c9a4642e1680283d5df92f42fae7d Mon Sep 17 00:00:00 2001 From: Alexander Date: Wed, 19 Jun 2024 17:23:40 +0200 Subject: [PATCH] Rework channels (#435) * Rework channels * Fix modular build --- examples/unix/c11/z_get_channel.c | 11 +- examples/unix/c11/z_pull.c | 11 +- examples/unix/c11/z_queryable_channel.c | 11 +- examples/unix/c11/z_sub_channel.c | 12 +- include/zenoh-pico/api/handlers.h | 223 ++++++++++++++---------- include/zenoh-pico/api/macros.h | 127 ++++++++++---- src/api/handlers.c | 79 --------- tests/z_channels_test.c | 118 +++++++------ 8 files changed, 318 insertions(+), 274 deletions(-) delete mode 100644 src/api/handlers.c diff --git a/examples/unix/c11/z_get_channel.c b/examples/unix/c11/z_get_channel.c index b7cdff16d..f66c59dc5 100644 --- a/examples/unix/c11/z_get_channel.c +++ b/examples/unix/c11/z_get_channel.c @@ -96,16 +96,17 @@ int main(int argc, char **argv) { z_bytes_serialize_from_string(&payload, value); opts.payload = &payload; } - z_owned_reply_ring_channel_t channel; - z_reply_ring_channel_new(&channel, 1); - if (z_get(z_loan(s), z_loan(ke), "", z_move(channel.send), &opts) < 0) { + z_owned_closure_reply_t closure; + z_owned_ring_handler_reply_t handler; + z_ring_channel_reply_new(&closure, &handler, 1); + if (z_get(z_loan(s), z_loan(ke), "", z_move(closure), &opts) < 0) { printf("Unable to send query.\n"); return -1; } z_owned_reply_t reply; z_null(&reply); - for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) { + for (z_recv(z_loan(handler), &reply); z_check(reply); z_recv(z_loan(handler), &reply)) { if (z_reply_is_ok(z_loan(reply))) { const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply)); z_owned_string_t keystr; @@ -121,7 +122,7 @@ int main(int argc, char **argv) { } } - z_drop(z_move(channel)); + z_drop(z_move(handler)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan_mut(s)); diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index b301d6222..c4bb84cae 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -73,12 +73,13 @@ int main(int argc, char **argv) { } printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_sample_ring_channel_t channel; - z_sample_ring_channel_new(&channel, size); + z_owned_closure_sample_t closure; + z_owned_ring_handler_sample_t handler; + z_ring_channel_sample_new(&closure, &handler, size); z_owned_subscriber_t sub; z_view_keyexpr_t ke; z_view_keyexpr_from_string(&ke, keyexpr); - if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(channel.send), NULL) < 0) { + if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) { printf("Unable to declare subscriber.\n"); return -1; } @@ -87,7 +88,7 @@ int main(int argc, char **argv) { z_owned_sample_t sample; z_null(&sample); while (true) { - for (z_call(channel.try_recv, &sample); z_check(sample); z_call(channel.try_recv, &sample)) { + for (z_try_recv(z_loan(handler), &sample); z_check(sample); z_try_recv(z_loan(handler), &sample)) { z_owned_string_t keystr; z_keyexpr_to_string(z_sample_keyexpr(z_loan(sample)), &keystr); z_owned_string_t value; @@ -103,7 +104,7 @@ int main(int argc, char **argv) { } z_undeclare_subscriber(z_move(sub)); - z_drop(z_move(channel)); + z_drop(z_move(handler)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan_mut(s)); diff --git a/examples/unix/c11/z_queryable_channel.c b/examples/unix/c11/z_queryable_channel.c index 42c36f62e..ed81b42b5 100644 --- a/examples/unix/c11/z_queryable_channel.c +++ b/examples/unix/c11/z_queryable_channel.c @@ -88,17 +88,18 @@ int main(int argc, char **argv) { } printf("Creating Queryable on '%s'...\n", keyexpr); - z_owned_query_ring_channel_t channel; - z_query_ring_channel_new(&channel, 10); + z_owned_closure_query_t closure; + z_owned_ring_handler_query_t handler; + z_ring_channel_query_new(&closure, &handler, 10); z_owned_queryable_t qable; - if (z_declare_queryable(&qable, z_loan(s), z_loan(ke), z_move(channel.send), NULL) < 0) { + if (z_declare_queryable(&qable, z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) { printf("Unable to create queryable.\n"); return -1; } z_owned_query_t query; z_null(&query); - for (z_call(channel.recv, &query); z_check(query); z_call(channel.recv, &query)) { + for (z_recv(z_loan(handler), &query); z_check(query); z_recv(z_loan(handler), &query)) { const z_loaned_query_t *q = z_loan(query); z_owned_string_t keystr; z_keyexpr_to_string(z_query_keyexpr(q), &keystr); @@ -125,7 +126,7 @@ int main(int argc, char **argv) { z_drop(z_move(query)); } - z_drop(z_move(channel)); + z_drop(z_move(handler)); z_undeclare_queryable(z_move(qable)); // Stop read and lease tasks for zenoh-pico diff --git a/examples/unix/c11/z_sub_channel.c b/examples/unix/c11/z_sub_channel.c index 8a00df929..375c66c2c 100644 --- a/examples/unix/c11/z_sub_channel.c +++ b/examples/unix/c11/z_sub_channel.c @@ -65,19 +65,21 @@ int main(int argc, char **argv) { } printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_sample_fifo_channel_t channel; - z_sample_fifo_channel_new(&channel, 3); + z_owned_closure_sample_t closure; + z_owned_fifo_handler_sample_t handler; + z_fifo_channel_sample_new(&closure, &handler, 3); + z_owned_subscriber_t sub; z_view_keyexpr_t ke; z_view_keyexpr_from_string(&ke, keyexpr); - if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(channel.send), NULL) < 0) { + if (z_declare_subscriber(&sub, z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) { printf("Unable to declare subscriber.\n"); return -1; } z_owned_sample_t sample; z_null(&sample); - for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + for (z_recv(z_loan(handler), &sample); z_check(sample); z_recv(z_loan(handler), &sample)) { z_owned_string_t keystr; z_keyexpr_to_string(z_sample_keyexpr(z_loan(sample)), &keystr); z_owned_string_t value; @@ -90,7 +92,7 @@ int main(int argc, char **argv) { } z_undeclare_subscriber(z_move(sub)); - z_drop(z_move(channel)); + z_drop(z_move(handler)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan_mut(s)); diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 99d4d15b9..681d38f4b 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -23,102 +23,147 @@ #include "zenoh-pico/collections/ring_mt.h" #include "zenoh-pico/utils/logging.h" -// -- Samples handler -void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src); -z_owned_sample_t *_z_sample_to_owned_ptr(const z_loaned_sample_t *src); - -// -- Queries handler -void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src); -z_owned_query_t *_z_query_to_owned_ptr(const z_loaned_query_t *src); - -// -- Reply handler -void _z_owned_reply_move(z_owned_reply_t *dst, z_owned_reply_t *src); -z_owned_reply_t *_z_reply_to_owned_ptr(const z_loaned_reply_t *src); - // -- Channel -#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ - collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \ - collection_try_pull_f, elem_move_f, elem_convert_f, elem_drop_f) \ - typedef struct { \ - z_owned_##send_closure_name##_t send; \ - z_owned_##recv_closure_name##_t recv; \ - z_owned_##recv_closure_name##_t try_recv; \ - collection_type *collection; \ - } z_owned_##name##_t; \ - \ - static inline void _z_##name##_elem_free(void **elem) { \ - elem_drop_f((recv_type *)*elem); \ - zp_free(*elem); \ - *elem = NULL; \ - } \ - static inline void _z_##name##_elem_move(void *dst, void *src) { \ - elem_move_f((recv_type *)dst, (recv_type *)src); \ - } \ - static inline void _z_##name##_send(send_type *elem, void *context) { \ - void *internal_elem = elem_convert_f(elem); \ - if (internal_elem == NULL) { \ - return; \ - } \ - int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \ - if (ret != _Z_RES_OK) { \ - _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ - } \ - } \ - static inline void _z_##name##_recv(recv_type *elem, void *context) { \ - int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \ - if (ret != _Z_RES_OK) { \ - _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ - } \ - } \ - static inline void _z_##name##_try_recv(recv_type *elem, void *context) { \ - int8_t ret = collection_try_pull_f(elem, context, _z_##name##_elem_move); \ - if (ret != _Z_RES_OK) { \ - _Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \ - } \ - } \ - \ - static inline int8_t z_##name##_new(z_owned_##name##_t *channel, size_t capacity) { \ - channel->collection = collection_new_f(capacity); \ - z_##send_closure_name(&channel->send, _z_##name##_send, NULL, channel->collection); \ - z_##recv_closure_name(&channel->recv, _z_##name##_recv, NULL, channel->collection); \ - z_##recv_closure_name(&channel->try_recv, _z_##name##_try_recv, NULL, channel->collection); \ - return _Z_RES_OK; \ - } \ - static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \ - static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ - collection_free_f(channel->collection, _z_##name##_elem_free); \ - z_##send_closure_name##_drop(&channel->send); \ - z_##recv_closure_name##_drop(&channel->recv); \ +#define _Z_CHANNEL_DEFINE_IMPL(handler_type, handler_name, handler_new_f_name, callback_type, callback_new_f, \ + collection_type, collection_new_f, collection_free_f, collection_push_f, \ + collection_pull_f, collection_try_pull_f, elem_owned_type, elem_loaned_type, \ + elem_copy_f, elem_drop_f) \ + typedef struct { \ + collection_type *collection; \ + } handler_type; \ + \ + _Z_OWNED_TYPE_PTR(handler_type, handler_name) \ + _Z_LOANED_TYPE(handler_type, handler_name) \ + \ + static inline void _z_##handler_name##_elem_free(void **elem) { \ + elem_drop_f((elem_owned_type *)*elem); \ + zp_free(*elem); \ + *elem = NULL; \ + } \ + static inline void _z_##handler_name##_elem_move(void *dst, void *src) { \ + memcpy(dst, src, sizeof(elem_owned_type)); \ + zp_free(src); \ + } \ + static inline void _z_##handler_name##_send(const elem_loaned_type *elem, void *context) { \ + elem_owned_type *internal_elem = (elem_owned_type *)zp_malloc(sizeof(elem_owned_type)); \ + if (internal_elem == NULL) { \ + _Z_ERROR("Out of memory"); \ + return; \ + } \ + if (elem == NULL) { \ + internal_elem->_rc.in = NULL; \ + } else { \ + elem_copy_f(&internal_elem->_rc, elem); \ + } \ + int8_t ret = collection_push_f(internal_elem, context, _z_##handler_name##_elem_free); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ + } \ + } \ + static inline void z_##handler_name##_recv(const z_loaned_##handler_name##_t *handler, elem_owned_type *elem) { \ + int8_t ret = collection_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ + } \ + } \ + static inline void z_##handler_name##_try_recv(const z_loaned_##handler_name##_t *handler, \ + elem_owned_type *elem) { \ + int8_t ret = \ + collection_try_pull_f(elem, (collection_type *)handler->collection, _z_##handler_name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \ + } \ + } \ + \ + static inline void _z_##handler_name##_free(handler_type **handler) { \ + handler_type *ptr = *handler; \ + if (ptr != NULL) { \ + collection_free_f(ptr->collection, _z_##handler_name##_elem_free); \ + z_free(ptr); \ + *handler = NULL; \ + } \ + } \ + static inline void _z_##handler_name##_copy(void *dst, const void *src) { \ + (void)(dst); \ + (void)(src); \ + } \ + \ + _Z_OWNED_FUNCTIONS_PTR_IMPL(handler_type, handler_name, _z_##handler_name##_copy, _z_##handler_name##_free) \ + \ + static inline int8_t handler_new_f_name(callback_type *callback, z_owned_##handler_name##_t *handler, \ + size_t capacity) { \ + handler->_val = (handler_type *)z_malloc(sizeof(handler_type)); \ + handler->_val->collection = collection_new_f(capacity); \ + callback_new_f(callback, _z_##handler_name##_send, NULL, handler->_val->collection); \ + return _Z_RES_OK; \ } -// z_owned_sample_ring_channel_t -_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, const z_loaned_sample_t, z_owned_sample_t, - _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, - _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) +#define _Z_CHANNEL_DEFINE(item_name, kind_name) \ + _Z_CHANNEL_DEFINE_IMPL(/* handler_type */ _z_##kind_name##_handler_##item_name##_t, \ + /* handler_name */ kind_name##_handler_##item_name, \ + /* handler_new_f_name */ z_##kind_name##_channel_##item_name##_new, \ + /* callback_type */ z_owned_closure_##item_name##_t, \ + /* callback_new_f */ z_closure_##item_name, \ + /* collection_type */ _z_##kind_name##_mt_t, \ + /* collection_new_f */ _z_##kind_name##_mt_new, \ + /* collection_free_f */ _z_##kind_name##_mt_free, \ + /* collection_push_f */ _z_##kind_name##_mt_push, \ + /* collection_pull_f */ _z_##kind_name##_mt_pull, \ + /* collection_try_pull_f */ _z_##kind_name##_mt_try_pull, \ + /* elem_owned_type */ z_owned_##item_name##_t, \ + /* elem_loaned_type */ z_loaned_##item_name##_t, \ + /* elem_copy_f */ _z_##item_name##_rc_copy, \ + /* elem_drop_f */ z_##item_name##_drop) + +#define _Z_CHANNEL_DEFINE_DUMMY(item_name, kind_name) \ + typedef struct { \ + } z_owned_##kind_name##_handler_##item_name##_t; \ + typedef struct { \ + } z_loaned_##kind_name##_handler_##item_name##_t; \ + void *z_##kind_name##_handler_##item_name##_loan(); \ + void *z_##kind_name##_handler_##item_name##_move(); \ + void *z_##kind_name##_handler_##item_name##_drop(); \ + void *z_##kind_name##_handler_##item_name##_recv(); \ + void *z_##kind_name##_handler_##item_name##_try_recv(); + +// This macro defines: +// z_ring_channel_sample_new() +// z_owned_ring_handler_sample_t/z_loaned_ring_handler_sample_t +_Z_CHANNEL_DEFINE(sample, ring) -// z_owned_sample_fifo_channel_t -_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, const z_loaned_sample_t, z_owned_sample_t, - _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, - _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) +// This macro defines: +// z_fifo_channel_sample_new() +// z_owned_fifo_handler_sample_t/z_loaned_fifo_handler_sample_t +_Z_CHANNEL_DEFINE(sample, fifo) -// z_owned_query_ring_channel_t -_Z_CHANNEL_DEFINE(query_ring_channel, closure_query, closure_owned_query, const z_loaned_query_t, z_owned_query_t, - _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, - _z_owned_query_move, _z_query_to_owned_ptr, z_query_drop) +#if Z_FEATURE_QUERYABLE == 1 +// This macro defines: +// z_ring_channel_query_new() +// z_owned_ring_handler_query_t/z_loaned_ring_handler_query_t +_Z_CHANNEL_DEFINE(query, ring) -// z_owned_query_fifo_channel_t -_Z_CHANNEL_DEFINE(query_fifo_channel, closure_query, closure_owned_query, const z_loaned_query_t, z_owned_query_t, - _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, - _z_owned_query_move, _z_query_to_owned_ptr, z_query_drop) +// This macro defines: +// z_fifo_channel_query_new() +// z_owned_fifo_handler_query_t/z_loaned_fifo_handler_query_t +_Z_CHANNEL_DEFINE(query, fifo) +#else // Z_FEATURE_QUERYABLE +_Z_CHANNEL_DEFINE_DUMMY(query, ring) +_Z_CHANNEL_DEFINE_DUMMY(query, fifo) +#endif // Z_FEATURE_QUERYABLE -// z_owned_reply_ring_channel_t -_Z_CHANNEL_DEFINE(reply_ring_channel, closure_reply, closure_owned_reply, const z_loaned_reply_t, z_owned_reply_t, - _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, - _z_owned_reply_move, _z_reply_to_owned_ptr, z_reply_drop) +#if Z_FEATURE_QUERY == 1 +// This macro defines: +// z_ring_channel_reply_new() +// z_owned_ring_handler_reply_t/z_loaned_ring_handler_reply_t +_Z_CHANNEL_DEFINE(reply, ring) -// z_owned_reply_fifo_channel_t -_Z_CHANNEL_DEFINE(reply_fifo_channel, closure_reply, closure_owned_reply, const z_loaned_reply_t, z_owned_reply_t, - _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, - _z_owned_reply_move, _z_reply_to_owned_ptr, z_reply_drop) +// This macro defines: +// z_fifo_channel_reply_new() +// z_owned_fifo_handler_reply_t/z_loaned_fifo_handler_reply_t +_Z_CHANNEL_DEFINE(reply, fifo) +#else // Z_FEATURE_QUERY +_Z_CHANNEL_DEFINE_DUMMY(reply, ring) +_Z_CHANNEL_DEFINE_DUMMY(reply, fifo) +#endif // Z_FEATURE_QUERY #endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index d61e25f83..aad0dd4e9 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -34,25 +34,31 @@ */ #define z_loan(x) _Generic((x), \ - z_owned_keyexpr_t : z_keyexpr_loan, \ - z_view_keyexpr_t : z_view_keyexpr_loan, \ - z_owned_config_t : z_config_loan, \ - z_owned_scouting_config_t : z_scouting_config_loan, \ - z_owned_session_t : z_session_loan, \ - z_owned_subscriber_t : z_subscriber_loan, \ - z_owned_publisher_t : z_publisher_loan, \ - z_owned_queryable_t : z_queryable_loan, \ - z_owned_reply_t : z_reply_loan, \ - z_owned_hello_t : z_hello_loan, \ - z_owned_string_t : z_string_loan, \ - z_view_string_t : z_view_string_loan, \ - z_owned_string_array_t : z_string_array_loan, \ - z_owned_sample_t : z_sample_loan, \ - z_owned_query_t : z_query_loan, \ - z_owned_slice_t : z_slice_loan, \ - z_owned_bytes_t : z_bytes_loan, \ - z_owned_encoding_t : z_encoding_loan, \ - z_owned_reply_err_t : z_reply_err_loan \ + z_owned_keyexpr_t : z_keyexpr_loan, \ + z_view_keyexpr_t : z_view_keyexpr_loan, \ + z_owned_config_t : z_config_loan, \ + z_owned_scouting_config_t : z_scouting_config_loan, \ + z_owned_session_t : z_session_loan, \ + z_owned_subscriber_t : z_subscriber_loan, \ + z_owned_publisher_t : z_publisher_loan, \ + z_owned_queryable_t : z_queryable_loan, \ + z_owned_reply_t : z_reply_loan, \ + z_owned_hello_t : z_hello_loan, \ + z_owned_string_t : z_string_loan, \ + z_view_string_t : z_view_string_loan, \ + z_owned_string_array_t : z_string_array_loan, \ + z_owned_sample_t : z_sample_loan, \ + z_owned_query_t : z_query_loan, \ + z_owned_slice_t : z_slice_loan, \ + z_owned_bytes_t : z_bytes_loan, \ + z_owned_encoding_t : z_encoding_loan, \ + z_owned_fifo_handler_query_t : z_fifo_handler_query_loan, \ + z_owned_fifo_handler_reply_t : z_fifo_handler_reply_loan, \ + z_owned_fifo_handler_sample_t : z_fifo_handler_sample_loan, \ + z_owned_ring_handler_query_t : z_ring_handler_query_loan, \ + z_owned_ring_handler_reply_t : z_ring_handler_reply_loan, \ + z_owned_ring_handler_sample_t : z_ring_handler_sample_loan, \ + z_owned_reply_err_t : z_reply_err_loan \ )(&x) #define z_loan_mut(x) _Generic((x), \ @@ -102,12 +108,12 @@ z_owned_closure_reply_t * : z_closure_reply_drop, \ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ - z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ - z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ - z_owned_query_ring_channel_t * : z_query_ring_channel_drop, \ - z_owned_query_fifo_channel_t * : z_query_fifo_channel_drop, \ - z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ - z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop, \ + z_owned_fifo_handler_query_t * : z_fifo_handler_query_drop, \ + z_owned_fifo_handler_reply_t * : z_fifo_handler_reply_drop, \ + z_owned_fifo_handler_sample_t * : z_fifo_handler_sample_drop, \ + z_owned_ring_handler_query_t * : z_ring_handler_query_drop, \ + z_owned_ring_handler_reply_t * : z_ring_handler_reply_drop, \ + z_owned_ring_handler_sample_t * : z_ring_handler_sample_drop, \ z_owned_reply_err_t : z_reply_err_drop \ )(x) @@ -159,6 +165,26 @@ z_owned_closure_owned_reply_t : z_closure_owned_reply_call \ ) (&x, __VA_ARGS__) +#define z_try_recv(x, ...) \ + _Generic((x), \ + const z_loaned_fifo_handler_query_t* : z_fifo_handler_query_try_recv, \ + const z_loaned_fifo_handler_reply_t* : z_fifo_handler_reply_try_recv, \ + const z_loaned_fifo_handler_sample_t* : z_fifo_handler_sample_try_recv, \ + const z_loaned_ring_handler_query_t* : z_ring_handler_query_try_recv, \ + const z_loaned_ring_handler_reply_t* : z_ring_handler_reply_try_recv, \ + const z_loaned_ring_handler_sample_t* : z_ring_handler_sample_try_recv \ + )(x, __VA_ARGS__) + +#define z_recv(x, ...) \ + _Generic((x), \ + const z_loaned_fifo_handler_query_t* : z_fifo_handler_query_recv, \ + const z_loaned_fifo_handler_reply_t* : z_fifo_handler_reply_recv, \ + const z_loaned_fifo_handler_sample_t* : z_fifo_handler_sample_recv, \ + const z_loaned_ring_handler_query_t* : z_ring_handler_query_recv, \ + const z_loaned_ring_handler_reply_t* : z_ring_handler_reply_recv, \ + const z_loaned_ring_handler_sample_t* : z_ring_handler_sample_recv \ + )(x, __VA_ARGS__) + /** * Defines a generic function for moving any of the ``z_owned_X_t`` types. * @@ -192,12 +218,12 @@ z_owned_slice_t : z_slice_move, \ z_owned_bytes_t : z_bytes_move, \ z_owned_encoding_t : z_encoding_move, \ - z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \ - z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move, \ - z_owned_query_ring_channel_t : z_query_ring_channel_move, \ - z_owned_query_fifo_channel_t : z_query_fifo_channel_move, \ - z_owned_reply_ring_channel_t : z_reply_ring_channel_move, \ - z_owned_reply_fifo_channel_t : z_reply_fifo_channel_move, \ + z_owned_ring_handler_query_t : z_ring_handler_query_move, \ + z_owned_ring_handler_reply_t : z_ring_handler_reply_move, \ + z_owned_ring_handler_sample_t : z_ring_handler_sample_move, \ + z_owned_fifo_handler_query_t : z_fifo_handler_query_move, \ + z_owned_fifo_handler_reply_t : z_fifo_handler_reply_move, \ + z_owned_fifo_handler_sample_t : z_fifo_handler_sample_move, \ z_owned_reply_err_t : z_reply_err_move \ )(&x) @@ -467,6 +493,45 @@ inline void z_closure( closure->call = call; }; +inline bool z_try_recv(const z_loaned_fifo_handler_query_t* this_, z_owned_query_t* query) { + return z_fifo_handler_query_try_recv(this_, query); +}; +inline bool z_try_recv(const z_loaned_fifo_handler_reply_t* this_, z_owned_reply_t* reply) { + return z_fifo_handler_reply_try_recv(this_, reply); +}; +inline bool z_try_recv(const z_loaned_fifo_handler_sample_t* this_, z_owned_sample_t* sample) { + return z_fifo_handler_sample_try_recv(this_, sample); +}; +inline bool z_try_recv(const z_loaned_ring_handler_query_t* this_, z_owned_query_t* query) { + return z_ring_handler_query_try_recv(this_, query); +}; +inline bool z_try_recv(const z_loaned_ring_handler_reply_t* this_, z_owned_reply_t* reply) { + return z_ring_handler_reply_try_recv(this_, reply); +}; +inline bool z_try_recv(const z_loaned_ring_handler_sample_t* this_, z_owned_sample_t* sample) { + return z_ring_handler_sample_try_recv(this_, sample); +}; + + +inline bool z_recv(const z_loaned_fifo_handler_query_t* this_, z_owned_query_t* query) { + return z_fifo_handler_query_recv(this_, query); +}; +inline bool z_recv(const z_loaned_fifo_handler_reply_t* this_, z_owned_reply_t* reply) { + return z_fifo_handler_reply_recv(this_, reply); +}; +inline bool z_recv(const z_loaned_fifo_handler_sample_t* this_, z_owned_sample_t* sample) { + return z_fifo_handler_sample_recv(this_, sample); +}; +inline bool z_recv(const z_loaned_ring_handler_query_t* this_, z_owned_query_t* query) { + return z_ring_handler_query_recv(this_, query); +}; +inline bool z_recv(const z_loaned_ring_handler_reply_t* this_, z_owned_reply_t* reply) { + return z_ring_handler_reply_recv(this_, reply); +}; +inline bool z_recv(const z_loaned_ring_handler_sample_t* this_, z_owned_sample_t* sample) { + return z_ring_handler_sample_recv(this_, sample); +}; + // clang-format on inline z_owned_bytes_t* z_move(z_owned_bytes_t& x) { return z_bytes_move(&x); }; diff --git a/src/api/handlers.c b/src/api/handlers.c deleted file mode 100644 index c136e97b0..000000000 --- a/src/api/handlers.c +++ /dev/null @@ -1,79 +0,0 @@ -// -// Copyright (c) 2024 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/api/handlers.h" - -#include "zenoh-pico/net/sample.h" -#include "zenoh-pico/system/platform.h" - -// -- Sample -void _z_owned_sample_move(z_owned_sample_t *dst, z_owned_sample_t *src) { - memcpy(dst, src, sizeof(z_owned_sample_t)); - zp_free(src); -} - -z_owned_sample_t *_z_sample_to_owned_ptr(const z_loaned_sample_t *src) { - z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); - if (dst == NULL) { - return NULL; - } - if (src == NULL) { - dst->_rc.in = NULL; - } else { - _z_sample_rc_copy(&dst->_rc, src); - } - return dst; -} - -#if Z_FEATURE_QUERYABLE == 1 -// -- Query -void _z_owned_query_move(z_owned_query_t *dst, z_owned_query_t *src) { - memcpy(dst, src, sizeof(z_owned_query_t)); - zp_free(src); -} - -z_owned_query_t *_z_query_to_owned_ptr(const z_loaned_query_t *src) { - z_owned_query_t *dst = (z_owned_query_t *)zp_malloc(sizeof(z_owned_query_t)); - if (dst == NULL) { - return NULL; - } - if (src == NULL) { - dst->_rc.in = NULL; - } else { - _z_query_rc_copy(&dst->_rc, src); - } - return dst; -} -#endif // Z_FEATURE_QUERYABLE - -#if Z_FEATURE_QUERY == 1 -// -- Reply -void _z_owned_reply_move(z_owned_reply_t *dst, z_owned_reply_t *src) { - memcpy(dst, src, sizeof(z_owned_reply_t)); - zp_free(src); -} - -z_owned_reply_t *_z_reply_to_owned_ptr(const z_loaned_reply_t *src) { - z_owned_reply_t *dst = (z_owned_reply_t *)zp_malloc(sizeof(z_owned_reply_t)); - if (dst == NULL) { - return NULL; - } - if (src == NULL) { - dst->_rc.in = NULL; - } else { - _z_reply_rc_copy(&dst->_rc, src); - } - return dst; -} -#endif // Z_FEATURE_QUERY diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index b9954cdcb..ef123ad0d 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -23,7 +23,7 @@ #undef NDEBUG #include -#define SEND(channel, v) \ +#define SEND(closure, v) \ do { \ _z_sample_t s = {.keyexpr = _z_rname("key"), \ .payload = {._slice = {.start = (const uint8_t *)v, .len = strlen(v)}}, \ @@ -32,14 +32,14 @@ .kind = 0, \ .qos = {0}}; \ z_loaned_sample_t sample = _z_sample_rc_new_from_val(s); \ - z_call(channel.send, &sample); \ + z_call(closure, &sample); \ } while (0); -#define _RECV(channel, method, buf) \ +#define _RECV(handler, method, buf) \ do { \ z_owned_sample_t sample; \ z_sample_null(&sample); \ - z_call(channel.method, &sample); \ + method(z_loan(handler), &sample); \ if (z_check(sample)) { \ z_owned_slice_t value; \ z_bytes_deserialize_into_slice(z_sample_payload(z_loan(sample)), &value); \ @@ -53,111 +53,119 @@ } \ } while (0); -#define RECV(channel, buf) _RECV(channel, recv, buf) -#define TRY_RECV(channel, buf) _RECV(channel, try_recv, buf) +#define RECV(handler, buf) _RECV(handler, z_recv, buf) +#define TRY_RECV(handler, buf) _RECV(handler, z_try_recv, buf) void sample_fifo_channel_test(void) { - z_owned_sample_fifo_channel_t channel; - z_sample_fifo_channel_new(&channel, 10); + z_owned_closure_sample_t closure; + z_owned_fifo_handler_sample_t handler; + z_fifo_channel_sample_new(&closure, &handler, 10); - SEND(channel, "v1") - SEND(channel, "v22") - SEND(channel, "v333") - SEND(channel, "v4444") + SEND(closure, "v1") + SEND(closure, "v22") + SEND(closure, "v333") + SEND(closure, "v4444") char buf[100]; - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v1") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v22") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v333") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v4444") == 0); - z_drop(z_move(channel)); + z_drop(z_move(handler)); + z_drop(z_move(closure)); } void sample_fifo_channel_test_try_recv(void) { - z_owned_sample_fifo_channel_t channel; - z_sample_fifo_channel_new(&channel, 10); + z_owned_closure_sample_t closure; + z_owned_fifo_handler_sample_t handler; + z_fifo_channel_sample_new(&closure, &handler, 10); char buf[100]; - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - SEND(channel, "v1") - SEND(channel, "v22") - SEND(channel, "v333") - SEND(channel, "v4444") + SEND(closure, "v1") + SEND(closure, "v22") + SEND(closure, "v333") + SEND(closure, "v4444") - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "v1") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "v22") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "v333") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "v4444") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - z_drop(z_move(channel)); + z_drop(z_move(handler)); + z_drop(z_move(closure)); } void sample_ring_channel_test_in_size(void) { - z_owned_sample_ring_channel_t channel; - z_sample_ring_channel_new(&channel, 10); + z_owned_closure_sample_t closure; + z_owned_ring_handler_sample_t handler; + z_ring_channel_sample_new(&closure, &handler, 10); char buf[100]; - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - SEND(channel, "v1") - SEND(channel, "v22") - SEND(channel, "v333") - SEND(channel, "v4444") + SEND(closure, "v1") + SEND(closure, "v22") + SEND(closure, "v333") + SEND(closure, "v4444") - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v1") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v22") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v333") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v4444") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - z_drop(z_move(channel)); + z_drop(z_move(handler)); + z_drop(z_move(closure)); } void sample_ring_channel_test_over_size(void) { - z_owned_sample_ring_channel_t channel; - z_sample_ring_channel_new(&channel, 3); + z_owned_closure_sample_t closure; + z_owned_ring_handler_sample_t handler; + z_ring_channel_sample_new(&closure, &handler, 3); char buf[100]; - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - SEND(channel, "v1") - SEND(channel, "v22") - SEND(channel, "v333") - SEND(channel, "v4444") + SEND(closure, "v1") + SEND(closure, "v22") + SEND(closure, "v333") + SEND(closure, "v4444") - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v22") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v333") == 0); - RECV(channel, buf) + RECV(handler, buf) assert(strcmp(buf, "v4444") == 0); - TRY_RECV(channel, buf) + TRY_RECV(handler, buf) assert(strcmp(buf, "") == 0); - z_drop(z_move(channel)); + z_drop(z_move(handler)); + z_drop(z_move(closure)); } int main(void) {