Skip to content

Commit

Permalink
refactor: go through session_lock function
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Mar 8, 2024
1 parent d982418 commit 20857b3
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 144 deletions.
49 changes: 13 additions & 36 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -103,15 +104,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const
}

_z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
return pql;
}

Expand All @@ -121,9 +118,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
_Z_DEBUG(">>> Allocating query for (%ju:%s,%s)", (uintmax_t)pen_qry->_key._id, pen_qry->_key._suffix,
pen_qry->_parameters);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id);
if (pql == NULL) { // Register query only if a pending one with the same ID does not exist
Expand All @@ -132,9 +127,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}
Expand All @@ -144,9 +137,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
const _z_timestamp_t timestamp) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand Down Expand Up @@ -216,9 +207,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
}
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Trigger the user callback
if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) {
Expand All @@ -235,9 +224,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

// Final reply received for unknown query id
_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
Expand All @@ -261,34 +248,24 @@ int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}

void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

void _z_flush_pending_queries(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_list_free(&zn->_pending_queries);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}
#endif
44 changes: 11 additions & 33 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,30 +95,22 @@ _z_session_queryable_rc_list_t *__unsafe_z_get_session_queryable_by_key(_z_sessi
}

_z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, const _z_zint_t id) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_session_queryable_rc_t *qle = __unsafe_z_get_session_queryable_by_id(zn, id);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return qle;
}

_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, keyexpr);
_z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return qles;
}
Expand All @@ -127,29 +119,23 @@ _z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_se
_Z_DEBUG(">>> Allocating queryable for (%ju:%s)", (uintmax_t)q->_key._id, q->_key._suffix);
_z_session_queryable_rc_t *ret = NULL;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

ret = (_z_session_queryable_rc_t *)zp_malloc(sizeof(_z_session_queryable_rc_t));
if (ret != NULL) {
*ret = _z_session_queryable_rc_new_from_val(*q);
zn->_local_queryable = _z_session_queryable_rc_list_push(zn->_local_queryable, ret);
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}

int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const _z_keyexpr_t q_key, uint32_t qid) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &q_key);
if (key._suffix != NULL) {
Expand Down Expand Up @@ -185,28 +171,20 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
}

void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *qle) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

zn->_local_queryable =
_z_session_queryable_rc_list_drop_filter(zn->_local_queryable, _z_session_queryable_rc_eq, qle);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

void _z_flush_session_queryable(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_session_queryable_rc_list_free(&zn->_local_queryable);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

#endif
57 changes: 15 additions & 42 deletions src/session/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/system/platform.h"
#include "zenoh-pico/utils/logging.h"

Expand Down Expand Up @@ -180,15 +181,11 @@ _z_keyexpr_t __unsafe_z_get_expanded_key_from_key(_z_session_t *zn, const _z_key
}

_z_resource_t *_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t rid) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_resource_t *res = __unsafe_z_get_resource_by_id(zn, mapping, rid);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return res;
}
Expand All @@ -197,28 +194,20 @@ _z_resource_t *_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr_t *keye
if (keyexpr->_suffix == NULL) {
return _z_get_resource_by_id(zn, _z_keyexpr_mapping_id(keyexpr), keyexpr->_id);
}
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_resource_t *res = __unsafe_z_get_resource_by_key(zn, keyexpr);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return res;
}

_z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);
_z_keyexpr_t res = __unsafe_z_get_expanded_key_from_key(zn, keyexpr);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return res;
}
Expand All @@ -230,9 +219,7 @@ int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, ui
uint16_t mapping = register_to_mapping;
uint16_t parent_mapping = _z_keyexpr_mapping_id(&key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

if (key._id != Z_RESOURCE_ID_NONE) {
if (parent_mapping == mapping) {
Expand Down Expand Up @@ -261,19 +248,15 @@ int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, ui
}
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}

void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping) {
_Bool is_local = mapping == _Z_KEYEXPR_MAPPING_LOCAL;
_Z_DEBUG("unregistering: id %d, mapping: %d", id, mapping);
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);
_z_resource_list_t **parent_mut = is_local ? &zn->_local_resources : &zn->_remote_resources;
while (id != 0) {
_z_resource_list_t *parent = *parent_mut;
Expand All @@ -295,37 +278,27 @@ void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping) {
parent = *parent_mut;
}
}
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

_Bool _z_unregister_resource_for_peer_filter(const _z_resource_t *candidate, const _z_resource_t *ctx) {
uint16_t mapping = ctx->_id;
return _z_keyexpr_mapping_id(&candidate->_key) == mapping;
}
void _z_unregister_resources_for_peer(_z_session_t *zn, uint16_t mapping) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);
_z_resource_t ctx = {._id = mapping, ._refcount = 0, ._key = {0}};
zn->_remote_resources =
_z_resource_list_drop_filter(zn->_remote_resources, _z_unregister_resource_for_peer_filter, &ctx);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

void _z_flush_resources(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_resource_list_free(&zn->_local_resources);
_z_resource_list_free(&zn->_remote_resources);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}
Loading

0 comments on commit 20857b3

Please sign in to comment.