Skip to content

Commit

Permalink
network layer tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Jul 7, 2023
1 parent 7fe228b commit c25ace3
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 50 deletions.
8 changes: 7 additions & 1 deletion src/protocol/codec/ext.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ int8_t _z_msg_ext_unknown_error(_z_msg_ext_t *extension, uint8_t trace_id) {
}
case _Z_MSG_EXT_ENC_ZINT: {
_Z_ERROR("Unknown mandatory extension found (extension_id: %02x, trace_id: %02x), ZINT(%02x)\n", ext_id,
trace_id, extension->_body._zint);
trace_id, extension->_body._zint._val);
break;
}
case _Z_MSG_EXT_ENC_ZBUF: {
Expand Down Expand Up @@ -215,7 +215,13 @@ int8_t _z_msg_ext_decode_iter(_z_zbuf_t *zbf, int8_t (*callback)(_z_msg_ext_t *,
_Bool has_next = true;
while (has_next && ret == _Z_RES_OK) {
_z_msg_ext_t ext = _z_msg_ext_make_unit(0);
size_t start = zbf->_ios._r_pos;
ret |= _z_msg_ext_decode(&ext, zbf, &has_next);
printf("EXT (%d): ", ret);
for (; start < zbf->_ios._r_pos; start++) {
printf("%02x ", zbf->_ios._buf[start]);
}
printf("\n");
if (ret == _Z_RES_OK) {
ret |= callback(&ext, context);
_z_msg_ext_clear(&ext);
Expand Down
39 changes: 20 additions & 19 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
(void)(wbf);
(void)(pshb);
int8_t ret = _Z_RES_OK;
uint8_t header = pshb->_is_put ? _Z_M_PUT_ID : _Z_M_DEL_ID;
uint8_t header = pshb->_is_put ? _Z_MID_Z_PUT : _Z_MID_Z_DEL;
_Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id);
_Bool has_timestamp = _z_timestamp_check(&pshb->_body._put._commons._timestamp);
_Bool has_encoding = false;
Expand Down Expand Up @@ -310,7 +310,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header)
int8_t ret = _Z_RES_OK;
if (ret == _Z_RES_OK) {
switch (_Z_MID(header)) {
case _Z_M_PUT_ID: {
case _Z_MID_Z_PUT: {
pshb->_is_put = true;
pshb->_body._put = (_z_msg_put_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P_T)) {
Expand All @@ -328,7 +328,7 @@ int8_t _z_push_body_decode(_z_push_body_t *pshb, _z_zbuf_t *zbf, uint8_t header)
}
break;
}
case _Z_M_DEL_ID: {
case _Z_MID_Z_DEL: {
pshb->_is_put = false;
pshb->_body._del = (_z_msg_del_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_D_T)) {
Expand Down Expand Up @@ -397,10 +397,9 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) {
extheader |= _Z_FLAG_Z_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zint_encode(
wbf, 1 + _z_zint_len(msg->_ext_value.encoding.prefix) + _z_zint_len(msg->_ext_value.encoding.suffix.len) +
msg->_ext_value.encoding.suffix.len + _z_zint_len(msg->_ext_value.payload.len) +
msg->_ext_value.payload.len));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, _z_zint_len(msg->_ext_value.encoding.prefix) +
_z_bytes_encode_len(&msg->_ext_value.encoding.suffix) +
_z_bytes_encode_len(&msg->_ext_value.payload)));
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, msg->_ext_value.encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.encoding.suffix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &msg->_ext_value.payload));
Expand All @@ -415,6 +414,7 @@ int8_t _z_query_encode(_z_wbuf_t *wbf, const _z_msg_query_t *msg) {
}
if (required_exts.info) {
uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01;
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &msg->_ext_info));
}

Expand Down Expand Up @@ -452,15 +452,16 @@ int8_t _z_query_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
int8_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) {
_Z_DEBUG("Decoding _Z_MID_Z_QUERY\n");
*msg = (_z_msg_query_t){0};
msg->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO;
int8_t ret = _Z_RES_OK;
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_P)) {
ret = _z_bytes_decode(&msg->_parameters, zbf);
_Z_RETURN_IF_ERR(_z_bytes_decode(&msg->_parameters, zbf));
} else {
_z_bytes_clear(&msg->_parameters);
}

if (ret == _Z_RES_OK) {
ret = _z_msg_ext_decode_iter(zbf, _z_query_decode_extensions, msg);
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_query_decode_extensions, msg));
}

return ret;
Expand Down Expand Up @@ -630,7 +631,7 @@ int8_t _z_err_decode(_z_msg_err_t *err, _z_zbuf_t *zbf, uint8_t header) {

int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) {
int8_t ret = _Z_RES_OK;
uint8_t header = _Z_MID_Z_ERR;
uint8_t header = _Z_MID_Z_ACK;
_Bool has_ts = _z_timestamp_check(&ack->_timestamp);
_Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id);
if (has_ts) {
Expand All @@ -639,13 +640,13 @@ int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) {
if (has_sinfo_ext) {
header |= _Z_FLAG_Z_Z;
}
ret = _z_uint8_encode(wbf, header);
if ((ret == _Z_RES_OK) && has_ts) {
ret = _z_timestamp_encode(wbf, &ack->_timestamp);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
if (has_ts) {
_Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &ack->_timestamp));
}
if ((ret == _Z_RES_OK) && has_sinfo_ext) {
ret = _z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01);
ret |= _z_source_info_encode_ext(wbf, &ack->_ext_source_info);
if (has_sinfo_ext) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x01));
_Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &ack->_ext_source_info));
}
return ret;
}
Expand All @@ -669,9 +670,9 @@ int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header) {
int8_t ret = _Z_RES_OK;
*ack = (_z_msg_ack_t){0};
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_A_T)) {
ret = _z_timestamp_decode(&ack->_timestamp, zbf);
_Z_RETURN_IF_ERR(_z_timestamp_decode(&ack->_timestamp, zbf));
}
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
ret = _z_msg_ext_decode_iter(zbf, _z_ack_decode_extension, ack);
}
return ret;
Expand Down
58 changes: 40 additions & 18 deletions src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ int8_t _z_push_decode_ext_cb(_z_msg_ext_t *extension, void *ctx) {

int8_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header) {
int8_t ret = _Z_RES_OK;
*msg = (_z_n_msg_push_t){0};
msg->_qos = _Z_N_QOS_DEFAULT;
ret |= _z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_N));
msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_PUSH_M);
if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_N_Z)) {
Expand All @@ -110,11 +112,11 @@ int8_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) {
if (has_suffix) {
header |= _Z_FLAG_N_REQUEST_N;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
_z_n_msg_request_exts_t exts = _z_n_msg_request_needed_exts(msg);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header | (exts.n != 0 ? _Z_FLAG_Z_Z : 0)));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_rid));
_Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_suffix, &msg->_key));

_z_n_msg_request_exts_t exts = _z_n_msg_request_needed_exts(msg);
if (exts.ext_qos) {
exts.n -= 1;
uint8_t extheader = 0x01 | _Z_MSG_EXT_ENC_ZINT | (exts.n ? _Z_FLAG_Z_Z : 0);
Expand Down Expand Up @@ -174,7 +176,7 @@ int8_t _z_request_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&msg->_ext_timestamp, &zbf));
break;
}
case 0x04 | _Z_MSG_EXT_ENC_ZINT: {
case 0x04 | _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M: {
msg->_ext_target = extension->_body._zint._val;
if (msg->_ext_target > 2) {
return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
Expand All @@ -193,25 +195,33 @@ int8_t _z_request_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
}
return _Z_RES_OK;
}
int8_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, uint8_t header) {
int8_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint8_t header) {
*msg = (_z_n_msg_request_t){0};
msg->_ext_qos = _Z_N_QOS_DEFAULT;
_Z_RETURN_IF_ERR(_z_zint_decode(&msg->_rid, zbf));
_Z_RETURN_IF_ERR(_z_keyexpr_decode(&msg->_key, zbf, _Z_HAS_FLAG(header, _Z_FLAG_N_REQUEST_N)));
msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_REQUEST_M);
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_request_decode_extensions, msg));
}
switch (_Z_MID(header)) {
uint8_t zheader;
_Z_RETURN_IF_ERR(_z_uint8_decode(&zheader, zbf));
switch (_Z_MID(zheader)) {
case _Z_MID_Z_QUERY: {
_Z_RETURN_IF_ERR(_z_query_decode(&msg->_body._query, zbf, header));
msg->_tag = _Z_REQUEST_QUERY;
_Z_RETURN_IF_ERR(_z_query_decode(&msg->_body._query, zbf, zheader));
} break;
case _Z_MID_Z_PUT: {
_Z_RETURN_IF_ERR(_z_put_decode(&msg->_body._put, zbf, header));
msg->_tag = _Z_REQUEST_PUT;
_Z_RETURN_IF_ERR(_z_put_decode(&msg->_body._put, zbf, zheader));
} break;
case _Z_MID_Z_DEL: {
_Z_RETURN_IF_ERR(_z_del_decode(&msg->_body._del, zbf, header));
msg->_tag = _Z_REQUEST_DEL;
_Z_RETURN_IF_ERR(_z_del_decode(&msg->_body._del, zbf, zheader));
} break;
case _Z_MID_Z_PULL: {
_Z_RETURN_IF_ERR(_z_pull_decode(&msg->_body._pull, zbf, header));
msg->_tag = _Z_REQUEST_PULL;
_Z_RETURN_IF_ERR(_z_pull_decode(&msg->_body._pull, zbf, zheader));
} break;
default:
return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
Expand All @@ -228,10 +238,11 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
_Bool has_ts_ext = _z_timestamp_check(&msg->_ext_timestamp);
_Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid);
uint8_t n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0);
_Bool has_suffix = _z_keyexpr_has_suffix(msg->_key);
if (msg->_key._uses_remote_mapping) {
_Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_M);
}
if (msg->_key._suffix != NULL) {
if (has_suffix) {
_Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_N);
}
if (n_ext != 0) {
Expand All @@ -241,7 +252,7 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_request_id));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_key._id));
if (msg->_key._suffix != NULL) {
if (has_suffix) {
_Z_RETURN_IF_ERR(_z_str_encode(wbf, msg->_key._suffix))
}
if (has_qos_ext) {
Expand All @@ -261,12 +272,12 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
}
if (has_responder_ext) {
n_ext -= 1;
uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x02 | (n_ext != 0 ? _Z_FLAG_Z_Z : 0);
uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x03 | (n_ext != 0 ? _Z_FLAG_Z_Z : 0);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
uint8_t zidlen = _z_id_len(msg->_ext_responder._zid);
extheader = (zidlen - 1) << 4;
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, zidlen + 1 + _z_zint_len(msg->_ext_responder._eid)));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_ext_responder._zid.id, 0, zidlen));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_ext_responder._eid));
}
Expand Down Expand Up @@ -329,6 +340,8 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) {

int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header) {
_Z_DEBUG("Decoding _Z_MID_N_RESPONSE\n");
*msg = (_z_n_msg_response_t){0};
msg->_ext_qos = _Z_N_QOS_DEFAULT;
int8_t ret = _Z_RES_OK;
msg->_key._uses_remote_mapping = !_Z_HAS_FLAG(header, _Z_FLAG_N_RESPONSE_M);
_Z_RETURN_IF_ERR(_z_zint_decode(&msg->_request_id, zbf));
Expand All @@ -342,23 +355,28 @@ int8_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t head

switch (_Z_MID(inner_header)) {
case _Z_MID_Z_REPLY: {
_z_reply_decode(&msg->_body._reply, zbf, header);
msg->_tag = _Z_RESPONSE_BODY_REPLY;
ret = _z_reply_decode(&msg->_body._reply, zbf, inner_header);
break;
}
case _Z_MID_Z_ERR: {
_z_err_decode(&msg->_body._err, zbf, header);
msg->_tag = _Z_RESPONSE_BODY_ERR;
ret = _z_err_decode(&msg->_body._err, zbf, inner_header);
break;
}
case _Z_MID_Z_ACK: {
_z_ack_decode(&msg->_body._ack, zbf, header);
msg->_tag = _Z_RESPONSE_BODY_ACK;
ret = _z_ack_decode(&msg->_body._ack, zbf, inner_header);
break;
}
case _Z_MID_Z_PUT: {
_z_put_decode(&msg->_body._put, zbf, header);
msg->_tag = _Z_RESPONSE_BODY_PUT;
ret = _z_put_decode(&msg->_body._put, zbf, inner_header);
break;
}
case _Z_MID_Z_DEL: {
_z_del_decode(&msg->_body._del, zbf, header);
msg->_tag = _Z_RESPONSE_BODY_DEL;
ret = _z_del_decode(&msg->_body._del, zbf, inner_header);
break;
}
default: {
Expand All @@ -380,6 +398,8 @@ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t
}
int8_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header) {
(void)(header);

*msg = (_z_n_msg_response_final_t){0};
int8_t ret = _Z_RES_OK;
ret |= _z_zint_decode(&msg->_request_id, zbf);
return ret;
Expand Down Expand Up @@ -425,6 +445,8 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
return _Z_RES_OK;
}
int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header) {
*decl = (_z_n_msg_declare_t){0};
decl->_ext_qos = _Z_N_QOS_DEFAULT;
if (_Z_HAS_FLAG(header, _Z_FLAG_N_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_declare_decode_extensions, decl))
}
Expand Down
6 changes: 5 additions & 1 deletion src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "zenoh-pico/protocol/definitions/message.h"

#include "zenoh-pico/collections/bytes.h"

void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); }

void _z_msg_put_clear(_z_msg_put_t *msg) {
Expand All @@ -9,7 +11,9 @@ void _z_msg_put_clear(_z_msg_put_t *msg) {
}

_z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg) {
return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL,
return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL ||
msg->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.suffix),
.info = _z_id_check(msg->_ext_info._id),
.consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO};
}
Expand Down
22 changes: 11 additions & 11 deletions tests/z_msgcodec_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,8 @@ void request_message(void) {
_z_n_msg_request_t decoded;
__auto_type zbf = _z_wbuf_to_zbuf(&wbf);
__auto_type header = _z_zbuf_read(&zbf);
assert(_Z_RES_OK == _z_request_decode(&decoded, &zbf, header));
int8_t ret = _z_request_decode(&decoded, &zbf, header);
assert(_Z_RES_OK == ret);
assert_eq_request(&expected, &decoded);
_z_n_msg_request_clear(&decoded);
_z_n_msg_request_clear(&expected);
Expand Down Expand Up @@ -1425,7 +1426,8 @@ void response_message(void) {
_z_n_msg_response_t decoded;
__auto_type zbf = _z_wbuf_to_zbuf(&wbf);
__auto_type header = _z_zbuf_read(&zbf);
assert(_Z_RES_OK == _z_response_decode(&decoded, &zbf, header));
int8_t ret = _z_response_decode(&decoded, &zbf, header);
assert(_Z_RES_OK == ret);
assert_eq_response(&expected, &decoded);
_z_n_msg_response_clear(&decoded);
_z_n_msg_response_clear(&expected);
Expand Down Expand Up @@ -1479,19 +1481,17 @@ int main(void) {
declare_message();
push_body_message();
pull_message();
// query_message();
query_message();
err_message();
ack_message();
reply_message();

// // Network messages
// push_message();
// request_message();
// response_message();
// response_final_message();
// Network messages
push_message();
request_message();
response_message();
response_final_message();

// YAY, LET'S TRY IT!
// wohoo!
// // Transport messages
// join_message();
// init_message();
Expand All @@ -1504,7 +1504,7 @@ int main(void) {
// batch();
// fragmentation();

// Scouting messages
// // Scouting messages
// scout_message();
// hello_message();
}
Expand Down

0 comments on commit c25ace3

Please sign in to comment.