Skip to content

Commit

Permalink
feat: add sample kind and push body in replies
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland committed Mar 18, 2024
1 parent 8247da2 commit a1a9507
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 168 deletions.
3 changes: 2 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
*/
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload);
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
59 changes: 29 additions & 30 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,6 @@

#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header

// Flags:
// - T: Timestamp If T==1 then the timestamp if present
// - E: Encoding If E==1 then the encoding is present
// - Z: Extension If Z==1 then at least one extension is present
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|E|T| REPLY |
// +-+-+-+---------+
// ~ ts: <u8;z16> ~ if T==1
// +---------------+
// ~ encoding ~ if E==1
// +---------------+
// ~ [repl_exts] ~ if Z==1
// +---------------+
// ~ pl: <u8;z32> ~ -- Payload
// +---------------+
typedef struct {
_z_timestamp_t _timestamp;
_z_value_t _value;
_z_source_info_t _ext_source_info;
z_consolidation_mode_t _ext_consolidation;
#if Z_FEATURE_ATTACHMENT == 1
_z_owned_encoded_attachment_t _ext_attachment;
#endif
} _z_msg_reply_t;
void _z_msg_reply_clear(_z_msg_reply_t *msg);
#define _Z_FLAG_Z_R_T 0x20
#define _Z_FLAG_Z_R_E 0x40

// Flags:
// - T: Timestamp If T==1 then the timestamp if present
// - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user
Expand Down Expand Up @@ -193,4 +163,33 @@ typedef struct {
_z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg);
void _z_msg_query_clear(_z_msg_query_t *msg);

typedef struct {
_Bool _is_put;
union {
_z_msg_del_t _del;
_z_msg_put_t _put;
} _body;
} _z_reply_body_t;
// Flags:
// - C: Consolidation If C==1 then consolidation is present
// - X: Reserved
// - Z: Extension If Z==1 then at least one extension is present
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |Z|X|C| REPLY |
// +-+-+-+---------+
// ~ consolidation ~ if C==1
// +---------------+
// ~ [repl_exts] ~ if Z==1
// +---------------+
// ~ ReplyBody ~ -- Payload
// +---------------+
typedef struct {
z_consolidation_mode_t _consolidation;
_z_reply_body_t _body;
} _z_msg_reply_t;
void _z_msg_reply_clear(_z_msg_reply_t *msg);
#define _Z_FLAG_Z_R_C 0x20

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_MESSAGE_H */
10 changes: 2 additions & 8 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ typedef struct {
_z_n_msg_request_exts_t _z_n_msg_request_needed_exts(const _z_n_msg_request_t *msg);
void _z_n_msg_request_clear(_z_n_msg_request_t *msg);

typedef struct {
_Bool _is_put;
union {
_z_msg_del_t _del;
_z_msg_put_t _put;
} _body;
} _z_push_body_t;
typedef _z_reply_body_t _z_push_body_t;
_z_push_body_t _z_push_body_null(void);
_z_push_body_t _z_push_body_steal(_z_push_body_t *msg);
void _z_push_body_clear(_z_push_body_t *msg);
Expand Down Expand Up @@ -236,7 +230,7 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt
z_attachment_t attachment
#endif
);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key);
_z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration);
Expand Down
3 changes: 1 addition & 2 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t

int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr,
const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind,
const _z_timestamp_t timestamp);
const _z_msg_put_t *msg);
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq);
void _z_flush_pending_queries(_z_session_t *zn);
Expand Down
2 changes: 1 addition & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
.len = payload_len,
},
.encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value);
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _Z_ERR_GENERIC;
}
#endif
Expand Down
44 changes: 26 additions & 18 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_RES_OK;
}

int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) {
int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind) {
int8_t ret = _Z_RES_OK;

_z_keyexpr_t q_ke;
Expand All @@ -359,23 +360,30 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val
// Build the reply context decorator. This is NOT the final reply.
_z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid;
_z_keyexpr_t ke = _z_keyexpr_alias(keyexpr);
_z_zenoh_message_t z_msg = {
._tag = _Z_N_RESPONSE,
._body._response =
{
._request_id = query->_request_id,
._key = ke,
._ext_responder = {._zid = zid, ._eid = 0},
._ext_qos = _Z_N_QOS_DEFAULT,
._ext_timestamp = _z_timestamp_null(),
._tag = _Z_RESPONSE_BODY_REPLY,
._body._reply = {._value = payload,
._timestamp = _z_timestamp_null(),
._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO,
._ext_source_info = _z_source_info_null()},
},
};

_z_zenoh_message_t z_msg;
switch (kind) {
default:
return _Z_ERR_GENERIC;
break;
case Z_SAMPLE_KIND_PUT:
z_msg._tag = _Z_N_RESPONSE;
z_msg._body._response._request_id = query->_request_id;
z_msg._body._response._key = ke;
z_msg._body._response._ext_responder._zid = zid;
z_msg._body._response._ext_responder._eid = 0;
z_msg._body._response._ext_qos = _Z_N_QOS_DEFAULT;
z_msg._body._response._ext_timestamp = _z_timestamp_null();
z_msg._body._response._tag = _Z_RESPONSE_BODY_REPLY;
z_msg._body._response._body._reply._consolidation = Z_CONSOLIDATION_MODE_DEFAULT;
z_msg._body._response._body._reply._body._is_put = true;
z_msg._body._response._body._reply._body._body._put._payload = payload.payload;
z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding;
z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = z_attachment_null();
z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false;
z_msg._body._response._body._reply._body._body._put._commons._timestamp = _z_timestamp_null();
z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null();
break;
}
if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}
Expand Down
100 changes: 17 additions & 83 deletions src/protocol/codec/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -520,107 +520,41 @@ int8_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) {

int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) {
uint8_t header = _Z_MID_Z_REPLY;
if (_z_timestamp_check(&reply->_timestamp)) {
header |= _Z_FLAG_Z_R_T;
}
if (reply->_value.encoding.prefix != Z_ENCODING_PREFIX_EMPTY ||
!_z_bytes_is_empty(&reply->_value.encoding.suffix)) {
header |= _Z_FLAG_Z_R_E;
}
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t att = _z_encoded_as_attachment(&reply->_ext_attachment);
_Bool has_attachment = z_attachment_check(&att);
#else
_Bool has_attachment = false;
#endif
_Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 ||
reply->_ext_source_info._entity_id != 0;
_Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO;
if (has_consolidation_ext || has_sourceinfo || has_attachment) {
header |= _Z_FLAG_Z_Z;
_Bool has_consolidation = reply->_consolidation != Z_CONSOLIDATION_MODE_DEFAULT;
if (has_consolidation) {
_Z_SET_FLAG(header, _Z_FLAG_Z_R_C);
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
int8_t ret = _Z_RES_OK;
if (_z_timestamp_check(&reply->_timestamp)) {
assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T));
_Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &reply->_timestamp));
}
if (((reply->_value.encoding.prefix != 0) || !_z_bytes_is_empty(&reply->_value.encoding.suffix))) {
assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E));
_Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, reply->_value.encoding.prefix));
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix));
}
if (has_sourceinfo) {
uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01;
if (has_consolidation_ext || has_attachment) {
extheader |= _Z_MSG_EXT_FLAG_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &reply->_ext_source_info));
}
if (has_consolidation_ext) {
uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02;
if (has_attachment) {
extheader |= _Z_MSG_EXT_FLAG_Z;
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader));
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, reply->_ext_consolidation));
}
#if Z_FEATURE_ATTACHMENT == 1
if (has_attachment) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x04));
_Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att));
if (has_consolidation) {
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, reply->_consolidation));
}
#endif
_Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.payload));
return ret;
_Z_RETURN_IF_ERR(_z_push_body_encode(wbf, &reply->_body));
return _Z_RES_OK;
}
int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) {
int8_t ret = _Z_RES_OK;
_z_msg_reply_t *reply = (_z_msg_reply_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
case _Z_MSG_EXT_ENC_ZBUF | 0x01: {
_z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val);
ret = _z_source_info_decode(&reply->_ext_source_info, &zbf);
break;
}
case _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02: {
reply->_ext_consolidation = extension->_body._zint._val;
break;
}
#if Z_FEATURE_ATTACHMENT == 1
case _Z_MSG_EXT_ENC_ZBUF | 0x04: {
reply->_ext_attachment.is_encoded = true;
reply->_ext_attachment.body.encoded = extension->_body._zbuf._val._is_alloc
? _z_bytes_steal(&extension->_body._zbuf._val)
: _z_bytes_duplicate(&extension->_body._zbuf._val);
break;
}
#endif
default:
if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) {
ret = _z_msg_ext_unknown_error(extension, 0x0a);
}
ret = _z_msg_ext_unknown_error(extension, 0x0a);
break;
}
return ret;
}
int8_t _z_reply_decode(_z_msg_reply_t *reply, _z_zbuf_t *zbf, uint8_t header) {
int8_t ret = _Z_RES_OK;
*reply = (_z_msg_reply_t){0};
reply->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO;
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T)) {
_Z_RETURN_IF_ERR(_z_timestamp_decode(&reply->_timestamp, zbf));
}
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E)) {
_Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&reply->_value.encoding.prefix, zbf));
_Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.encoding.suffix, zbf));
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_C)) {
_Z_RETURN_IF_ERR(_z_uint8_decode((uint8_t *)&reply->_consolidation, zbf));
} else {
reply->_consolidation = Z_CONSOLIDATION_MODE_DEFAULT;
}
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_reply_decode_extension, reply));
}
_Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.payload, zbf));

return ret;
uint8_t put_header = 0;
_Z_RETURN_IF_ERR(_z_uint8_decode(&put_header, zbf));
_Z_RETURN_IF_ERR(_z_push_body_decode(&reply->_body, zbf, put_header));
return _Z_RES_OK;
}

int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) {
Expand Down
3 changes: 2 additions & 1 deletion src/protocol/definitions/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"

void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); }
void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); }

void _z_msg_put_clear(_z_msg_put_t *msg) {
_z_bytes_clear(&msg->_encoding.suffix);
Expand Down
8 changes: 3 additions & 5 deletions src/protocol/definitions/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_pu
._body._push = {._key = _z_keyexpr_steal(key), ._body = _z_push_body_steal(body)},
};
}
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value) {
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body) {
return (_z_network_message_t){
._tag = _Z_N_RESPONSE,
._body._response =
Expand All @@ -230,10 +230,8 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke
._request_id = rid,
._body._reply =
{
._timestamp = _z_timestamp_null(),
._value = _z_value_steal(value),
._ext_source_info = _z_source_info_null(),
._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO,
._consolidation = Z_CONSOLIDATION_MODE_AUTO,
._body = _z_push_body_steal(body),
},
._ext_qos = _Z_N_QOS_DEFAULT,
._ext_timestamp = _z_timestamp_null(),
Expand Down
17 changes: 8 additions & 9 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
}

int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr,
const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind,
const _z_timestamp_t timestamp) {
const _z_msg_put_t *msg) {
int8_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
Expand All @@ -156,11 +155,11 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
reply._tag = Z_REPLY_TAG_DATA;
reply.data.replier_id = zn->_local_zid;
reply.data.sample.keyexpr = expanded_ke;
_z_bytes_copy(&reply.data.sample.payload, &payload);
reply.data.sample.encoding.prefix = encoding.prefix;
_z_bytes_copy(&reply.data.sample.encoding.suffix, &encoding.suffix);
reply.data.sample.kind = kind;
reply.data.sample.timestamp = _z_timestamp_duplicate(&timestamp);
_z_bytes_copy(&reply.data.sample.payload, &msg->_payload);
reply.data.sample.encoding.prefix = msg->_encoding.prefix;
_z_bytes_copy(&reply.data.sample.encoding.suffix, &msg->_encoding.suffix);
reply.data.sample.kind = Z_SAMPLE_KIND_PUT;
reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp);

// Verify if this is a newer reply, free the old one in case it is
if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) ||
Expand All @@ -173,7 +172,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons

// Check if this is the same resource key
if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) {
if (timestamp.time <= pen_rep->_tstamp.time) {
if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) {
drop = true;
} else {
pen_qry->_pending_replies =
Expand All @@ -199,7 +198,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
} else {
pen_rep->_reply = reply; // Store the whole reply in the latest mode
}
pen_rep->_tstamp = _z_timestamp_duplicate(&timestamp);
pen_rep->_tstamp = _z_timestamp_duplicate(&msg->_commons._timestamp);
pen_qry->_pending_replies = _z_pending_reply_list_push(pen_qry->_pending_replies, pen_rep);
} else {
ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY;
Expand Down
7 changes: 5 additions & 2 deletions src/session/reply.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key
// TODO check id to know where to dispatch

#if Z_FEATURE_QUERY == 1
ret = _z_trigger_query_reply_partial(zn, id, key, reply->_value.payload, reply->_value.encoding, Z_SAMPLE_KIND_PUT,
reply->_timestamp);
if (reply->_body._is_put) {
ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put);
} else {
ret = _Z_ERR_GENERIC;
}
#endif
return ret;
}
Expand Down
Loading

0 comments on commit a1a9507

Please sign in to comment.