From b0ae036997e63a54b5f319a0d78266bd9831f30e Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 22 Mar 2024 16:08:56 +0100 Subject: [PATCH] Store declare information on te receiver side (#378) * feat: store declare info for interests * fix: remove subscriber mode arg * refactor: missing session_unlock * fix: clear interest data on session close --- include/zenoh-pico/net/session.h | 1 + include/zenoh-pico/session/interest.h | 3 +- include/zenoh-pico/session/session.h | 28 +++++-- src/session/interest.c | 102 ++++++++++++++++++++++++-- src/session/queryable.c | 9 +-- src/session/rx.c | 4 +- src/session/subscription.c | 8 +- src/session/utils.c | 2 + 8 files changed, 128 insertions(+), 29 deletions(-) diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 822bbf094..8de397141 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -65,6 +65,7 @@ typedef struct _z_session_t { // Session interests #if Z_FEATURE_INTEREST == 1 _z_session_interest_rc_list_t *_local_interests; + _z_declare_data_list_t *_remote_declares; #endif } _z_session_t; diff --git a/include/zenoh-pico/session/interest.h b/include/zenoh-pico/session/interest.h index 116ac9385..a4eeb43ba 100644 --- a/include/zenoh-pico/session/interest.h +++ b/include/zenoh-pico/session/interest.h @@ -23,10 +23,11 @@ _z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id); _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr); void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr); -void _z_flush_interest(_z_session_t *zn); #endif // Z_FEATURE_INTEREST == 1 +void _z_flush_interest(_z_session_t *zn); int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl); +int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl); int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id); int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id); int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags); diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 754f97ce0..5f2980d86 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -190,12 +190,12 @@ int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size); typedef enum { _Z_INTEREST_MSG_TYPE_FINAL = 0, - _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER, - _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE, - _Z_INTEREST_MSG_TYPE_DECL_TOKEN, - _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER, - _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE, - _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN, + _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER = 1, + _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE = 2, + _Z_INTEREST_MSG_TYPE_DECL_TOKEN = 3, + _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER = 4, + _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE = 5, + _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN = 6, } _z_interest_msg_type_t; typedef struct _z_interest_msg_t { @@ -225,4 +225,20 @@ _Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _ _z_noop_copy) _Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t) +typedef enum { + _Z_DECLARE_TYPE_SUBSCRIBER = 0, + _Z_DECLARE_TYPE_QUERYABLE = 1, + _Z_DECLARE_TYPE_TOKEN = 2, +} _z_declare_type_t; + +typedef struct { + _z_keyexpr_t _key; + uint32_t _id; + uint8_t _type; +} _z_declare_data_t; + +void _z_declare_data_clear(_z_declare_data_t *data); +_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy) +_Z_LIST_DEFINE(_z_declare_data, _z_declare_data_t) + #endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */ diff --git a/src/session/interest.c b/src/session/interest.c index 4003f7bc6..499a917d6 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -29,6 +29,12 @@ #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_INTEREST == 1 +void _z_declare_data_clear(_z_declare_data_t *data) { _z_keyexpr_clear(&data->_key); } + +_Bool _z_declare_data_eq(const _z_declare_data_t *left, const _z_declare_data_t *right) { + return ((left->_id == right->_id) && (left->_type == right->_type)); +} + _Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two) { return one->_id == two->_id; } @@ -58,7 +64,7 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); if ((intr->in->val._flags & flags) != 0) { if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix, - strlen(key._suffix)) == true) { + strlen(key._suffix))) { ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr)); } } @@ -120,8 +126,7 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) { // Build the declare message to send on the wire _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); _z_declaration_t declaration = - _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE, - sub->in->val._info.mode == Z_SUBMODE_PULL); + _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { return _Z_ERR_TRANSPORT_TX_FAILED; @@ -199,46 +204,126 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte return ret; } +static int8_t _unsafe_z_register_declare(_z_session_t *zn, const _z_keyexpr_t *key, uint32_t id, uint8_t type) { + _z_declare_data_t *decl = NULL; + decl = (_z_declare_data_t *)zp_malloc(sizeof(_z_declare_data_t)); + if (decl == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + _z_keyexpr_copy(&decl->_key, key); + decl->_id = id; + decl->_type = type; + zn->_remote_declares = _z_declare_data_list_push(zn->_remote_declares, decl); + return _Z_RES_OK; +} + +static _z_keyexpr_t _unsafe_z_get_key_from_declare(_z_session_t *zn, uint32_t id, uint8_t type) { + _z_declare_data_list_t *xs = zn->_remote_declares; + _z_declare_data_t comp = { + ._key = _z_keyexpr_null(), + ._id = id, + ._type = type, + }; + while (xs != NULL) { + _z_declare_data_t *decl = _z_declare_data_list_head(xs); + if (_z_declare_data_eq(&comp, decl)) { + return _z_keyexpr_duplicate(decl->_key); + } + xs = _z_declare_data_list_tail(xs); + } + return _z_keyexpr_null(); +} + +static int8_t _unsafe_z_unregister_declare(_z_session_t *zn, uint32_t id, uint8_t type) { + _z_declare_data_t decl = { + ._key = _z_keyexpr_null(), + ._id = id, + ._type = type, + }; + zn->_remote_declares = _z_declare_data_list_drop_filter(zn->_remote_declares, _z_declare_data_eq, &decl); + return _Z_RES_OK; +} + int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { const _z_keyexpr_t *decl_key = NULL; _z_interest_msg_t msg; uint8_t flags = 0; + uint8_t decl_type = 0; switch (decl->_tag) { case _Z_DECL_SUBSCRIBER: msg.type = _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER; msg.id = decl->_body._decl_subscriber._id; decl_key = &decl->_body._decl_subscriber._keyexpr; + decl_type = _Z_DECLARE_TYPE_SUBSCRIBER; flags = _Z_INTEREST_FLAG_SUBSCRIBERS; break; case _Z_DECL_QUERYABLE: msg.type = _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE; msg.id = decl->_body._decl_queryable._id; decl_key = &decl->_body._decl_queryable._keyexpr; + decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; + default: + return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; + } + // Retrieve key + _zp_session_lock_mutex(zn); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + if (key._suffix == NULL) { + _zp_session_unlock_mutex(zn); + return _Z_ERR_KEYEXPR_UNKNOWN; + } + // Register declare + _unsafe_z_register_declare(zn, &key, msg.id, decl_type); + // Retrieve interests + _z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key); + _zp_session_unlock_mutex(zn); + // Parse session_interest list + _z_session_interest_rc_list_t *xs = intrs; + while (xs != NULL) { + _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); + if (intr->in->val._callback != NULL) { + intr->in->val._callback(&msg, intr->in->val._arg); + } + xs = _z_session_interest_rc_list_tail(xs); + } + // Clean up + _z_keyexpr_clear(&key); + _z_session_interest_rc_list_free(&intrs); + return _Z_RES_OK; +} + +int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl) { + _z_interest_msg_t msg; + uint8_t flags = 0; + uint8_t decl_type = 0; + switch (decl->_tag) { case _Z_UNDECL_SUBSCRIBER: msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER; msg.id = decl->_body._undecl_subscriber._id; - decl_key = &decl->_body._undecl_subscriber._ext_keyexpr; + decl_type = _Z_DECLARE_TYPE_SUBSCRIBER; flags = _Z_INTEREST_FLAG_SUBSCRIBERS; break; case _Z_UNDECL_QUERYABLE: msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE; msg.id = decl->_body._undecl_queryable._id; - decl_key = &decl->_body._undecl_queryable._ext_keyexpr; + decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; default: return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; } _zp_session_lock_mutex(zn); - _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + // Retrieve declare data + _z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type); if (key._suffix == NULL) { - _z_keyexpr_clear(&key); _zp_session_unlock_mutex(zn); return _Z_ERR_KEYEXPR_UNKNOWN; } _z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key); + // Remove declare + _unsafe_z_unregister_declare(zn, msg.id, decl_type); _zp_session_unlock_mutex(zn); // Parse session_interest list @@ -266,6 +351,7 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) { void _z_flush_interest(_z_session_t *zn) { _zp_session_lock_mutex(zn); _z_session_interest_rc_list_free(&zn->_local_interests); + _z_declare_data_list_free(&zn->_remote_declares); _zp_session_unlock_mutex(zn); } @@ -322,6 +408,8 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, } #else +void _z_flush_interest(_z_session_t *zn) { _ZP_UNUSED(zn); } + int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { _ZP_UNUSED(zn); _ZP_UNUSED(decl); diff --git a/src/session/queryable.c b/src/session/queryable.c index b669b8b3f..d85533466 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -141,9 +141,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const if (key._suffix != NULL) { _z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); // Build the z_query z_query_t query = {._val = {._rc = _z_query_rc_new()}}; @@ -160,10 +158,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_clear(&key); _z_session_queryable_rc_list_free(&qles); } else { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 - + _zp_session_unlock_mutex(zn); ret = _Z_ERR_KEYEXPR_UNKNOWN; } diff --git a/src/session/rx.c b/src/session/rx.c index 451ed1551..4e0a5ba16 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -70,10 +70,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_interest_process_declares(zn, &decl._decl); } break; case _Z_UNDECL_SUBSCRIBER: { - _z_interest_process_declares(zn, &decl._decl); + _z_interest_process_undeclares(zn, &decl._decl); } break; case _Z_UNDECL_QUERYABLE: { - _z_interest_process_declares(zn, &decl._decl); + _z_interest_process_undeclares(zn, &decl._decl); } break; case _Z_DECL_TOKEN: { // TODO: add support or explicitly discard diff --git a/src/session/subscription.c b/src/session/subscription.c index bbe38d100..e3a43c2fd 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -170,9 +170,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co if (key._suffix != NULL) { _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); // Build the sample _z_sample_t s; @@ -195,9 +193,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co _z_keyexpr_clear(&key); _z_subscription_rc_list_free(&subs); } else { -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&zn->_mutex_inner); -#endif // Z_FEATURE_MULTI_THREAD == 1 + _zp_session_unlock_mutex(zn); ret = _Z_ERR_KEYEXPR_UNKNOWN; } diff --git a/src/session/utils.c b/src/session/utils.c index d5d7fe1d7..eac976016 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -18,6 +18,7 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" @@ -115,6 +116,7 @@ void _z_session_clear(_z_session_t *zn) { #if Z_FEATURE_QUERY == 1 _z_flush_pending_queries(zn); #endif + _z_flush_interest(zn); #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_free(&zn->_mutex_inner);