diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 1ebfbdcf2..1d95bac7c 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -211,7 +211,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle); * key: The resource key of this reply. The caller keeps the ownership. * payload: The value of this reply, the caller keeps ownership. */ -int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); +int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload, + const z_sample_kind_t kind); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index c796d12e4..4a001444b 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -47,36 +47,6 @@ #define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header -// Flags: -// - T: Timestamp If T==1 then the timestamp if present -// - E: Encoding If E==1 then the encoding is present -// - Z: Extension If Z==1 then at least one extension is present -// -// 7 6 5 4 3 2 1 0 -// +-+-+-+-+-+-+-+-+ -// |Z|E|T| REPLY | -// +-+-+-+---------+ -// ~ ts: ~ if T==1 -// +---------------+ -// ~ encoding ~ if E==1 -// +---------------+ -// ~ [repl_exts] ~ if Z==1 -// +---------------+ -// ~ pl: ~ -- Payload -// +---------------+ -typedef struct { - _z_timestamp_t _timestamp; - _z_value_t _value; - _z_source_info_t _ext_source_info; - z_consolidation_mode_t _ext_consolidation; -#if Z_FEATURE_ATTACHMENT == 1 - _z_owned_encoded_attachment_t _ext_attachment; -#endif -} _z_msg_reply_t; -void _z_msg_reply_clear(_z_msg_reply_t *msg); -#define _Z_FLAG_Z_R_T 0x20 -#define _Z_FLAG_Z_R_E 0x40 - // Flags: // - T: Timestamp If T==1 then the timestamp if present // - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user @@ -193,4 +163,33 @@ typedef struct { _z_msg_query_reqexts_t _z_msg_query_required_extensions(const _z_msg_query_t *msg); void _z_msg_query_clear(_z_msg_query_t *msg); +typedef struct { + _Bool _is_put; + union { + _z_msg_del_t _del; + _z_msg_put_t _put; + } _body; +} _z_reply_body_t; +// Flags: +// - C: Consolidation If C==1 then consolidation is present +// - X: Reserved +// - Z: Extension If Z==1 then at least one extension is present +// +// 7 6 5 4 3 2 1 0 +// +-+-+-+-+-+-+-+-+ +// |Z|X|C| REPLY | +// +-+-+-+---------+ +// ~ consolidation ~ if C==1 +// +---------------+ +// ~ [repl_exts] ~ if Z==1 +// +---------------+ +// ~ ReplyBody ~ -- Payload +// +---------------+ +typedef struct { + z_consolidation_mode_t _consolidation; + _z_reply_body_t _body; +} _z_msg_reply_t; +void _z_msg_reply_clear(_z_msg_reply_t *msg); +#define _Z_FLAG_Z_R_C 0x20 + #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_MESSAGE_H */ diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 5c09247d3..689102dbe 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -118,13 +118,7 @@ typedef struct { _z_n_msg_request_exts_t _z_n_msg_request_needed_exts(const _z_n_msg_request_t *msg); void _z_n_msg_request_clear(_z_n_msg_request_t *msg); -typedef struct { - _Bool _is_put; - union { - _z_msg_del_t _del; - _z_msg_put_t _put; - } _body; -} _z_push_body_t; +typedef _z_reply_body_t _z_push_body_t; _z_push_body_t _z_push_body_null(void); _z_push_body_t _z_push_body_steal(_z_push_body_t *msg); void _z_push_body_clear(_z_push_body_t *msg); @@ -236,7 +230,7 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt z_attachment_t attachment #endif ); -_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value); +_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body); _z_network_message_t _z_n_msg_make_ack(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key); _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid); _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration); diff --git a/include/zenoh-pico/session/query.h b/include/zenoh-pico/session/query.h index 86d03bb6f..6dbcb6fc7 100644 --- a/include/zenoh-pico/session/query.h +++ b/include/zenoh-pico/session/query.h @@ -26,8 +26,7 @@ _z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pq); int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, const _z_keyexpr_t keyexpr, - const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, - const _z_timestamp_t timestamp); + const _z_msg_put_t *msg); int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id); void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq); void _z_flush_pending_queries(_z_session_t *zn); diff --git a/src/api/api.c b/src/api/api.c index 43afb1b39..9a64be720 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -934,7 +934,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui .len = payload_len, }, .encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}}; - return _z_send_reply(&query->_val._rc.in->val, keyexpr, value); + return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT); return _Z_ERR_GENERIC; } #endif diff --git a/src/net/primitives.c b/src/net/primitives.c index 4958b04be..2aa5b7920 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -340,7 +340,8 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) { return _Z_RES_OK; } -int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload) { +int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload, + const z_sample_kind_t kind) { int8_t ret = _Z_RES_OK; _z_keyexpr_t q_ke; @@ -359,23 +360,30 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val // Build the reply context decorator. This is NOT the final reply. _z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid; _z_keyexpr_t ke = _z_keyexpr_alias(keyexpr); - _z_zenoh_message_t z_msg = { - ._tag = _Z_N_RESPONSE, - ._body._response = - { - ._request_id = query->_request_id, - ._key = ke, - ._ext_responder = {._zid = zid, ._eid = 0}, - ._ext_qos = _Z_N_QOS_DEFAULT, - ._ext_timestamp = _z_timestamp_null(), - ._tag = _Z_RESPONSE_BODY_REPLY, - ._body._reply = {._value = payload, - ._timestamp = _z_timestamp_null(), - ._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO, - ._ext_source_info = _z_source_info_null()}, - }, - }; - + _z_zenoh_message_t z_msg; + switch (kind) { + default: + return _Z_ERR_GENERIC; + break; + case Z_SAMPLE_KIND_PUT: + z_msg._tag = _Z_N_RESPONSE; + z_msg._body._response._request_id = query->_request_id; + z_msg._body._response._key = ke; + z_msg._body._response._ext_responder._zid = zid; + z_msg._body._response._ext_responder._eid = 0; + z_msg._body._response._ext_qos = _Z_N_QOS_DEFAULT; + z_msg._body._response._ext_timestamp = _z_timestamp_null(); + z_msg._body._response._tag = _Z_RESPONSE_BODY_REPLY; + z_msg._body._response._body._reply._consolidation = Z_CONSOLIDATION_MODE_DEFAULT; + z_msg._body._response._body._reply._body._is_put = true; + z_msg._body._response._body._reply._body._body._put._payload = payload.payload; + z_msg._body._response._body._reply._body._body._put._encoding = payload.encoding; + z_msg._body._response._body._reply._body._body._put._attachment.body.decoded = z_attachment_null(); + z_msg._body._response._body._reply._body._body._put._attachment.is_encoded = false; + z_msg._body._response._body._reply._body._body._put._commons._timestamp = _z_timestamp_null(); + z_msg._body._response._body._reply._body._body._put._commons._source_info = _z_source_info_null(); + break; + } if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { ret = _Z_ERR_TRANSPORT_TX_FAILED; } diff --git a/src/protocol/codec/message.c b/src/protocol/codec/message.c index b8942df12..8a8d96c88 100644 --- a/src/protocol/codec/message.c +++ b/src/protocol/codec/message.c @@ -520,107 +520,41 @@ int8_t _z_query_decode(_z_msg_query_t *msg, _z_zbuf_t *zbf, uint8_t header) { int8_t _z_reply_encode(_z_wbuf_t *wbf, const _z_msg_reply_t *reply) { uint8_t header = _Z_MID_Z_REPLY; - if (_z_timestamp_check(&reply->_timestamp)) { - header |= _Z_FLAG_Z_R_T; - } - if (reply->_value.encoding.prefix != Z_ENCODING_PREFIX_EMPTY || - !_z_bytes_is_empty(&reply->_value.encoding.suffix)) { - header |= _Z_FLAG_Z_R_E; - } -#if Z_FEATURE_ATTACHMENT == 1 - z_attachment_t att = _z_encoded_as_attachment(&reply->_ext_attachment); - _Bool has_attachment = z_attachment_check(&att); -#else - _Bool has_attachment = false; -#endif - _Bool has_sourceinfo = _z_id_check(reply->_ext_source_info._id) || reply->_ext_source_info._source_sn != 0 || - reply->_ext_source_info._entity_id != 0; - _Bool has_consolidation_ext = reply->_ext_consolidation != Z_CONSOLIDATION_MODE_AUTO; - if (has_consolidation_ext || has_sourceinfo || has_attachment) { - header |= _Z_FLAG_Z_Z; + _Bool has_consolidation = reply->_consolidation != Z_CONSOLIDATION_MODE_DEFAULT; + if (has_consolidation) { + _Z_SET_FLAG(header, _Z_FLAG_Z_R_C); } _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - int8_t ret = _Z_RES_OK; - if (_z_timestamp_check(&reply->_timestamp)) { - assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T)); - _Z_RETURN_IF_ERR(_z_timestamp_encode(wbf, &reply->_timestamp)); - } - if (((reply->_value.encoding.prefix != 0) || !_z_bytes_is_empty(&reply->_value.encoding.suffix))) { - assert(_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E)); - _Z_RETURN_IF_ERR(_z_encoding_prefix_encode(wbf, reply->_value.encoding.prefix)); - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.encoding.suffix)); - } - if (has_sourceinfo) { - uint8_t extheader = _Z_MSG_EXT_ENC_ZBUF | 0x01; - if (has_consolidation_ext || has_attachment) { - extheader |= _Z_MSG_EXT_FLAG_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_source_info_encode_ext(wbf, &reply->_ext_source_info)); - } - if (has_consolidation_ext) { - uint8_t extheader = _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02; - if (has_attachment) { - extheader |= _Z_MSG_EXT_FLAG_Z; - } - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); - _Z_RETURN_IF_ERR(_z_zint_encode(wbf, reply->_ext_consolidation)); - } -#if Z_FEATURE_ATTACHMENT == 1 - if (has_attachment) { - _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, _Z_MSG_EXT_ENC_ZBUF | 0x04)); - _Z_RETURN_IF_ERR(_z_attachment_encode_ext(wbf, att)); + if (has_consolidation) { + _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, reply->_consolidation)); } -#endif - _Z_RETURN_IF_ERR(_z_bytes_encode(wbf, &reply->_value.payload)); - return ret; + _Z_RETURN_IF_ERR(_z_push_body_encode(wbf, &reply->_body)); + return _Z_RES_OK; } int8_t _z_reply_decode_extension(_z_msg_ext_t *extension, void *ctx) { int8_t ret = _Z_RES_OK; _z_msg_reply_t *reply = (_z_msg_reply_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { - case _Z_MSG_EXT_ENC_ZBUF | 0x01: { - _z_zbuf_t zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val); - ret = _z_source_info_decode(&reply->_ext_source_info, &zbf); - break; - } - case _Z_MSG_EXT_ENC_ZINT | _Z_MSG_EXT_FLAG_M | 0x02: { - reply->_ext_consolidation = extension->_body._zint._val; - break; - } -#if Z_FEATURE_ATTACHMENT == 1 - case _Z_MSG_EXT_ENC_ZBUF | 0x04: { - reply->_ext_attachment.is_encoded = true; - reply->_ext_attachment.body.encoded = extension->_body._zbuf._val._is_alloc - ? _z_bytes_steal(&extension->_body._zbuf._val) - : _z_bytes_duplicate(&extension->_body._zbuf._val); - break; - } -#endif default: - if (_Z_HAS_FLAG(extension->_header, _Z_MSG_EXT_FLAG_M)) { - ret = _z_msg_ext_unknown_error(extension, 0x0a); - } + ret = _z_msg_ext_unknown_error(extension, 0x0a); + break; } return ret; } int8_t _z_reply_decode(_z_msg_reply_t *reply, _z_zbuf_t *zbf, uint8_t header) { - int8_t ret = _Z_RES_OK; *reply = (_z_msg_reply_t){0}; - reply->_ext_consolidation = Z_CONSOLIDATION_MODE_AUTO; - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_T)) { - _Z_RETURN_IF_ERR(_z_timestamp_decode(&reply->_timestamp, zbf)); - } - if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_E)) { - _Z_RETURN_IF_ERR(_z_encoding_prefix_decode(&reply->_value.encoding.prefix, zbf)); - _Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.encoding.suffix, zbf)); + if (_Z_HAS_FLAG(header, _Z_FLAG_Z_R_C)) { + _Z_RETURN_IF_ERR(_z_uint8_decode((uint8_t *)&reply->_consolidation, zbf)); + } else { + reply->_consolidation = Z_CONSOLIDATION_MODE_DEFAULT; } if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) { _Z_RETURN_IF_ERR(_z_msg_ext_decode_iter(zbf, _z_reply_decode_extension, reply)); } - _Z_RETURN_IF_ERR(_z_bytes_decode(&reply->_value.payload, zbf)); - - return ret; + uint8_t put_header = 0; + _Z_RETURN_IF_ERR(_z_uint8_decode(&put_header, zbf)); + _Z_RETURN_IF_ERR(_z_push_body_decode(&reply->_body, zbf, put_header)); + return _Z_RES_OK; } int8_t _z_err_encode(_z_wbuf_t *wbf, const _z_msg_err_t *err) { diff --git a/src/protocol/definitions/message.c b/src/protocol/definitions/message.c index 61573e3c4..ea0b5eb82 100644 --- a/src/protocol/definitions/message.c +++ b/src/protocol/definitions/message.c @@ -18,8 +18,9 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" -void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_value_clear(&msg->_value); } +void _z_msg_reply_clear(_z_msg_reply_t *msg) { _z_push_body_clear(&msg->_body); } void _z_msg_put_clear(_z_msg_put_t *msg) { _z_bytes_clear(&msg->_encoding.suffix); diff --git a/src/protocol/definitions/network.c b/src/protocol/definitions/network.c index 959361710..5f26ecf58 100644 --- a/src/protocol/definitions/network.c +++ b/src/protocol/definitions/network.c @@ -220,7 +220,7 @@ _z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_pu ._body._push = {._key = _z_keyexpr_steal(key), ._body = _z_push_body_steal(body)}, }; } -_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_value_t) value) { +_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body) { return (_z_network_message_t){ ._tag = _Z_N_RESPONSE, ._body._response = @@ -230,10 +230,8 @@ _z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) ke ._request_id = rid, ._body._reply = { - ._timestamp = _z_timestamp_null(), - ._value = _z_value_steal(value), - ._ext_source_info = _z_source_info_null(), - ._ext_consolidation = Z_CONSOLIDATION_MODE_AUTO, + ._consolidation = Z_CONSOLIDATION_MODE_AUTO, + ._body = _z_push_body_steal(body), }, ._ext_qos = _Z_N_QOS_DEFAULT, ._ext_timestamp = _z_timestamp_null(), diff --git a/src/session/query.c b/src/session/query.c index 90d300621..a3080761d 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -133,8 +133,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) } int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, const _z_keyexpr_t keyexpr, - const _z_bytes_t payload, const _z_encoding_t encoding, const _z_zint_t kind, - const _z_timestamp_t timestamp) { + const _z_msg_put_t *msg) { int8_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -156,11 +155,11 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons reply._tag = Z_REPLY_TAG_DATA; reply.data.replier_id = zn->_local_zid; reply.data.sample.keyexpr = expanded_ke; - _z_bytes_copy(&reply.data.sample.payload, &payload); - reply.data.sample.encoding.prefix = encoding.prefix; - _z_bytes_copy(&reply.data.sample.encoding.suffix, &encoding.suffix); - reply.data.sample.kind = kind; - reply.data.sample.timestamp = _z_timestamp_duplicate(×tamp); + _z_bytes_copy(&reply.data.sample.payload, &msg->_payload); + reply.data.sample.encoding.prefix = msg->_encoding.prefix; + _z_bytes_copy(&reply.data.sample.encoding.suffix, &msg->_encoding.suffix); + reply.data.sample.kind = Z_SAMPLE_KIND_PUT; + reply.data.sample.timestamp = _z_timestamp_duplicate(&msg->_commons._timestamp); // Verify if this is a newer reply, free the old one in case it is if ((ret == _Z_RES_OK) && ((pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST) || @@ -173,7 +172,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons // Check if this is the same resource key if (_z_str_eq(pen_rep->_reply.data.sample.keyexpr._suffix, reply.data.sample.keyexpr._suffix) == true) { - if (timestamp.time <= pen_rep->_tstamp.time) { + if (msg->_commons._timestamp.time <= pen_rep->_tstamp.time) { drop = true; } else { pen_qry->_pending_replies = @@ -199,7 +198,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons } else { pen_rep->_reply = reply; // Store the whole reply in the latest mode } - pen_rep->_tstamp = _z_timestamp_duplicate(×tamp); + pen_rep->_tstamp = _z_timestamp_duplicate(&msg->_commons._timestamp); pen_qry->_pending_replies = _z_pending_reply_list_push(pen_qry->_pending_replies, pen_rep); } else { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; diff --git a/src/session/reply.c b/src/session/reply.c index 7fcffc0e3..7441b5e7f 100644 --- a/src/session/reply.c +++ b/src/session/reply.c @@ -25,8 +25,11 @@ int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key // TODO check id to know where to dispatch #if Z_FEATURE_QUERY == 1 - ret = _z_trigger_query_reply_partial(zn, id, key, reply->_value.payload, reply->_value.encoding, Z_SAMPLE_KIND_PUT, - reply->_timestamp); + if (reply->_body._is_put) { + ret = _z_trigger_query_reply_partial(zn, id, key, &reply->_body._body._put); + } else { + ret = _Z_ERR_GENERIC; + } #endif return ret; } diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 668749953..80e66747c 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1226,18 +1226,14 @@ void ack_message(void) { _z_msg_reply_t gen_reply(void) { return (_z_msg_reply_t){ - ._ext_source_info = gen_bool() ? gen_source_info() : _z_source_info_null(), - ._timestamp = gen_timestamp(), - ._ext_consolidation = (gen_uint8() % 4) - 1, - ._value = gen_value(), + ._consolidation = (gen_uint8() % 4) - 1, + ._body = gen_push_body(), }; } void assert_eq_reply(const _z_msg_reply_t *left, const _z_msg_reply_t *right) { - assert_eq_timestamp(&left->_timestamp, &right->_timestamp); - assert_eq_source_info(&left->_ext_source_info, &right->_ext_source_info); - assert(left->_ext_consolidation == right->_ext_consolidation); - assert_eq_value(&left->_value, &right->_value); + assert(left->_consolidation == right->_consolidation); + assert_eq_push_body(&left->_body, &right->_body); } void reply_message(void) {