From c25ace38ecca43cf977cfba5ec75a91c5ec598b9 Mon Sep 17 00:00:00 2001 From: Pierre Avital Date: Fri, 7 Jul 2023 18:00:59 +0200 Subject: [PATCH] network layer tests passing --- src/protocol/codec/ext.c | 8 ++++- src/protocol/codec/message.c | 39 ++++++++++---------- src/protocol/codec/network.c | 58 ++++++++++++++++++++---------- src/protocol/definitions/message.c | 6 +++- tests/z_msgcodec_test.c | 22 ++++++------ 5 files changed, 83 insertions(+), 50 deletions(-) diff --git a/src/protocol/codec/ext.c b/src/protocol/codec/ext.c index ff096e25e..a5ce38e3f 100644 --- a/src/protocol/codec/ext.c +++ b/src/protocol/codec/ext.c @@ -178,7 +178,7 @@ int8_t _z_msg_ext_unknown_error(_z_msg_ext_t *extension, uint8_t trace_id) { } case _Z_MSG_EXT_ENC_ZINT: { _Z_ERROR("Unknown mandatory extension found (extension_id: %02x, trace_id: %02x), ZINT(%02x)\n", ext_id, - trace_id, extension->_body._zint); + trace_id, extension->_body._zint._val); break; } case _Z_MSG_EXT_ENC_ZBUF: { @@ -215,7 +215,13 @@ int8_t _z_msg_ext_decode_iter(_z_zbuf_t *zbf, int8_t (*callback)(_z_msg_ext_t *, _Bool has_next = true; while (has_next && ret == _Z_RES_OK) { _z_msg_ext_t ext = _z_msg_ext_make_unit(0); + size_t start = zbf->_ios._r_pos; ret |= _z_msg_ext_decode(&ext, zbf, &has_next); + printf("EXT (%d): ", ret); + for (; start < zbf->_ios._r_pos; start++) { + printf("%02x ", zbf->_ios._buf[start]); + } + printf("\n"); if (ret == _Z_RES_OK) { ret |= callback(&ext, context); _z_msg_ext_clear(&ext); diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index 51b2342d2..6c79af01d 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -247,7 +247,7 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) { (void)(wbf); (void)(pshb); int8_t ret = _Z_RES_OK; - uint8_t header = pshb->_is_put ? _Z_M_PUT_ID : _Z_M_DEL_ID; + uint8_t header = pshb->_is_put ? _Z_MID_Z_PUT : _Z_MID_Z_DEL; _Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id); _Bool has_timestamp = _z_timestamp_check(&pshb->_body._put._commons._timestamp); _Bool has_encoding = false; @@ -310,7 +310,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header) int8_t ret = _Z_RES_OK; if (ret == _Z_RES_OK) { switch (_Z_MID(header)) { - case _Z_M_PUT_ID: { + case _Z_MID_Z_PUT: { pshb->_is_put = true; pshb->_body._put = (_z_msg_put_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P_T)) { @@ -328,7 +328,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header) } break; } - case _Z_M_DEL_ID: { + case _Z_MID_Z_DEL: { pshb->_is_put = false; pshb->_body._del = (_z_msg_del_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_D_T)) { @@ -397,10 +397,9 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { extheader |= _Z_FLAG_Z_Z; } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_zint_encode( - wbf, 1 + _z_zint_len(msg->_ext_value.encoding.prefix) + _z_zint_len(msg->_ext_value.encoding.suffix.len) + - msg->_ext_value.encoding.suffix.len + _z_zint_len(msg->_ext_value.payload.len) + - msg->_ext_value.payload.len)); + _Z_RETURN_IF_ERR(_z_zint_encode(wbf, _z_zint_len(msg->_ext_value.encoding.prefix) + + _z_bytes_encode_len(&msg->_ext_value.encoding.suffix) + + _z_bytes_encode_len(&msg->_ext_value.payload))); _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, msg->_ext_value.encoding.prefix)); _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.encoding.suffix)); _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.payload)); @@ -415,6 +414,7 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) { } if (required_exts.info) { uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &msg->_ext_info)); } @@ -452,15 +452,16 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) { int8_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) { _Z_DEBUG("Decoding _Z_MID_Z_QUERY\n"); *msg = (_z_msg_query_t){0}; + msg->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO; int8_t ret = _Z_RES_OK; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P)) { - ret = _z_bytes_decode(&msg->_parameters, zbf); + _Z_RETURN_IF_ERR(_z_bytes_decode(&msg->_parameters, zbf)); } else { _z_bytes_clear(&msg->_parameters); } - if (ret == _Z_RES_OK) { - ret = _z_msg_ext_decode_iter(zbf, _z_query_decode_extensions, msg); + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { + _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_query_decode_extensions, msg)); } return ret; @@ -630,7 +631,7 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) { int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) { int8_t ret = _Z_RES_OK; - uint8_t header = _Z_MID_Z_ERR; + uint8_t header = _Z_MID_Z_ACK; _Bool has_ts = _z_timestamp_check(&ack->_timestamp); _Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id); if (has_ts) { @@ -639,13 +640,13 @@ int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) { if (has_sinfo_ext) { header |= _Z_FLAG_Z_Z; } - ret = _z_uint8_encode(wbf, header); - if ((ret == _Z_RES_OK) && has_ts) { - ret = _z_timestamp_encode(wbf, &ack->_timestamp); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + if (has_ts) { + _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &ack->_timestamp)); } - if ((ret == _Z_RES_OK) && has_sinfo_ext) { - ret = _z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01); - ret |= _z_source_info_encode_ext(wbf, &ack->_ext_source_info); + if (has_sinfo_ext) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01)); + _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &ack->_ext_source_info)); } return ret; } @@ -669,9 +670,9 @@ int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header) { int8_t ret = _Z_RES_OK; *ack = (_z_msg_ack_t){0}; if (_Z_HAS_FLAG(header, _Z_FLAG_Z_A_T)) { - ret = _z_timestamp_decode(&ack->_timestamp, zbf); + _Z_RETURN_IF_ERR(_z_timestamp_decode(&ack->_timestamp, zbf)); } - if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { ret = _z_msg_ext_decode_iter(zbf, _z_ack_decode_extension, ack); } return ret; diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 06bc21bb6..ddea6a32f 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -88,6 +88,8 @@ int8_t _z_push_decode_ext_cb(_z_msg_ext_t *extension, void *ctx) { int8_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header) { int8_t ret = _Z_RES_OK; + *msg = (_z_n_msg_push_t){0}; + msg->_qos = _Z_N_QOS_DEFAULT; ret |= _z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_N)); msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_M); if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_N_Z)) { @@ -110,11 +112,11 @@ int8_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) { if (has_suffix) { header |= _Z_FLAG_N_REQUEST_N; } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); + _z_n_msg_request_exts_t exts = _z_n_msg_request_needed_exts(msg); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header | (exts.n != 0 ? _Z_FLAG_Z_Z : 0))); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_rid)); _Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_suffix, &msg->_key)); - _z_n_msg_request_exts_t exts = _z_n_msg_request_needed_exts(msg); if (exts.ext_qos) { exts.n -= 1; uint8_t extheader = 0x01 | _Z_MSG_EXT_ENC_ZINT | (exts.n ? _Z_FLAG_Z_Z : 0); @@ -174,7 +176,7 @@ int8_t _z_request_decode_extensions(_z_msg_ext_t *extension, void *ctx) { _Z_RETURN_IF_ERR(_z_timestamp_decode(&msg->_ext_timestamp, &zbf)); break; } - case 0x04 | _Z_MSG_EXT_ENC_ZINT: { + case 0x04 | _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M: { msg->_ext_target = extension->_body._zint._val; if (msg->_ext_target > 2) { return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; @@ -193,25 +195,33 @@ int8_t _z_request_decode_extensions(_z_msg_ext_t *extension, void *ctx) { } return _Z_RES_OK; } -int8_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, uint8_t header) { +int8_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint8_t header) { + *msg = (_z_n_msg_request_t){0}; + msg->_ext_qos = _Z_N_QOS_DEFAULT; _Z_RETURN_IF_ERR(_z_zint_decode(&msg->_rid, zbf)); _Z_RETURN_IF_ERR(_z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_REQUEST_N))); msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_REQUEST_M); if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_request_decode_extensions, msg)); } - switch (_Z_MID(header)) { + uint8_t zheader; + _Z_RETURN_IF_ERR(_z_uint8_decode(&zheader, zbf)); + switch (_Z_MID(zheader)) { case _Z_MID_Z_QUERY: { - _Z_RETURN_IF_ERR(_z_query_decode(&msg->_body._query, zbf, header)); + msg->_tag = _Z_REQUEST_QUERY; + _Z_RETURN_IF_ERR(_z_query_decode(&msg->_body._query, zbf, zheader)); } break; case _Z_MID_Z_PUT: { - _Z_RETURN_IF_ERR(_z_put_decode(&msg->_body._put, zbf, header)); + msg->_tag = _Z_REQUEST_PUT; + _Z_RETURN_IF_ERR(_z_put_decode(&msg->_body._put, zbf, zheader)); } break; case _Z_MID_Z_DEL: { - _Z_RETURN_IF_ERR(_z_del_decode(&msg->_body._del, zbf, header)); + msg->_tag = _Z_REQUEST_DEL; + _Z_RETURN_IF_ERR(_z_del_decode(&msg->_body._del, zbf, zheader)); } break; case _Z_MID_Z_PULL: { - _Z_RETURN_IF_ERR(_z_pull_decode(&msg->_body._pull, zbf, header)); + msg->_tag = _Z_REQUEST_PULL; + _Z_RETURN_IF_ERR(_z_pull_decode(&msg->_body._pull, zbf, zheader)); } break; default: return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED; @@ -228,10 +238,11 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { _Bool has_ts_ext = _z_timestamp_check(&msg->_ext_timestamp); _Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid); uint8_t n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0); + _Bool has_suffix = _z_keyexpr_has_suffix(msg->_key); if (msg->_key._uses_remote_mapping) { _Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_M); } - if (msg->_key._suffix != NULL) { + if (has_suffix) { _Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_N); } if (n_ext != 0) { @@ -241,7 +252,7 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_request_id)); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_key._id)); - if (msg->_key._suffix != NULL) { + if (has_suffix) { _Z_RETURN_IF_ERR(_z_str_encode(wbf, msg->_key._suffix)) } if (has_qos_ext) { @@ -261,12 +272,12 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { } if (has_responder_ext) { n_ext -= 1; - uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x02 | (n_ext != 0 ? _Z_FLAG_Z_Z : 0); + uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x03 | (n_ext != 0 ? _Z_FLAG_Z_Z : 0); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); uint8_t zidlen = _z_id_len(msg->_ext_responder._zid); extheader = (zidlen - 1) << 4; - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, zidlen + 1 + _z_zint_len(msg->_ext_responder._eid))); + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_ext_responder._zid.id, 0, zidlen)); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_ext_responder._eid)); } @@ -329,6 +340,8 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) { int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header) { _Z_DEBUG("Decoding _Z_MID_N_RESPONSE\n"); + *msg = (_z_n_msg_response_t){0}; + msg->_ext_qos = _Z_N_QOS_DEFAULT; int8_t ret = _Z_RES_OK; msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_RESPONSE_M); _Z_RETURN_IF_ERR(_z_zint_decode(&msg->_request_id, zbf)); @@ -342,23 +355,28 @@ int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t head switch (_Z_MID(inner_header)) { case _Z_MID_Z_REPLY: { - _z_reply_decode(&msg->_body._reply, zbf, header); + msg->_tag = _Z_RESPONSE_BODY_REPLY; + ret = _z_reply_decode(&msg->_body._reply, zbf, inner_header); break; } case _Z_MID_Z_ERR: { - _z_err_decode(&msg->_body._err, zbf, header); + msg->_tag = _Z_RESPONSE_BODY_ERR; + ret = _z_err_decode(&msg->_body._err, zbf, inner_header); break; } case _Z_MID_Z_ACK: { - _z_ack_decode(&msg->_body._ack, zbf, header); + msg->_tag = _Z_RESPONSE_BODY_ACK; + ret = _z_ack_decode(&msg->_body._ack, zbf, inner_header); break; } case _Z_MID_Z_PUT: { - _z_put_decode(&msg->_body._put, zbf, header); + msg->_tag = _Z_RESPONSE_BODY_PUT; + ret = _z_put_decode(&msg->_body._put, zbf, inner_header); break; } case _Z_MID_Z_DEL: { - _z_del_decode(&msg->_body._del, zbf, header); + msg->_tag = _Z_RESPONSE_BODY_DEL; + ret = _z_del_decode(&msg->_body._del, zbf, inner_header); break; } default: { @@ -380,6 +398,8 @@ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t } int8_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header) { (void)(header); + + *msg = (_z_n_msg_response_final_t){0}; int8_t ret = _Z_RES_OK; ret |= _z_zint_decode(&msg->_request_id, zbf); return ret; @@ -425,6 +445,8 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { return _Z_RES_OK; } int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header) { + *decl = (_z_n_msg_declare_t){0}; + decl->_ext_qos = _Z_N_QOS_DEFAULT; if (_Z_HAS_FLAG(header, _Z_FLAG_N_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_declare_decode_extensions, decl)) } diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index cfbb81f24..9620161f2 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -1,5 +1,7 @@ #include "zenoh-pico/protocol/definitions/message.h" +#include "zenoh-pico/collections/bytes.h" + void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); } void _z_msg_put_clear(_z_msg_put_t *msg) { @@ -9,7 +11,9 @@ void _z_msg_put_clear(_z_msg_put_t *msg) { } _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg) { - return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL, + return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL || + msg->_ext_value.encoding.prefix != 0 || + !_z_bytes_is_empty(&msg->_ext_value.encoding.suffix), .info = _z_id_check(msg->_ext_info._id), .consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO}; } diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index ef1231d62..836573ac9 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1345,7 +1345,8 @@ void request_message(void) { _z_n_msg_request_t decoded; __auto_type zbf = _z_wbuf_to_zbuf(&wbf); __auto_type header = _z_zbuf_read(&zbf); - assert(_Z_RES_OK == _z_request_decode(&decoded, &zbf, header)); + int8_t ret = _z_request_decode(&decoded, &zbf, header); + assert(_Z_RES_OK == ret); assert_eq_request(&expected, &decoded); _z_n_msg_request_clear(&decoded); _z_n_msg_request_clear(&expected); @@ -1425,7 +1426,8 @@ void response_message(void) { _z_n_msg_response_t decoded; __auto_type zbf = _z_wbuf_to_zbuf(&wbf); __auto_type header = _z_zbuf_read(&zbf); - assert(_Z_RES_OK == _z_response_decode(&decoded, &zbf, header)); + int8_t ret = _z_response_decode(&decoded, &zbf, header); + assert(_Z_RES_OK == ret); assert_eq_response(&expected, &decoded); _z_n_msg_response_clear(&decoded); _z_n_msg_response_clear(&expected); @@ -1479,19 +1481,17 @@ int main(void) { declare_message(); push_body_message(); pull_message(); - // query_message(); + query_message(); err_message(); ack_message(); reply_message(); - // // Network messages - // push_message(); - // request_message(); - // response_message(); - // response_final_message(); + // Network messages + push_message(); + request_message(); + response_message(); + response_final_message(); - // YAY, LET'S TRY IT! - // wohoo! // // Transport messages // join_message(); // init_message(); @@ -1504,7 +1504,7 @@ int main(void) { // batch(); // fragmentation(); - // Scouting messages + // // Scouting messages // scout_message(); // hello_message(); }