diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index bda52795f..cb59f6dd5 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -25,7 +25,7 @@ typedef struct { _z_value_t _value; _z_keyexpr_t _key; - uint32_t _rid; + uint32_t _request_id; void *_zn; // FIXME: _z_session_t *zn; char *_parameters; _Bool _anyke; @@ -35,7 +35,7 @@ typedef struct { * Return type when declaring a queryable. */ typedef struct { - uint32_t _id; + uint32_t _entity_id; void *_zn; // FIXME: _z_session_t *zn; } _z_queryable_t; diff --git a/include/zenoh-pico/net/subscribe.h b/include/zenoh-pico/net/subscribe.h index 4e350148b..2a8a53c1b 100644 --- a/include/zenoh-pico/net/subscribe.h +++ b/include/zenoh-pico/net/subscribe.h @@ -24,7 +24,7 @@ * Return type when declaring a subscriber. */ typedef struct { - uint32_t _id; + uint32_t _entity_id; _z_session_t *_zn; } _z_subscriber_t; diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 497e0a015..39cae73d3 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -31,7 +31,7 @@ int8_t _z_session_close(_z_session_t *zn, uint8_t reason); void _z_session_clear(_z_session_t *zn); void _z_session_free(_z_session_t **zn); -int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); +int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id); int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); diff --git a/src/link/link.c b/src/link/link.c index 3705ca1a4..5c409e353 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -148,7 +148,8 @@ size_t _z_link_recv_exact_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, size_t len int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { int8_t ret = _Z_RES_OK; - for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)); i++) { + _Bool link_is_streamed = _Z_LINK_IS_STREAMED(link->_capabilities); + for (size_t i = 0; (i < _z_wbuf_len_iosli(wbf)) && (ret == _Z_RES_OK); i++) { _z_bytes_t bs = _z_iosli_to_bytes(_z_wbuf_get_iosli(wbf, i)); size_t n = bs.len; do { @@ -157,6 +158,10 @@ int8_t _z_link_send_wbuf(const _z_link_t *link, const _z_wbuf_t *wbf) { ret = _Z_ERR_TRANSPORT_TX_FAILED; break; } + if (link_is_streamed && wb != n) { + ret = _Z_ERR_TRANSPORT_TX_FAILED; + break; + } n = n - wb; bs.start = bs.start + (bs.len - n); } while (n > (size_t)0); diff --git a/src/net/primitives.c b/src/net/primitives.c index 6397dd588..374e52b0d 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -141,7 +141,7 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _ _z_subscriber_t *ret = (_z_subscriber_t *)z_malloc(sizeof(_z_subscriber_t)); if (ret != NULL) { ret->_zn = zn; - ret->_id = s._id; + ret->_entity_id = s._id; _z_subscription_sptr_t *sp_s = _z_register_subscription( zn, _Z_RESOURCE_IS_LOCAL, &s); // This a pointer to the entry stored at session-level. @@ -171,10 +171,10 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { int8_t ret = _Z_ERR_GENERIC; if (sub != NULL) { - _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_id); + _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); if (s != NULL) { // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_subscriber(sub->_id, &s->ptr->_key); + _z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->ptr->_key); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(sub->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { // Only if message is successfully send, local subscription state can be removed @@ -207,7 +207,7 @@ _z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bo _z_queryable_t *ret = (_z_queryable_t *)z_malloc(sizeof(_z_queryable_t)); if (ret != NULL) { ret->_zn = zn; - ret->_id = q._id; + ret->_entity_id = q._id; _z_questionable_sptr_t *sp_q = _z_register_questionable(zn, &q); // This a pointer to the entry stored at session-level. @@ -237,10 +237,10 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { int8_t ret = _Z_RES_OK; if (qle != NULL) { - _z_questionable_sptr_t *q = _z_get_questionable_by_id(qle->_zn, qle->_id); + _z_questionable_sptr_t *q = _z_get_questionable_by_id(qle->_zn, qle->_entity_id); if (q != NULL) { // Build the declare message to send on the wire - _z_declaration_t declaration = _z_make_undecl_queryable(qle->_id, &q->ptr->_key); + _z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->ptr->_key); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); if (_z_send_n_msg(qle->_zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) { // Only if message is successfully send, local queryable state can be removed @@ -283,7 +283,7 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu ._tag = _Z_N_RESPONSE, ._body._response = { - ._request_id = query->_rid, + ._request_id = query->_request_id, ._key = ke, ._ext_responder = {._zid = zid, ._eid = 0}, ._ext_qos = _Z_N_QOS_DEFAULT, @@ -398,7 +398,7 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { int8_t ret = _Z_RES_OK; - _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_id); + _z_subscription_sptr_t *s = _z_get_subscription_by_id(sub->_zn, _Z_RESOURCE_IS_LOCAL, sub->_entity_id); if (s != NULL) { _z_zint_t pull_id = _z_get_pull_id(sub->_zn); _z_zenoh_message_t z_msg = _z_msg_make_pull(_z_keyexpr_alias(s->ptr->_key), pull_id); diff --git a/src/session/queryable.c b/src/session/queryable.c index 078ecdab2..d18c5705e 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -156,7 +156,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons // Build the query z_query_t q; q._zn = zn; - q._rid = qid; + q._request_id = qid; q._key = key; #if defined(__STDC_NO_VLA__) || ((__STDC_VERSION__ < 201000L) && (defined(_WIN32) || defined(WIN32))) char *params = z_malloc(query->_parameters.len + 1); @@ -184,7 +184,7 @@ int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, cons // Send the final reply // Create the final reply - _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._rid); + _z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q._request_id); if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/session/rx.c b/src/session/rx.c index fad9e46d9..082fe21ee 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -33,7 +33,7 @@ #include "zenoh-pico/utils/logging.h" /*------------------ Handle message ------------------*/ -int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) { +int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) { int8_t ret = _Z_RES_OK; switch (msg->_tag) { diff --git a/src/transport/multicast/link/rx.c b/src/transport/multicast/link/rx.c index 17bc9fd9b..b5d50e030 100644 --- a/src/transport/multicast/link/rx.c +++ b/src/transport/multicast/link/rx.c @@ -154,7 +154,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t for (size_t i = 0; i < len; i++) { _z_network_message_t *zm = _z_network_message_vec_get(&t_msg->_body._frame._messages, i); _z_msg_fix_mapping(zm, mapping); - _z_handle_zenoh_message(ztm->_session, zm, mapping); + _z_handle_network_message(ztm->_session, zm, mapping); } break; @@ -195,7 +195,7 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t if (ret == _Z_RES_OK) { uint16_t mapping = entry->_peer_id; _z_msg_fix_mapping(&zm, mapping); - _z_handle_zenoh_message(ztm->_session, &zm, mapping); + _z_handle_network_message(ztm->_session, &zm, mapping); _z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented // zenoh messages are released when their transport message is released. } diff --git a/src/transport/unicast/link/rx.c b/src/transport/unicast/link/rx.c index aabb50d74..bac48d82e 100644 --- a/src/transport/unicast/link/rx.c +++ b/src/transport/unicast/link/rx.c @@ -118,9 +118,9 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans // Handle all the zenoh message, one by one size_t len = _z_vec_len(&t_msg->_body._frame._messages); for (size_t i = 0; i < len; i++) { - _z_handle_zenoh_message(ztu->_session, - (_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i), - _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + _z_handle_network_message(ztu->_session, + (_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i), + _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); } break; @@ -153,7 +153,7 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans _z_zenoh_message_t zm; int8_t ret = _z_network_message_decode(&zm, &zbf); if (ret == _Z_RES_OK) { - _z_handle_zenoh_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); + _z_handle_network_message(ztu->_session, &zm, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE); _z_msg_clear(&zm); // Clear must be explicitly called for fragmented zenoh messages. Non-fragmented // zenoh messages are released when their transport message is released. } else { diff --git a/tests/z_peer_multicast_test.c b/tests/z_peer_multicast_test.c index 2933d5bc4..6d803b67b 100644 --- a/tests/z_peer_multicast_test.c +++ b/tests/z_peer_multicast_test.c @@ -108,7 +108,7 @@ int main(int argc, char **argv) { z_owned_subscriber_t *sub = (z_owned_subscriber_t *)z_malloc(sizeof(z_owned_subscriber_t)); *sub = z_declare_subscriber(z_loan(s2), z_keyexpr(s1_res), &callback, NULL); assert(z_check(*sub)); - printf("Declared subscription on session 2: %ju %zu %s\n", (uintmax_t)z_subscriber_loan(sub)._val->_id, + printf("Declared subscription on session 2: %ju %zu %s\n", (uintmax_t)z_subscriber_loan(sub)._val->_entity_id, (z_zint_t)0, s1_res); subs2 = _z_list_push(subs2, sub); } @@ -150,7 +150,7 @@ int main(int argc, char **argv) { // Undeclare subscribers and queryables on second session while (subs2) { z_owned_subscriber_t *sub = _z_list_head(subs2); - printf("Undeclared subscriber on session 2: %ju\n", (uintmax_t)z_subscriber_loan(sub)._val->_id); + printf("Undeclared subscriber on session 2: %ju\n", (uintmax_t)z_subscriber_loan(sub)._val->_entity_id); z_undeclare_subscriber(z_move(*sub)); subs2 = _z_list_pop(subs2, _z_noop_elem_free, NULL); } diff --git a/zenohpico.pc b/zenohpico.pc index ff26b6d65..2827509d3 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.10.20230825dev +Version: 0.10.20230828dev Cflags: -I${prefix}/ Libs: -L${prefix}/ -lzenohpico