Skip to content

Commit

Permalink
Store declare information on te receiver side (#378)
Browse files Browse the repository at this point in the history
* feat: store declare info for interests

* fix: remove subscriber mode arg

* refactor: missing session_unlock

* fix: clear interest data on session close
  • Loading branch information
jean-roland authored Mar 22, 2024
1 parent 9bd061c commit b0ae036
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 29 deletions.
1 change: 1 addition & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ typedef struct _z_session_t {
// Session interests
#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_list_t *_local_interests;
_z_declare_data_list_t *_remote_declares;
#endif
} _z_session_t;

Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr);
void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
#endif // Z_FEATURE_INTEREST == 1

void _z_flush_interest(_z_session_t *zn);
int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);
Expand Down
28 changes: 22 additions & 6 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size);

typedef enum {
_Z_INTEREST_MSG_TYPE_FINAL = 0,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER = 1,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE = 2,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN = 3,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER = 4,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE = 5,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN = 6,
} _z_interest_msg_type_t;

typedef struct _z_interest_msg_t {
Expand Down Expand Up @@ -225,4 +225,20 @@ _Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _
_z_noop_copy)
_Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t)

typedef enum {
_Z_DECLARE_TYPE_SUBSCRIBER = 0,
_Z_DECLARE_TYPE_QUERYABLE = 1,
_Z_DECLARE_TYPE_TOKEN = 2,
} _z_declare_type_t;

typedef struct {
_z_keyexpr_t _key;
uint32_t _id;
uint8_t _type;
} _z_declare_data_t;

void _z_declare_data_clear(_z_declare_data_t *data);
_Z_ELEM_DEFINE(_z_declare_data, _z_declare_data_t, _z_noop_size, _z_declare_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_declare_data, _z_declare_data_t)

#endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */
102 changes: 95 additions & 7 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_INTEREST == 1
void _z_declare_data_clear(_z_declare_data_t *data) { _z_keyexpr_clear(&data->_key); }

_Bool _z_declare_data_eq(const _z_declare_data_t *left, const _z_declare_data_t *right) {
return ((left->_id == right->_id) && (left->_type == right->_type));
}

_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two) {
return one->_id == two->_id;
}
Expand Down Expand Up @@ -58,7 +64,7 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if ((intr->in->val._flags & flags) != 0) {
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
strlen(key._suffix))) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
}
}
Expand Down Expand Up @@ -120,8 +126,7 @@ static int8_t _z_send_subscriber_interest(_z_session_t *zn) {
// Build the declare message to send on the wire
_z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key);
_z_declaration_t declaration =
_z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE,
sub->in->val._info.mode == Z_SUBMODE_PULL);
_z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -199,46 +204,126 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte
return ret;
}

static int8_t _unsafe_z_register_declare(_z_session_t *zn, const _z_keyexpr_t *key, uint32_t id, uint8_t type) {
_z_declare_data_t *decl = NULL;
decl = (_z_declare_data_t *)zp_malloc(sizeof(_z_declare_data_t));
if (decl == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
_z_keyexpr_copy(&decl->_key, key);
decl->_id = id;
decl->_type = type;
zn->_remote_declares = _z_declare_data_list_push(zn->_remote_declares, decl);
return _Z_RES_OK;
}

static _z_keyexpr_t _unsafe_z_get_key_from_declare(_z_session_t *zn, uint32_t id, uint8_t type) {
_z_declare_data_list_t *xs = zn->_remote_declares;
_z_declare_data_t comp = {
._key = _z_keyexpr_null(),
._id = id,
._type = type,
};
while (xs != NULL) {
_z_declare_data_t *decl = _z_declare_data_list_head(xs);
if (_z_declare_data_eq(&comp, decl)) {
return _z_keyexpr_duplicate(decl->_key);
}
xs = _z_declare_data_list_tail(xs);
}
return _z_keyexpr_null();
}

static int8_t _unsafe_z_unregister_declare(_z_session_t *zn, uint32_t id, uint8_t type) {
_z_declare_data_t decl = {
._key = _z_keyexpr_null(),
._id = id,
._type = type,
};
zn->_remote_declares = _z_declare_data_list_drop_filter(zn->_remote_declares, _z_declare_data_eq, &decl);
return _Z_RES_OK;
}

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) {
const _z_keyexpr_t *decl_key = NULL;
_z_interest_msg_t msg;
uint8_t flags = 0;
uint8_t decl_type = 0;
switch (decl->_tag) {
case _Z_DECL_SUBSCRIBER:
msg.type = _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER;
msg.id = decl->_body._decl_subscriber._id;
decl_key = &decl->_body._decl_subscriber._keyexpr;
decl_type = _Z_DECLARE_TYPE_SUBSCRIBER;
flags = _Z_INTEREST_FLAG_SUBSCRIBERS;
break;
case _Z_DECL_QUERYABLE:
msg.type = _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE;
msg.id = decl->_body._decl_queryable._id;
decl_key = &decl->_body._decl_queryable._keyexpr;
decl_type = _Z_DECLARE_TYPE_QUERYABLE;
flags = _Z_INTEREST_FLAG_QUERYABLES;
break;
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
// Retrieve key
_zp_session_lock_mutex(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
if (key._suffix == NULL) {
_zp_session_unlock_mutex(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Register declare
_unsafe_z_register_declare(zn, &key, msg.id, decl_type);
// Retrieve interests
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key);
_zp_session_unlock_mutex(zn);
// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if (intr->in->val._callback != NULL) {
intr->in->val._callback(&msg, intr->in->val._arg);
}
xs = _z_session_interest_rc_list_tail(xs);
}
// Clean up
_z_keyexpr_clear(&key);
_z_session_interest_rc_list_free(&intrs);
return _Z_RES_OK;
}

int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl) {
_z_interest_msg_t msg;
uint8_t flags = 0;
uint8_t decl_type = 0;
switch (decl->_tag) {
case _Z_UNDECL_SUBSCRIBER:
msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER;
msg.id = decl->_body._undecl_subscriber._id;
decl_key = &decl->_body._undecl_subscriber._ext_keyexpr;
decl_type = _Z_DECLARE_TYPE_SUBSCRIBER;
flags = _Z_INTEREST_FLAG_SUBSCRIBERS;
break;
case _Z_UNDECL_QUERYABLE:
msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE;
msg.id = decl->_body._undecl_queryable._id;
decl_key = &decl->_body._undecl_queryable._ext_keyexpr;
decl_type = _Z_DECLARE_TYPE_QUERYABLE;
flags = _Z_INTEREST_FLAG_QUERYABLES;
break;
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_zp_session_lock_mutex(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
// Retrieve declare data
_z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type);
if (key._suffix == NULL) {
_z_keyexpr_clear(&key);
_zp_session_unlock_mutex(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key);
// Remove declare
_unsafe_z_unregister_declare(zn, msg.id, decl_type);
_zp_session_unlock_mutex(zn);

// Parse session_interest list
Expand Down Expand Up @@ -266,6 +351,7 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) {
void _z_flush_interest(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_interest_rc_list_free(&zn->_local_interests);
_z_declare_data_list_free(&zn->_remote_declares);
_zp_session_unlock_mutex(zn);
}

Expand Down Expand Up @@ -322,6 +408,8 @@ int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key,
}

#else
void _z_flush_interest(_z_session_t *zn) { _ZP_UNUSED(zn); }

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) {
_ZP_UNUSED(zn);
_ZP_UNUSED(decl);
Expand Down
9 changes: 2 additions & 7 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
if (key._suffix != NULL) {
_z_session_queryable_rc_list_t *qles = __unsafe_z_get_session_queryable_by_key(zn, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Build the z_query
z_query_t query = {._val = {._rc = _z_query_rc_new()}};
Expand All @@ -160,10 +158,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *msgq, const
_z_keyexpr_clear(&key);
_z_session_queryable_rc_list_free(&qles);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1

_zp_session_unlock_mutex(zn);
ret = _Z_ERR_KEYEXPR_UNKNOWN;
}

Expand Down
4 changes: 2 additions & 2 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
_z_interest_process_declares(zn, &decl._decl);
} break;
case _Z_UNDECL_SUBSCRIBER: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl._decl);
} break;
case _Z_UNDECL_QUERYABLE: {
_z_interest_process_declares(zn, &decl._decl);
_z_interest_process_undeclares(zn, &decl._decl);
} break;
case _Z_DECL_TOKEN: {
// TODO: add support or explicitly discard
Expand Down
8 changes: 2 additions & 6 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
if (key._suffix != NULL) {
_z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Build the sample
_z_sample_t s;
Expand All @@ -195,9 +193,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
_z_keyexpr_clear(&key);
_z_subscription_rc_list_free(&subs);
} else {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
ret = _Z_ERR_KEYEXPR_UNKNOWN;
}

Expand Down
2 changes: 2 additions & 0 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/session/interest.h"
#include "zenoh-pico/session/query.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/resource.h"
Expand Down Expand Up @@ -115,6 +116,7 @@ void _z_session_clear(_z_session_t *zn) {
#if Z_FEATURE_QUERY == 1
_z_flush_pending_queries(zn);
#endif
_z_flush_interest(zn);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&zn->_mutex_inner);
Expand Down

0 comments on commit b0ae036

Please sign in to comment.