Skip to content

Commit

Permalink
just a few more tests, new keyexpr resolution checked
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Jul 11, 2023
1 parent c25ace3 commit 4b29c02
Show file tree
Hide file tree
Showing 9 changed files with 354 additions and 97 deletions.
17 changes: 17 additions & 0 deletions include/zenoh-pico/protocol/codec/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,21 @@ int8_t _z_scouting_message_decode_na(_z_scouting_message_t *msg, _z_zbuf_t *buf)
int8_t _z_transport_message_encode(_z_wbuf_t *buf, const _z_transport_message_t *msg);
int8_t _z_transport_message_decode(_z_transport_message_t *msg, _z_zbuf_t *buf);

int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg);
int8_t _z_join_decode(_z_t_msg_join_t *msg, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_init_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_init_t *msg);
int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_open_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_open_t *msg);
int8_t _z_open_decode(_z_t_msg_open_t *msg, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_close_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_close_t *msg);
int8_t _z_close_decode(_z_t_msg_close_t *msg, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_keep_alive_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_keep_alive_t *msg);
int8_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_t *msg);
int8_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header);
#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_TRANSPORT_H */
18 changes: 9 additions & 9 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ typedef struct {
z_what_t _what;
uint8_t _version;
} _z_s_msg_scout_t;
void _z_s_msg_clear_scout(_z_s_msg_scout_t *msg);
void _z_s_msg_scout_clear(_z_s_msg_scout_t *msg);

/*------------------ Hello Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
Expand Down Expand Up @@ -170,7 +170,7 @@ typedef struct {
z_whatami_t _whatami;
uint8_t _version;
} _z_s_msg_hello_t;
void _z_s_msg_clear_hello(_z_s_msg_hello_t *msg);
void _z_s_msg_hello_clear(_z_s_msg_hello_t *msg);

/*------------------ Join Message ------------------*/
// # Join message
Expand Down Expand Up @@ -230,7 +230,7 @@ typedef struct {
uint8_t _seq_num_res;
uint8_t _version;
} _z_t_msg_join_t;
void _z_t_msg_clear_join(_z_t_msg_join_t *msg);
void _z_t_msg_join_clear(_z_t_msg_join_t *msg);

/*------------------ Init Message ------------------*/
// # Init message
Expand Down Expand Up @@ -310,7 +310,7 @@ typedef struct {
uint8_t _seq_num_res;
uint8_t _version;
} _z_t_msg_init_t;
void _z_t_msg_clear_init(_z_t_msg_init_t *msg);
void _z_t_msg_init_clear(_z_t_msg_init_t *msg);

/*------------------ Open Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total lenght
Expand Down Expand Up @@ -348,7 +348,7 @@ typedef struct {
_z_zint_t _initial_sn;
_z_bytes_t _cookie;
} _z_t_msg_open_t;
void _z_t_msg_clear_open(_z_t_msg_open_t *msg);
void _z_t_msg_open_clear(_z_t_msg_open_t *msg);

/*------------------ Close Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
Expand Down Expand Up @@ -378,7 +378,7 @@ void _z_t_msg_clear_open(_z_t_msg_open_t *msg);
typedef struct {
uint8_t _reason;
} _z_t_msg_close_t;
void _z_t_msg_clear_close(_z_t_msg_close_t *msg);
void _z_t_msg_close_clear(_z_t_msg_close_t *msg);
/*=============================*/
/* Close reasons */
/*=============================*/
Expand Down Expand Up @@ -414,7 +414,7 @@ void _z_t_msg_clear_close(_z_t_msg_close_t *msg);
typedef struct {
uint8_t __dummy; // Just to avoid empty structures that might cause undefined behavior
} _z_t_msg_keep_alive_t;
void _z_t_msg_clear_keep_alive(_z_t_msg_keep_alive_t *msg);
void _z_t_msg_keep_alive_clear(_z_t_msg_keep_alive_t *msg);

/*------------------ Frame Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
Expand Down Expand Up @@ -445,7 +445,7 @@ typedef struct {
_z_network_message_vec_t _messages;
_z_zint_t _sn;
} _z_t_msg_frame_t;
void _z_t_msg_clear_frame(_z_t_msg_frame_t *msg);
void _z_t_msg_frame_clear(_z_t_msg_frame_t *msg);

/*------------------ Fragment Message ------------------*/
// The Fragment message is used to transmit on the wire large Zenoh Message that require fragmentation
Expand Down Expand Up @@ -473,7 +473,7 @@ typedef struct {
_z_bytes_t _payload;
_z_zint_t _sn;
} _z_t_msg_fragment_t;
void _z_t_msg_clear_fragment(_z_t_msg_fragment_t *msg);
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

/*------------------ Transport Message ------------------*/
typedef union {
Expand Down
21 changes: 15 additions & 6 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,13 @@ int8_t _z_keyexpr_decode(_z_keyexpr_t *ke, _z_zbuf_t *zbf, _Bool has_suffix) {
int8_t ret = _Z_RES_OK;

ret |= _z_zint16_decode(&ke->_id, zbf);
ke->_owns_suffix = false;
if (has_suffix == true) {
char *str = NULL;
ret |= _z_str_decode(&str, zbf);
if (ret == _Z_RES_OK) {
ke->_suffix = str;
ke->_owns_suffix = true;
} else {
ke->_suffix = NULL;
}
Expand Down Expand Up @@ -248,7 +250,9 @@ int8_t _z_push_body_encode(_z_wbuf_t *wbf, const _z_push_body_t *pshb) {
(void)(pshb);
int8_t ret = _Z_RES_OK;
uint8_t header = pshb->_is_put ? _Z_MID_Z_PUT : _Z_MID_Z_DEL;
_Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id);
_Bool has_source_info = _z_id_check(pshb->_body._put._commons._source_info._id) ||
pshb->_body._put._commons._source_info._source_sn != 0 ||
pshb->_body._put._commons._source_info._entity_id != 0;
_Bool has_timestamp = _z_timestamp_check(&pshb->_body._put._commons._timestamp);
_Bool has_encoding = false;
if (has_source_info) {
Expand Down Expand Up @@ -476,7 +480,9 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) {
!_z_bytes_is_empty(&reply->_value.encoding.suffix)) {
header |= _Z_FLAG_Z_R_E;
}
if (reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO || _z_id_check(reply->_ext_source_info._id)) {
_Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 ||
reply->_ext_source_info._entity_id != 0;
if (reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO || has_sourceinfo) {
header |= _Z_FLAG_Z_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
Expand All @@ -491,7 +497,7 @@ int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) {
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix));
}
_Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO;
if (_z_id_check(reply->_ext_source_info._id)) {
if (has_sourceinfo) {
uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01;
if (has_consolidation_ext) {
extheader |= _Z_MSG_EXT_FLAG_Z;
Expand Down Expand Up @@ -557,7 +563,8 @@ int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
}
_Bool has_payload_ext = err->_ext_value.payload.start != NULL || err->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&err->_ext_value.encoding.suffix);
_Bool has_sinfo_ext = _z_id_check(err->_ext_source_info._id);
_Bool has_sinfo_ext = _z_id_check(err->_ext_source_info._id) || err->_ext_source_info._entity_id != 0 ||
err->_ext_source_info._source_sn != 0;
if (has_sinfo_ext || has_payload_ext) {
header |= _Z_FLAG_Z_Z;
}
Expand Down Expand Up @@ -633,7 +640,8 @@ int8_t _z_ack_encode(_z_wbuf_t *wbf, const _z_msg_ack_t *ack) {
int8_t ret = _Z_RES_OK;
uint8_t header = _Z_MID_Z_ACK;
_Bool has_ts = _z_timestamp_check(&ack->_timestamp);
_Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id);
_Bool has_sinfo_ext = _z_id_check(ack->_ext_source_info._id) || ack->_ext_source_info._source_sn != 0 ||
ack->_ext_source_info._entity_id != 0;
if (has_ts) {
header |= _Z_FLAG_Z_A_T;
}
Expand Down Expand Up @@ -681,7 +689,8 @@ int8_t _z_ack_decode(_z_msg_ack_t *ack, _z_zbuf_t *zbf, uint8_t header) {
int8_t _z_pull_encode(_z_wbuf_t *wbf, const _z_msg_pull_t *pull) {
int8_t ret = _Z_RES_OK;
uint8_t header = _Z_MID_Z_PULL;
_Bool has_info_ext = _z_id_check(pull->_ext_source_info._id);
_Bool has_info_ext = _z_id_check(pull->_ext_source_info._id) || pull->_ext_source_info._source_sn != 0 ||
pull->_ext_source_info._entity_id != 0;
if (has_info_ext) {
header |= _Z_FLAG_Z_Z;
}
Expand Down
7 changes: 6 additions & 1 deletion src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
_Z_DEBUG("Encoding _Z_MID_N_RESPONSE\n");
_Bool has_qos_ext = msg->_ext_qos._val != _Z_N_QOS_DEFAULT._val;
_Bool has_ts_ext = _z_timestamp_check(&msg->_ext_timestamp);
_Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid);
_Bool has_responder_ext = _z_id_check(msg->_ext_responder._zid) || msg->_ext_responder._eid != 0;
uint8_t n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0);
_Bool has_suffix = _z_keyexpr_has_suffix(msg->_key);
if (msg->_key._uses_remote_mapping) {
Expand Down Expand Up @@ -477,18 +477,23 @@ int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf) {
_Z_RETURN_IF_ERR(_z_uint8_decode(&header, zbf));
switch (_Z_MID(header)) {
case _Z_MID_N_DECLARE: {
msg->_tag = _Z_N_DECLARE;
return _z_declare_decode(&msg->_body._declare, zbf, header);
} break;
case _Z_MID_N_PUSH: {
msg->_tag = _Z_N_PUSH;
return _z_push_decode(&msg->_body._push, zbf, header);
} break;
case _Z_MID_N_REQUEST: {
msg->_tag = _Z_N_REQUEST;
return _z_request_decode(&msg->_body._request, zbf, header);
} break;
case _Z_MID_N_RESPONSE: {
msg->_tag = _Z_N_RESPONSE;
return _z_response_decode(&msg->_body._response, zbf, header);
} break;
case _Z_MID_N_RESPONSE_FINAL: {
msg->_tag = _Z_N_RESPONSE_FINAL;
return _z_response_final_decode(&msg->_body._response_final, zbf, header);
} break;
default:
Expand Down
85 changes: 42 additions & 43 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// ZettaScale Zenoh Team, <[email protected]>
//

#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/codec/ext.h"
#include "zenoh-pico/protocol/codec/network.h"
#include "zenoh-pico/protocol/definitions/core.h"
#include "zenoh-pico/protocol/ext.h"
#include "zenoh-pico/protocol/iobuf.h"
#include "zenoh-pico/protocol/msgcodec.h"
#include "zenoh-pico/utils/logging.h"
Expand All @@ -25,50 +29,45 @@ int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg
int8_t ret = _Z_RES_OK;
_Z_DEBUG("Encoding _Z_MID_T_JOIN\n");

ret = _z_wbuf_write(wbf, msg->_version);
_Z_RETURN_IF_ERR(_z_wbuf_write(wbf, msg->_version));

if (ret == _Z_RES_OK) {
uint8_t cbyte = 0;
cbyte |= (msg->_whatami & 0x03);
uint8_t zidlen = _z_id_len(msg->_zid);
cbyte |= ((zidlen - 1) & 0x0F) << 4;
ret = _z_uint8_encode(wbf, cbyte);
if (ret == _Z_RES_OK) {
ret = _z_wbuf_write_bytes(wbf, msg->_zid.id, 0, zidlen);
}
}
uint8_t cbyte = 0;
cbyte |= (msg->_whatami & 0x03);
uint8_t zidlen = _z_id_len(msg->_zid);
cbyte |= ((zidlen - 1) & 0x0F) << 4;
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, cbyte));
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_zid.id, 0, zidlen));

if ((ret == _Z_RES_OK) && _Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_S)) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_S)) {
uint8_t cbyte = 0;
cbyte |= (msg->_seq_num_res & 0x03);
cbyte |= ((msg->_req_id_res & 0x03) << 2);
ret = _z_uint8_encode(wbf, cbyte);
if (ret == _Z_RES_OK) {
ret = _z_uint16_encode(wbf, msg->_batch_size);
}
}
if (ret == _Z_RES_OK) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_T) == true) {
ret = _z_zint_encode(wbf, msg->_lease / 1000);
} else {
ret = _z_zint_encode(wbf, msg->_lease);
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, cbyte));
_Z_RETURN_IF_ERR(_z_uint16_encode(wbf, msg->_batch_size));
}
if (ret == _Z_RES_OK) {
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
ret = _z_uint8_encode(wbf, 0x51); // QOS-ext: (enc=zbuf)(mandatory=true)(id=1)
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
ret = _z_zint_encode(wbf, msg->_next_sn._val._qos[i]._reliable);
ret |= _z_zint_encode(wbf, msg->_next_sn._val._qos[i]._best_effort);
}
} else {
_Z_DEBUG("Attempted to serialize QoS-SN extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
if (_Z_HAS_FLAG(header, _Z_FLAG_T_JOIN_T) == true) {
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_lease / 1000));
} else {
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_lease));
}
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._plain._reliable));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._plain._best_effort));
if (msg->_next_sn._is_qos) {
if (_Z_HAS_FLAG(header, _Z_FLAG_T_Z)) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1));
size_t len = 0;
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
len += _z_zint_len(msg->_next_sn._val._qos[i]._reliable) +
_z_zint_len(msg->_next_sn._val._qos[i]._best_effort);
}
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, len));
for (uint8_t i = 0; (i < Z_PRIORITIES_NUM) && (ret == _Z_RES_OK); i++) {
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._qos[i]._reliable));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, msg->_next_sn._val._qos[i]._best_effort));
}
} else {
ret |= _z_zint_encode(wbf, msg->_next_sn._val._plain._reliable);
ret |= _z_zint_encode(wbf, msg->_next_sn._val._plain._best_effort);
_Z_DEBUG("Attempted to serialize QoS-SN extension, but the header extension flag was unset");
ret |= _Z_ERR_MESSAGE_SERIALIZATION_FAILED;
}
}

Expand All @@ -78,12 +77,13 @@ int8_t _z_join_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_join_t *msg
int8_t _z_join_decode_ext(_z_msg_ext_t *extension, void *ctx) {
int8_t ret = _Z_RES_OK;
_z_t_msg_join_t *msg = (_z_t_msg_join_t *)ctx;
if (_Z_EXT_FULL_ID(extension->_header) == 0x51) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
if (_Z_EXT_FULL_ID(extension->_header) ==
(_Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 1)) { // QOS: (enc=zbuf)(mandatory=true)(id=1)
msg->_next_sn._is_qos = true;
_z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val);
for (int i = 0; (ret == _Z_RES_OK) && (i < Z_PRIORITIES_NUM); ++i) {
ret |= _z_zint_decode(&msg->_next_sn._val._plain._reliable, &zbf);
ret |= _z_zint_decode(&msg->_next_sn._val._plain._best_effort, &zbf);
ret |= _z_zint_decode(&msg->_next_sn._val._qos[i]._reliable, &zbf);
ret |= _z_zint_decode(&msg->_next_sn._val._qos[i]._best_effort, &zbf);
}
} else if (_Z_MSG_EXT_IS_MANDATORY(extension->_header)) {
ret = _Z_ERR_MESSAGE_EXTENSION_MANDATORY_AND_UNKNOWN;
Expand Down Expand Up @@ -204,9 +204,6 @@ int8_t _z_init_decode(_z_t_msg_init_t *msg, _z_zbuf_t *zbf, uint8_t header) {

if ((ret == _Z_RES_OK) && (_Z_HAS_FLAG(header, _Z_FLAG_T_INIT_A) == true)) {
ret |= _z_bytes_decode(&msg->_cookie, zbf);
if (ret != _Z_RES_OK) {
msg->_cookie = _z_bytes_empty();
}
} else {
msg->_cookie = _z_bytes_empty();
}
Expand Down Expand Up @@ -305,7 +302,9 @@ int8_t _z_keep_alive_decode(_z_t_msg_keep_alive_t *msg, _z_zbuf_t *zbf, uint8_t
int8_t ret = _Z_RES_OK;
_Z_DEBUG("Decoding _Z_MID_T_KEEP_ALIVE\n");

ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x03);
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
ret |= _z_msg_ext_skip_non_mandatories(zbf, 0x03);
}

return ret;
}
Expand Down
10 changes: 5 additions & 5 deletions src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ void _z_msg_put_clear(_z_msg_put_t *msg) {
}

_z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg) {
return (_z_msg_query_reqexts_t){.body = msg->_ext_value.payload.start != NULL ||
msg->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.suffix),
.info = _z_id_check(msg->_ext_info._id),
.consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO};
return (_z_msg_query_reqexts_t){
.body = msg->_ext_value.payload.start != NULL || msg->_ext_value.encoding.prefix != 0 ||
!_z_bytes_is_empty(&msg->_ext_value.encoding.suffix),
.info = _z_id_check(msg->_ext_info._id) || msg->_ext_info._entity_id != 0 || msg->_ext_info._source_sn != 0,
.consolidation = msg->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO};
}

void _z_msg_query_clear(_z_msg_query_t *msg) {
Expand Down
Loading

0 comments on commit 4b29c02

Please sign in to comment.