Skip to content

Commit

Permalink
address carlos's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 28, 2023
1 parent 15e4960 commit a4484bf
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 25 deletions.
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
typedef struct {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _rid;
uint32_t _request_id;
void *_zn; // FIXME: _z_session_t *zn;
char *_parameters;
_Bool _anyke;
Expand All @@ -35,7 +35,7 @@ typedef struct {
* Return type when declaring a queryable.
*/
typedef struct {
uint32_t _id;
uint32_t _entity_id;
void *_zn; // FIXME: _z_session_t *zn;
} _z_queryable_t;

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/subscribe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* Return type when declaring a subscriber.
*/
typedef struct {
uint32_t _id;
uint32_t _entity_id;
_z_session_t *_zn;
} _z_subscriber_t;

Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int8_t _z_session_close(_z_session_t *zn, uint8_t reason);
void _z_session_clear(_z_session_t *zn);
void _z_session_free(_z_session_t **zn);

int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);

Expand Down
7 changes: 6 additions & 1 deletion src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len

int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) {
int8_t ret = _Z_RES_OK;
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)); i++) {
_Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities);
for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) {
_z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i));
size_t n = bs.len;
do {
Expand All @@ -157,6 +158,10 @@ int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
break;
}
if (link_is_streamed && wb != n) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
break;
}
n = n - wb;
bs.start = bs.start + (bs.len - n);
} while (n > (size_t)0);
Expand Down
16 changes: 8 additions & 8 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _
_z_subscriber_t *ret = (_z_subscriber_t *)z_malloc(sizeof(_z_subscriber_t));
if (ret != NULL) {
ret->_zn = zn;
ret->_id = s._id;
ret->_entity_id = s._id;

_z_subscription_sptr_t *sp_s = _z_register_subscription(
zn, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level.
Expand Down Expand Up @@ -171,10 +171,10 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
int8_t ret = _Z_ERR_GENERIC;

if (sub != NULL) {
_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_id);
_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_id, &s->ptr->_key);
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(sub->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
// Only if message is successfully send, local subscription state can be removed
Expand Down Expand Up @@ -207,7 +207,7 @@ _z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bo
_z_queryable_t *ret = (_z_queryable_t *)z_malloc(sizeof(_z_queryable_t));
if (ret != NULL) {
ret->_zn = zn;
ret->_id = q._id;
ret->_entity_id = q._id;

_z_questionable_sptr_t *sp_q =
_z_register_questionable(zn, &q); // This a pointer to the entry stored at session-level.
Expand Down Expand Up @@ -237,10 +237,10 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
int8_t ret = _Z_RES_OK;

if (qle != NULL) {
_z_questionable_sptr_t *q = _z_get_questionable_by_id(qle->_zn, qle->_id);
_z_questionable_sptr_t *q = _z_get_questionable_by_id(qle->_zn, qle->_entity_id);
if (q != NULL) {
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_id, &q->ptr->_key);
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->ptr->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(qle->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
// Only if message is successfully send, local queryable state can be removed
Expand Down Expand Up @@ -283,7 +283,7 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_rid,
._request_id = query->_request_id,
._key = ke,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _Z_N_QOS_DEFAULT,
Expand Down Expand Up @@ -398,7 +398,7 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters,
int8_t _z_subscriber_pull(const _z_subscriber_t *sub) {
int8_t ret = _Z_RES_OK;

_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_id);
_z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id);
if (s != NULL) {
_z_zint_t pull_id = _z_get_pull_id(sub->_zn);
_z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id);
Expand Down
4 changes: 2 additions & 2 deletions src/session/queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons
// Build the query
z_query_t q;
q._zn = zn;
q._rid = qid;
q._request_id = qid;
q._key = key;
#if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32)))
char *params = z_malloc(query->_parameters.len + 1);
Expand Down Expand Up @@ -184,7 +184,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons

// Send the final reply
// Create the final reply
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._rid);
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._request_id);
if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand Down
2 changes: 1 addition & 1 deletion src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include "zenoh-pico/utils/logging.h"

/*------------------ Handle message ------------------*/
int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) {
int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) {
int8_t ret = _Z_RES_OK;

switch (msg->_tag) {
Expand Down
4 changes: 2 additions & 2 deletions src/transport/multicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
for (size_t i = 0; i < len; i++) {
_z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i);
_z_msg_fix_mapping(zm, mapping);
_z_handle_zenoh_message(ztm->_session, zm, mapping);
_z_handle_network_message(ztm->_session, zm, mapping);
}

break;
Expand Down Expand Up @@ -195,7 +195,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t
if (ret == _Z_RES_OK) {
uint16_t mapping = entry->_peer_id;
_z_msg_fix_mapping(&zm, mapping);
_z_handle_zenoh_message(ztm->_session, &zm, mapping);
_z_handle_network_message(ztm->_session, &zm, mapping);
_z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented
// zenoh messages are released when their transport message is released.
}
Expand Down
8 changes: 4 additions & 4 deletions src/transport/unicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
// Handle all the zenoh message, one by one
size_t len = _z_vec_len(&t_msg->_body._frame._messages);
for (size_t i = 0; i < len; i++) {
_z_handle_zenoh_message(ztu->_session,
(_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i),
_Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);
_z_handle_network_message(ztu->_session,
(_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i),
_Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);
}

break;
Expand Down Expand Up @@ -153,7 +153,7 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans
_z_zenoh_message_t zm;
int8_t ret = _z_network_message_decode(&zm, &zbf);
if (ret == _Z_RES_OK) {
_z_handle_zenoh_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);
_z_handle_network_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);
_z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented
// zenoh messages are released when their transport message is released.
} else {
Expand Down
4 changes: 2 additions & 2 deletions tests/z_peer_multicast_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ int main(int argc, char **argv) {
z_owned_subscriber_t *sub = (z_owned_subscriber_t *)z_malloc(sizeof(z_owned_subscriber_t));
*sub = z_declare_subscriber(z_loan(s2), z_keyexpr(s1_res), &callback, NULL);
assert(z_check(*sub));
printf("Declared subscription on session 2: %ju %zu %s\n", (uintmax_t)z_subscriber_loan(sub)._val->_id,
printf("Declared subscription on session 2: %ju %zu %s\n", (uintmax_t)z_subscriber_loan(sub)._val->_entity_id,
(z_zint_t)0, s1_res);
subs2 = _z_list_push(subs2, sub);
}
Expand Down Expand Up @@ -150,7 +150,7 @@ int main(int argc, char **argv) {
// Undeclare subscribers and queryables on second session
while (subs2) {
z_owned_subscriber_t *sub = _z_list_head(subs2);
printf("Undeclared subscriber on session 2: %ju\n", (uintmax_t)z_subscriber_loan(sub)._val->_id);
printf("Undeclared subscriber on session 2: %ju\n", (uintmax_t)z_subscriber_loan(sub)._val->_entity_id);
z_undeclare_subscriber(z_move(*sub));
subs2 = _z_list_pop(subs2, _z_noop_elem_free, NULL);
}
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 0.10.20230825dev
Version: 0.10.20230828dev
Cflags: -I${prefix}/
Libs: -L${prefix}/ -lzenohpico

0 comments on commit a4484bf

Please sign in to comment.