diff --git a/include/zenoh-pico/protocol/codec/transport.h b/include/zenoh-pico/protocol/codec/transport.h index f7e0936ad..344081992 100644 --- a/include/zenoh-pico/protocol/codec/transport.h +++ b/include/zenoh-pico/protocol/codec/transport.h @@ -24,4 +24,21 @@ int8_t _z_scouting_message_decode_na(_z_scouting_message_t *msg, _z_zbuf_t *buf) int8_t _z_transport_message_encode(_z_wbuf_t *buf, const _z_transport_message_t *msg); int8_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf); +int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg); +int8_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header); + +int8_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t *msg); +int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header); + +int8_t _z_open_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_open_t *msg); +int8_t _z_open_decode(_z_t_msg_open_t *msg, _z_zbuf_t *zbf, uint8_t header); + +int8_t _z_close_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_close_t *msg); +int8_t _z_close_decode(_z_t_msg_close_t *msg, _z_zbuf_t *zbf, uint8_t header); + +int8_t _z_keep_alive_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_keep_alive_t *msg); +int8_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t header); + +int8_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_t *msg); +int8_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header); #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_TRANSPORT_H */ diff --git a/include/zenoh-pico/protocol/definitions/transport.h b/include/zenoh-pico/protocol/definitions/transport.h index bf221968c..80f3683ca 100644 --- a/include/zenoh-pico/protocol/definitions/transport.h +++ b/include/zenoh-pico/protocol/definitions/transport.h @@ -123,7 +123,7 @@ typedef struct { z_what_t _what; uint8_t _version; } _z_s_msg_scout_t; -void _z_s_msg_clear_scout(_z_s_msg_scout_t *msg); +void _z_s_msg_scout_clear(_z_s_msg_scout_t *msg); /*------------------ Hello Message ------------------*/ // NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length @@ -170,7 +170,7 @@ typedef struct { z_whatami_t _whatami; uint8_t _version; } _z_s_msg_hello_t; -void _z_s_msg_clear_hello(_z_s_msg_hello_t *msg); +void _z_s_msg_hello_clear(_z_s_msg_hello_t *msg); /*------------------ Join Message ------------------*/ // # Join message @@ -230,7 +230,7 @@ typedef struct { uint8_t _seq_num_res; uint8_t _version; } _z_t_msg_join_t; -void _z_t_msg_clear_join(_z_t_msg_join_t *msg); +void _z_t_msg_join_clear(_z_t_msg_join_t *msg); /*------------------ Init Message ------------------*/ // # Init message @@ -310,7 +310,7 @@ typedef struct { uint8_t _seq_num_res; uint8_t _version; } _z_t_msg_init_t; -void _z_t_msg_clear_init(_z_t_msg_init_t *msg); +void _z_t_msg_init_clear(_z_t_msg_init_t *msg); /*------------------ Open Message ------------------*/ // NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total lenght @@ -348,7 +348,7 @@ typedef struct { _z_zint_t _initial_sn; _z_bytes_t _cookie; } _z_t_msg_open_t; -void _z_t_msg_clear_open(_z_t_msg_open_t *msg); +void _z_t_msg_open_clear(_z_t_msg_open_t *msg); /*------------------ Close Message ------------------*/ // NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length @@ -378,7 +378,7 @@ void _z_t_msg_clear_open(_z_t_msg_open_t *msg); typedef struct { uint8_t _reason; } _z_t_msg_close_t; -void _z_t_msg_clear_close(_z_t_msg_close_t *msg); +void _z_t_msg_close_clear(_z_t_msg_close_t *msg); /*=============================*/ /* Close reasons */ /*=============================*/ @@ -414,7 +414,7 @@ void _z_t_msg_clear_close(_z_t_msg_close_t *msg); typedef struct { uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior } _z_t_msg_keep_alive_t; -void _z_t_msg_clear_keep_alive(_z_t_msg_keep_alive_t *msg); +void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg); /*------------------ Frame Message ------------------*/ // NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length @@ -445,7 +445,7 @@ typedef struct { _z_network_message_vec_t _messages; _z_zint_t _sn; } _z_t_msg_frame_t; -void _z_t_msg_clear_frame(_z_t_msg_frame_t *msg); +void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg); /*------------------ Fragment Message ------------------*/ // The Fragment message is used to transmit on the wire large Zenoh Message that require fragmentation @@ -473,7 +473,7 @@ typedef struct { _z_bytes_t _payload; _z_zint_t _sn; } _z_t_msg_fragment_t; -void _z_t_msg_clear_fragment(_z_t_msg_fragment_t *msg); +void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg); /*------------------ Transport Message ------------------*/ typedef union { diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 6c79af01d..68474cbac 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -126,11 +126,13 @@ int8_t _z_keyexpr_decode(_z_keyexpr_t *ke, _z_zbuf_t *zbf, _Bool has_suffix) { int8_t ret = _Z_RES_OK; ret |= _z_zint16_decode(&ke->_id, zbf); + ke->_owns_suffix = false; if (has_suffix == true) { char *str = NULL; ret |= _z_str_decode(&str, zbf); if (ret == _Z_RES_OK) { ke->_suffix = str; + ke->_owns_suffix = true; } else { ke->_suffix = NULL; } @@ -248,7 +250,9 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) { (void)(pshb); int8_t ret = _Z_RES_OK; uint8_t header = pshb->_is_put ? _Z_MID_Z_PUT : _Z_MID_Z_DEL; - _Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id); + _Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id) || + pshb->_body._put._commons._source_info._source_sn != 0 || + pshb->_body._put._commons._source_info._entity_id != 0; _Bool has_timestamp = _z_timestamp_check(&pshb->_body._put._commons._timestamp); _Bool has_encoding = false; if (has_source_info) { @@ -476,7 +480,9 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { !_z_bytes_is_empty(&reply->_value.encoding.suffix)) { header |= _Z_FLAG_Z_R_E; } - if (reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO || _z_id_check(reply->_ext_source_info._id)) { + _Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 || + reply->_ext_source_info._entity_id != 0; + if (reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO || has_sourceinfo) { header |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); @@ -491,7 +497,7 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix)); } _Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO; - if (_z_id_check(reply->_ext_source_info._id)) { + if (has_sourceinfo) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; if (has_consolidation_ext) { extheader |= _Z_MSG_EXT_FLAG_Z; @@ -557,7 +563,8 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) { } _Bool has_payload_ext = err->_ext_value.payload.start != NULL || err->_ext_value.encoding.prefix != 0 || !_z_bytes_is_empty(&err->_ext_value.encoding.suffix); - _Bool has_sinfo_ext = _z_id_check(err->_ext_source_info._id); + _Bool has_sinfo_ext = _z_id_check(err->_ext_source_info._id) || err->_ext_source_info._entity_id != 0 || + err->_ext_source_info._source_sn != 0; if (has_sinfo_ext || has_payload_ext) { header |= _Z_FLAG_Z_Z; } @@ -633,7 +640,8 @@ int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) { int8_t ret = _Z_RES_OK; uint8_t header = _Z_MID_Z_ACK; _Bool has_ts = _z_timestamp_check(&ack->_timestamp); - _Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id); + _Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id) || ack->_ext_source_info._source_sn != 0 || + ack->_ext_source_info._entity_id != 0; if (has_ts) { header |= _Z_FLAG_Z_A_T; } @@ -681,7 +689,8 @@ int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header) { int8_t _z_pull_encode(_z_wbuf_t *wbf, const _z_msg_pull_t *pull) { int8_t ret = _Z_RES_OK; uint8_t header = _Z_MID_Z_PULL; - _Bool has_info_ext = _z_id_check(pull->_ext_source_info._id); + _Bool has_info_ext = _z_id_check(pull->_ext_source_info._id) || pull->_ext_source_info._source_sn != 0 || + pull->_ext_source_info._entity_id != 0; if (has_info_ext) { header |= _Z_FLAG_Z_Z; } diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index ddea6a32f..a522af3d7 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -236,7 +236,7 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { _Z_DEBUG("Encoding _Z_MID_N_RESPONSE\n"); _Bool has_qos_ext = msg->_ext_qos._val != _Z_N_QOS_DEFAULT._val; _Bool has_ts_ext = _z_timestamp_check(&msg->_ext_timestamp); - _Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid); + _Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid) || msg->_ext_responder._eid != 0; uint8_t n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0); _Bool has_suffix = _z_keyexpr_has_suffix(msg->_key); if (msg->_key._uses_remote_mapping) { @@ -477,18 +477,23 @@ int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) { _Z_RETURN_IF_ERR(_z_uint8_decode(&header, zbf)); switch (_Z_MID(header)) { case _Z_MID_N_DECLARE: { + msg->_tag = _Z_N_DECLARE; return _z_declare_decode(&msg->_body._declare, zbf, header); } break; case _Z_MID_N_PUSH: { + msg->_tag = _Z_N_PUSH; return _z_push_decode(&msg->_body._push, zbf, header); } break; case _Z_MID_N_REQUEST: { + msg->_tag = _Z_N_REQUEST; return _z_request_decode(&msg->_body._request, zbf, header); } break; case _Z_MID_N_RESPONSE: { + msg->_tag = _Z_N_RESPONSE; return _z_response_decode(&msg->_body._response, zbf, header); } break; case _Z_MID_N_RESPONSE_FINAL: { + msg->_tag = _Z_N_RESPONSE_FINAL; return _z_response_final_decode(&msg->_body._response_final, zbf, header); } break; default: diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 6f2874a18..75f0b0aba 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -12,10 +12,14 @@ // ZettaScale Zenoh Team, // +#include #include +#include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/codec/ext.h" #include "zenoh-pico/protocol/codec/network.h" +#include "zenoh-pico/protocol/definitions/core.h" +#include "zenoh-pico/protocol/ext.h" #include "zenoh-pico/protocol/iobuf.h" #include "zenoh-pico/protocol/msgcodec.h" #include "zenoh-pico/utils/logging.h" @@ -25,50 +29,45 @@ int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg int8_t ret = _Z_RES_OK; _Z_DEBUG("Encoding _Z_MID_T_JOIN\n"); - ret = _z_wbuf_write(wbf, msg->_version); + _Z_RETURN_IF_ERR(_z_wbuf_write(wbf, msg->_version)); - if (ret == _Z_RES_OK) { - uint8_t cbyte = 0; - cbyte |= (msg->_whatami & 0x03); - uint8_t zidlen = _z_id_len(msg->_zid); - cbyte |= ((zidlen - 1) & 0x0F) << 4; - ret = _z_uint8_encode(wbf, cbyte); - if (ret == _Z_RES_OK) { - ret = _z_wbuf_write_bytes(wbf, msg->_zid.id, 0, zidlen); - } - } + uint8_t cbyte = 0; + cbyte |= (msg->_whatami & 0x03); + uint8_t zidlen = _z_id_len(msg->_zid); + cbyte |= ((zidlen - 1) & 0x0F) << 4; + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, cbyte)); + _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_zid.id, 0, zidlen)); - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_S)) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_S)) { uint8_t cbyte = 0; cbyte |= (msg->_seq_num_res & 0x03); cbyte |= ((msg->_req_id_res & 0x03) << 2); - ret = _z_uint8_encode(wbf, cbyte); - if (ret == _Z_RES_OK) { - ret = _z_uint16_encode(wbf, msg->_batch_size); - } - } - if (ret == _Z_RES_OK) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_T) == true) { - ret = _z_zint_encode(wbf, msg->_lease / 1000); - } else { - ret = _z_zint_encode(wbf, msg->_lease); - } + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, cbyte)); + _Z_RETURN_IF_ERR(_z_uint16_encode(wbf, msg->_batch_size)); } - if (ret == _Z_RES_OK) { - if (msg->_next_sn._is_qos) { - if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { - ret = _z_uint8_encode(wbf, 0x51); // QOS-ext: (enc=zbuf)(mandatory=true)(id=1) - for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { - ret = _z_zint_encode(wbf, msg->_next_sn._val._qos[i]._reliable); - ret |= _z_zint_encode(wbf, msg->_next_sn._val._qos[i]._best_effort); - } - } else { - _Z_DEBUG("Attempted to serialize QoS-SN extension, but the header extension flag was unset"); - ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; + if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_T) == true) { + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_lease / 1000)); + } else { + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_lease)); + } + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._plain._reliable)); + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._plain._best_effort)); + if (msg->_next_sn._is_qos) { + if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)); + size_t len = 0; + for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { + len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) + + _z_zint_len(msg->_next_sn._val._qos[i]._best_effort); + } + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, len)); + for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) { + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._qos[i]._reliable)); + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._qos[i]._best_effort)); } } else { - ret |= _z_zint_encode(wbf, msg->_next_sn._val._plain._reliable); - ret |= _z_zint_encode(wbf, msg->_next_sn._val._plain._best_effort); + _Z_DEBUG("Attempted to serialize QoS-SN extension, but the header extension flag was unset"); + ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED; } } @@ -78,12 +77,13 @@ int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg int8_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) { int8_t ret = _Z_RES_OK; _z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx; - if (_Z_EXT_FULL_ID(extension->_header) == 0x51) { // QOS: (enc=zbuf)(mandatory=true)(id=1) + if (_Z_EXT_FULL_ID(extension->_header) == + (_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1) msg->_next_sn._is_qos = true; _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) { - ret |= _z_zint_decode(&msg->_next_sn._val._plain._reliable, &zbf); - ret |= _z_zint_decode(&msg->_next_sn._val._plain._best_effort, &zbf); + ret |= _z_zint_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf); + ret |= _z_zint_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf); } } else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) { ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN; @@ -204,9 +204,6 @@ int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) { if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_INIT_A) == true)) { ret |= _z_bytes_decode(&msg->_cookie, zbf); - if (ret != _Z_RES_OK) { - msg->_cookie = _z_bytes_empty(); - } } else { msg->_cookie = _z_bytes_empty(); } @@ -305,7 +302,9 @@ int8_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t int8_t ret = _Z_RES_OK; _Z_DEBUG("Decoding _Z_MID_T_KEEP_ALIVE\n"); - ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x03); + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { + ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x03); + } return ret; } diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index 9620161f2..fc6b14491 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -11,11 +11,11 @@ void _z_msg_put_clear(_z_msg_put_t *msg) { } _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg) { - return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL || - msg->_ext_value.encoding.prefix != 0 || - !_z_bytes_is_empty(&msg->_ext_value.encoding.suffix), - .info = _z_id_check(msg->_ext_info._id), - .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO}; + return (_z_msg_query_reqexts_t){ + .body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 || + !_z_bytes_is_empty(&msg->_ext_value.encoding.suffix), + .info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0, + .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO}; } void _z_msg_query_clear(_z_msg_query_t *msg) { diff --git a/src/protocol/definitions/transport.c b/src/protocol/definitions/transport.c index bfbd600b7..2af2f11e0 100644 --- a/src/protocol/definitions/transport.c +++ b/src/protocol/definitions/transport.c @@ -2,56 +2,56 @@ #include "zenoh-pico/utils/logging.h" -void _z_s_msg_clear_scout(_z_s_msg_scout_t *msg) {} +void _z_s_msg_scout_clear(_z_s_msg_scout_t *msg) {} /*------------------ Locators Field ------------------*/ void _z_locators_clear(_z_locator_array_t *ls) { _z_locator_array_clear(ls); } -void _z_s_msg_clear_hello(_z_s_msg_hello_t *msg) { _z_locators_clear(&msg->_locators); } +void _z_s_msg_hello_clear(_z_s_msg_hello_t *msg) { _z_locators_clear(&msg->_locators); } -void _z_t_msg_clear_join(_z_t_msg_join_t *msg) {} +void _z_t_msg_join_clear(_z_t_msg_join_t *msg) {} -void _z_t_msg_clear_init(_z_t_msg_init_t *msg) { _z_bytes_clear(&msg->_cookie); } +void _z_t_msg_init_clear(_z_t_msg_init_t *msg) { _z_bytes_clear(&msg->_cookie); } -void _z_t_msg_clear_open(_z_t_msg_open_t *msg) { _z_bytes_clear(&msg->_cookie); } +void _z_t_msg_open_clear(_z_t_msg_open_t *msg) { _z_bytes_clear(&msg->_cookie); } -void _z_t_msg_clear_close(_z_t_msg_close_t *msg) { (void)(msg); } +void _z_t_msg_close_clear(_z_t_msg_close_t *msg) { (void)(msg); } -void _z_t_msg_clear_keep_alive(_z_t_msg_keep_alive_t *msg) { (void)(msg); } +void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg) { (void)(msg); } -void _z_t_msg_clear_frame(_z_t_msg_frame_t *msg) { _z_network_message_vec_clear(&msg->_messages); } +void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg) { _z_network_message_vec_clear(&msg->_messages); } -void _z_t_msg_clear_fragment(_z_t_msg_fragment_t *msg) { _z_bytes_clear(&msg->_payload); } +void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg) { _z_bytes_clear(&msg->_payload); } void _z_t_msg_clear(_z_transport_message_t *msg) { uint8_t mid = _Z_MID(msg->_header); switch (mid) { case _Z_MID_T_JOIN: { - _z_t_msg_clear_join(&msg->_body._join); + _z_t_msg_join_clear(&msg->_body._join); } break; case _Z_MID_T_INIT: { - _z_t_msg_clear_init(&msg->_body._init); + _z_t_msg_init_clear(&msg->_body._init); } break; case _Z_MID_T_OPEN: { - _z_t_msg_clear_open(&msg->_body._open); + _z_t_msg_open_clear(&msg->_body._open); } break; case _Z_MID_T_CLOSE: { - _z_t_msg_clear_close(&msg->_body._close); + _z_t_msg_close_clear(&msg->_body._close); } break; case _Z_MID_T_KEEP_ALIVE: { - _z_t_msg_clear_keep_alive(&msg->_body._keep_alive); + _z_t_msg_keep_alive_clear(&msg->_body._keep_alive); } break; case _Z_MID_T_FRAME: { - _z_t_msg_clear_frame(&msg->_body._frame); + _z_t_msg_frame_clear(&msg->_body._frame); } break; case _Z_MID_T_FRAGMENT: { - _z_t_msg_clear_fragment(&msg->_body._fragment); + _z_t_msg_fragment_clear(&msg->_body._fragment); } break; default: { @@ -321,11 +321,11 @@ void _z_s_msg_clear(_z_scouting_message_t *msg) { uint8_t mid = _Z_MID(msg->_header); switch (mid) { case _Z_MID_SCOUT: { - _z_s_msg_clear_scout(&msg->_body._scout); + _z_s_msg_scout_clear(&msg->_body._scout); } break; case _Z_MID_HELLO: { - _z_s_msg_clear_hello(&msg->_body._hello); + _z_s_msg_hello_clear(&msg->_body._hello); } break; default: { diff --git a/src/session/resource.c b/src/session/resource.c index e90f1fde2..68859175e 100644 --- a/src/session/resource.c +++ b/src/session/resource.c @@ -198,7 +198,6 @@ _z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t * #if Z_MULTI_THREAD == 1 _z_mutex_lock(&zn->_mutex_inner); #endif // Z_MULTI_THREAD == 1 - _Bool is_local = !keyexpr->_uses_remote_mapping; _z_keyexpr_t res = __unsafe_z_get_expanded_key_from_key(zn, keyexpr); #if Z_MULTI_THREAD == 1 diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 836573ac9..f3523d21f 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -16,6 +16,7 @@ #include "zenoh-pico/protocol/codec/message.h" #include "zenoh-pico/protocol/codec/network.h" #include "zenoh-pico/protocol/definitions/message.h" +#include "zenoh-pico/protocol/definitions/transport.h" #define ZENOH_PICO_TEST_H #include @@ -710,7 +711,7 @@ _z_decl_subscriber_t gen_subscriber_declaration(void) { return e_sd; } -void assert_eq_subscriber_declaration(_z_decl_subscriber_t *left, _z_decl_subscriber_t *right) { +void assert_eq_subscriber_declaration(const _z_decl_subscriber_t *left, const _z_decl_subscriber_t *right) { assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); assert(left->_id == right->_id); assert(left->_ext_subinfo._pull_mode == right->_ext_subinfo._pull_mode); @@ -756,7 +757,7 @@ _z_decl_queryable_t gen_queryable_declaration(void) { return e_qd; } -void assert_eq_queryable_declaration(_z_decl_queryable_t *left, _z_decl_queryable_t *right) { +void assert_eq_queryable_declaration(const _z_decl_queryable_t *left, const _z_decl_queryable_t *right) { assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); printf("Complete (%u:%u), ", left->_ext_queryable_info._complete, right->_ext_queryable_info._complete); @@ -805,7 +806,7 @@ _z_undecl_kexpr_t gen_forget_resource_declaration(void) { return e_frd; } -void assert_eq_forget_resource_declaration(_z_undecl_kexpr_t *left, _z_undecl_kexpr_t *right) { +void assert_eq_forget_resource_declaration(const _z_undecl_kexpr_t *left, const _z_undecl_kexpr_t *right) { printf("RID (%u:%u)", left->_id, right->_id); assert(left->_id == right->_id); } @@ -846,7 +847,7 @@ _z_undecl_subscriber_t gen_forget_subscriber_declaration(void) { return e_fsd; } -void assert_eq_forget_subscriber_declaration(_z_undecl_subscriber_t *left, _z_undecl_subscriber_t *right) { +void assert_eq_forget_subscriber_declaration(const _z_undecl_subscriber_t *left, const _z_undecl_subscriber_t *right) { assert_eq_keyexpr(&left->_ext_keyexpr, &right->_ext_keyexpr); assert(left->_id == right->_id); } @@ -889,7 +890,7 @@ _z_undecl_queryable_t gen_forget_queryable_declaration(void) { return e_fqd; } -void assert_eq_forget_queryable_declaration(_z_undecl_queryable_t *left, _z_undecl_queryable_t *right) { +void assert_eq_forget_queryable_declaration(const _z_undecl_queryable_t *left, const _z_undecl_queryable_t *right) { assert_eq_keyexpr(&left->_ext_keyexpr, &right->_ext_keyexpr); assert(left->_id == right->_id); } @@ -961,7 +962,7 @@ _z_declaration_t gen_declaration(void) { return d; } -void assert_eq_declaration(_z_declaration_t *left, _z_declaration_t *right) { +void assert_eq_declaration(const _z_declaration_t *left, const _z_declaration_t *right) { printf("Declaration -> "); printf("Header (%x:%x), ", left->_tag, right->_tag); assert(left->_tag == right->_tag); @@ -1447,7 +1448,8 @@ void response_final_message(void) { _z_n_msg_response_final_t decoded; __auto_type zbf = _z_wbuf_to_zbuf(&wbf); __auto_type header = _z_zbuf_read(&zbf); - assert(_Z_RES_OK == _z_response_final_decode(&decoded, &zbf, header)); + int8_t ret = _z_response_final_decode(&decoded, &zbf, header); + assert(_Z_RES_OK == ret); assert_eq_response_final(&expected, &decoded); _z_n_msg_response_final_clear(&decoded); _z_n_msg_response_final_clear(&expected); @@ -1455,6 +1457,232 @@ void response_final_message(void) { _z_wbuf_clear(&wbf); } +_z_transport_message_t gen_join(void) { + _z_conduit_sn_list_t conduit = {._is_qos = gen_bool()}; + if (conduit._is_qos) { + for (int i = 0; i < Z_PRIORITIES_NUM; i++) { + conduit._val._qos[i]._best_effort = gen_uint64(); + conduit._val._qos[i]._reliable = gen_uint64(); + } + } else { + conduit._val._plain._best_effort = gen_uint64(); + conduit._val._plain._reliable = gen_uint64(); + } + return _z_t_msg_make_join(gen_uint8() % 3, gen_uint64(), gen_zid(), conduit); +} +void assert_eq_join(const _z_t_msg_join_t *left, const _z_t_msg_join_t *right) { + assert(memcmp(left->_zid.id, right->_zid.id, 16) == 0); + assert(left->_lease == right->_lease); + assert(left->_batch_size == right->_batch_size); + assert(left->_whatami == right->_whatami); + assert(left->_req_id_res == right->_req_id_res); + assert(left->_seq_num_res == right->_seq_num_res); + assert(left->_version == right->_version); + assert(left->_next_sn._is_qos == right->_next_sn._is_qos); + if (left->_next_sn._is_qos) { + for (int i = 0; i < Z_PRIORITIES_NUM; i++) { + assert(left->_next_sn._val._qos[i]._best_effort == right->_next_sn._val._qos[i]._best_effort); + assert(left->_next_sn._val._qos[i]._reliable == right->_next_sn._val._qos[i]._reliable); + } + } else { + assert(left->_next_sn._val._plain._best_effort == right->_next_sn._val._plain._best_effort); + assert(left->_next_sn._val._plain._reliable == right->_next_sn._val._plain._reliable); + } +} +void join_message(void) { + printf("\n>> Join message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_join(); + assert(_z_join_encode(&wbf, expected._header, &expected._body._join) == _Z_RES_OK); + _z_t_msg_join_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_join_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_join(&expected._body._join, &decoded); + _z_t_msg_join_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + +_z_transport_message_t gen_init(void) { + if (gen_bool()) { + return _z_t_msg_make_init_syn(gen_uint8() % 3, gen_zid()); + } else { + return _z_t_msg_make_init_ack(gen_uint8() % 3, gen_zid(), gen_bytes(16)); + } +} +void assert_eq_init(const _z_t_msg_init_t *left, const _z_t_msg_init_t *right) { + assert(left->_batch_size == right->_batch_size); + assert(left->_req_id_res == right->_req_id_res); + assert(left->_seq_num_res == right->_seq_num_res); + assert_eq_bytes(&left->_cookie, &right->_cookie); + assert(memcmp(left->_zid.id, right->_zid.id, 16) == 0); + assert(left->_version == right->_version); + assert(left->_whatami == right->_whatami); +} +void init_message(void) { + printf("\n>> Init message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_init(); + assert(_z_init_encode(&wbf, expected._header, &expected._body._init) == _Z_RES_OK); + _z_t_msg_init_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_init_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_init(&expected._body._init, &decoded); + _z_t_msg_init_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + +_z_transport_message_t gen_open(void) { + if (gen_bool()) { + return _z_t_msg_make_open_syn(gen_uint(), gen_uint(), gen_bytes(16)); + } else { + return _z_t_msg_make_open_ack(gen_uint(), gen_uint()); + } +} +void assert_eq_open(const _z_t_msg_open_t *left, const _z_t_msg_open_t *right) { + assert_eq_bytes(&left->_cookie, &right->_cookie); + assert(left->_initial_sn == right->_initial_sn); + assert(left->_lease == right->_lease); +} +void open_message(void) { + printf("\n>> open message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_open(); + assert(_z_open_encode(&wbf, expected._header, &expected._body._open) == _Z_RES_OK); + _z_t_msg_open_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_open_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_open(&expected._body._open, &decoded); + _z_t_msg_open_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + +_z_transport_message_t gen_close(void) { return _z_t_msg_make_close(gen_uint(), gen_bool()); } +void assert_eq_close(const _z_t_msg_close_t *left, const _z_t_msg_close_t *right) { + assert(left->_reason == right->_reason); +} +void close_message(void) { + printf("\n>> close message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_close(); + assert(_z_close_encode(&wbf, expected._header, &expected._body._close) == _Z_RES_OK); + _z_t_msg_close_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_close_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_close(&expected._body._close, &decoded); + _z_t_msg_close_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + +_z_transport_message_t gen_keep_alive(void) { return _z_t_msg_make_keep_alive(); } +void assert_eq_keep_alive(const _z_t_msg_keep_alive_t *left, const _z_t_msg_keep_alive_t *right) { + (void)left; + (void)right; +} +void keep_alive_message(void) { + printf("\n>> keep_alive message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_keep_alive(); + assert(_z_keep_alive_encode(&wbf, expected._header, &expected._body._keep_alive) == _Z_RES_OK); + _z_t_msg_keep_alive_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_keep_alive_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_keep_alive(&expected._body._keep_alive, &decoded); + _z_t_msg_keep_alive_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} +_z_network_message_t gen_net_msg(void) { + switch (gen_uint8() % 5) { + case 0: { + return gen_declare_message(); + } break; + case 1: { + return (_z_network_message_t){._tag = _Z_N_PUSH, ._body._push = gen_push()}; + } break; + case 2: { + return (_z_network_message_t){._tag = _Z_N_REQUEST, ._body._request = gen_request()}; + } break; + case 3: { + return (_z_network_message_t){._tag = _Z_N_RESPONSE, ._body._response = gen_response()}; + } break; + case 4: { + return (_z_network_message_t){._tag = _Z_N_RESPONSE_FINAL, ._body._response_final = gen_response_final()}; + } break; + } + assert(false); +} +void assert_eq_net_msg(const _z_network_message_t *left, const _z_network_message_t *right) { + assert(left->_tag == right->_tag); + switch (left->_tag) { + case _Z_N_DECLARE: { + assert_eq_declaration(&left->_body._declare._decl, &right->_body._declare._decl); + assert_eq_timestamp(&left->_body._declare._ext_timestamp, &right->_body._declare._ext_timestamp); + assert(left->_body._declare._ext_qos._val == right->_body._declare._ext_qos._val); + } break; + case _Z_N_PUSH: { + assert_eq_push(&left->_body._push, &right->_body._push); + } break; + case _Z_N_REQUEST: { + assert_eq_request(&left->_body._request, &right->_body._request); + } break; + case _Z_N_RESPONSE: { + assert_eq_response(&left->_body._response, &right->_body._response); + } break; + case _Z_N_RESPONSE_FINAL: { + assert_eq_response_final(&left->_body._response_final, &right->_body._response_final); + } break; + } +} +_z_network_message_vec_t gen_net_msgs(size_t n) { + _z_network_message_vec_t ret = _z_network_message_vec_make(n); + for (size_t i = 0; i < n; i++) { + _z_network_message_t *msg = (_z_network_message_t *)z_malloc(sizeof(_z_network_message_t)); + *msg = gen_net_msg(); + _z_network_message_vec_append(&ret, msg); + } + return ret; +} + +_z_transport_message_t gen_frame(void) { + return _z_t_msg_make_frame(gen_uint(), gen_net_msgs(gen_uint8() % 16), gen_bool()); +} +void assert_eq_frame(const _z_t_msg_frame_t *left, const _z_t_msg_frame_t *right) { + assert(left->_sn == right->_sn); + assert(left->_messages._len == right->_messages._len); + for (size_t i = 0; i < left->_messages._len; i++) { + assert_eq_net_msg(left->_messages._val[i], right->_messages._val[i]); + } +} +void frame_message(void) { + printf("\n>> frame message\n"); + __auto_type wbf = gen_wbuf(UINT16_MAX); + __auto_type expected = gen_frame(); + assert(_z_frame_encode(&wbf, expected._header, &expected._body._frame) == _Z_RES_OK); + _z_t_msg_frame_t decoded; + __auto_type zbf = _z_wbuf_to_zbuf(&wbf); + int8_t ret = _z_frame_decode(&decoded, &zbf, expected._header); + assert(_Z_RES_OK == ret); + assert_eq_frame(&expected._body._frame, &decoded); + _z_t_msg_frame_clear(&decoded); + _z_t_msg_clear(&expected); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + /*=============================*/ /* Main */ /*=============================*/ @@ -1492,13 +1720,13 @@ int main(void) { response_message(); response_final_message(); - // // Transport messages - // join_message(); - // init_message(); - // open_message(); - // close_message(); - // keep_alive_message(); - // frame_message(); + // Transport messages + join_message(); + init_message(); + open_message(); + close_message(); + keep_alive_message(); + frame_message(); // fragment_message(); // transport_message(); // batch();