Skip to content

Commit

Permalink
fix aliasing issue in resources
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 11, 2023
1 parent a0f0113 commit 5238f25
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 40 deletions.
21 changes: 17 additions & 4 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <zenoh-pico.h>

#include "zenoh-pico/system/platform.h"

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/zenoh-pico-pub";
const char *value = "Pub from Pico!";
char *value = "Pub from Pico!";
const char *mode = "client";
char *locator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:v:e:m:")) != -1) {
while ((opt = getopt(argc, argv, "k:v:e:m:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -40,6 +43,15 @@ int main(int argc, char **argv) {
case 'm':
mode = optarg;
break;
case 'l':
opt = atoi(optarg);
value = z_malloc(opt + 1);
memset(value, 'A', opt);
value[opt] = 0;
for (int i = opt - 1; opt > 0; i--, opt /= 10) {
value[i] = '0' + (opt % 10);
}
break;
case '?':
if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
Expand Down Expand Up @@ -78,10 +90,11 @@ int main(int argc, char **argv) {
return -1;
}

char *buf = (char *)malloc(256);
char *buf = value; // (char *)malloc(256);
for (int idx = 0; 1; ++idx) {
sleep(1);
snprintf(buf, 256, "[%4d] %s", idx, value);
(void)idx;
// snprintf(buf, 256, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);

z_publisher_put_options_t options = z_publisher_put_options_default();
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/protocol/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ typedef struct {
size_t _r_idx;
size_t _w_idx;
size_t _capacity;
_Bool _is_expandable;
size_t _expansion_step;
} _z_wbuf_t;

_z_wbuf_t _z_wbuf_make(size_t capacity, _Bool is_expandable);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/transport/link/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

void __unsafe_z_prepare_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
void __unsafe_z_finalize_wbuf(_z_wbuf_t *buf, _Bool is_streamed);
/*This function is unsafe because it operates in potentially concurrent
data.*Make sure that the following mutexes are locked before calling this function : *-ztu->mutex_tx */
int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn);

/*------------------ Transmission and Reception helpers ------------------*/
Expand Down
45 changes: 26 additions & 19 deletions include/zenoh-pico/utils/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,45 @@ static inline void __z_print_timestamp(void) {
printf("[%s ", z_time_now_as_str(ret, sizeof(ret)));
}

#define _Z_LOG_PREFIX(prefix) \
__z_print_timestamp(); \
printf(#prefix " ::%s] ", __func__);

#if (ZENOH_DEBUG == 3)
#define _Z_DEBUG(x, ...) \
__z_print_timestamp(); \
printf("DEBUG ::%s] ", __func__); \
#define _Z_DEBUG(x, ...) \
_Z_LOG_PREFIX(DEBUG); \
printf(x, ##__VA_ARGS__);
#define _Z_INFO(x, ...) \
__z_print_timestamp(); \
printf("INFO ::%s] ", __func__); \
#define _Z_DEBUG_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);
#define _Z_INFO(x, ...) \
_Z_LOG_PREFIX(INFO); \
printf(x, ##__VA_ARGS__);
#define _Z_ERROR(x, ...) \
__z_print_timestamp(); \
printf("ERROR ::%s] ", __func__); \
#define _Z_INFO_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);
#define _Z_ERROR(x, ...) \
_Z_LOG_PREFIX(ERROR); \
printf(x, ##__VA_ARGS__);
#define _Z_ERROR_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);

#elif (ZENOH_DEBUG == 2)
#define _Z_DEBUG(x, ...) (void)(0)
#define _Z_INFO(x, ...) \
__z_print_timestamp(); \
printf("INFO ::%s] ", __func__); \
#define _Z_DEBUG_CONTINUE(x, ...) (void)(0);
#define _Z_INFO(x, ...) \
_Z_LOG_PREFIX(INFO); \
printf(x, ##__VA_ARGS__);
#define _Z_ERROR(x, ...) \
__z_print_timestamp(); \
printf("ERROR ::%s] ", __func__); \
#define _Z_INFO_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);
#define _Z_ERROR(x, ...) \
_Z_LOG_PREFIX(ERROR); \
printf(x, ##__VA_ARGS__);
#define _Z_ERROR_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);

#elif (ZENOH_DEBUG == 1)
#define _Z_DEBUG(x, ...) (void)(0)
#define _Z_INFO(x, ...) (void)(0)
#define _Z_ERROR(x, ...) \
__z_print_timestamp(); \
printf("ERROR ::%s] ", __func__); \
#define _Z_DEBUG_CONTINUE(x, ...) (void)(0);
#define _Z_INFO(x, ...) (void)(0);
#define _Z_INFO_CONTINUE(x, ...) (void)(0);
#define _Z_ERROR(x, ...) \
_Z_LOG_PREFIX(ERROR); \
printf(x, ##__VA_ARGS__);
#define _Z_ERROR_CONTINUE(x, ...) printf(x, ##__VA_ARGS__);

#elif (ZENOH_DEBUG == 0)
#define _Z_DEBUG(x, ...) (void)(0)
Expand Down
5 changes: 3 additions & 2 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,9 @@ int8_t _z_zint64_decode(uint64_t *zint, _z_zbuf_t *zbf) {
int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) {
int8_t ret = _Z_RES_OK;

if ((wbf->_is_expandable == true) && (bs->len > Z_TSID_LENGTH)) {
ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
if ((wbf->_expansion_step = true) && (bs->len > Z_TSID_LENGTH)) {
// ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
} else {
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
}
Expand Down
5 changes: 3 additions & 2 deletions src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "zenoh-pico/system/platform.h"

int8_t _z_decl_ext_keyexpr_encode(_z_wbuf_t *wbf, _z_keyexpr_t ke, _Bool has_next_ext) {
uint8_t header = _Z_MSG_EXT_ENC_ZBUF | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0);
uint8_t header = _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
uint32_t kelen = _z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0;
header = (_z_keyexpr_is_local(&ke) ? 2 : 0) | (kelen != 0 ? 1 : 0);
Expand Down Expand Up @@ -189,6 +189,7 @@ int8_t _z_decl_kexpr_decode(_z_decl_kexpr_t *decl, _z_zbuf_t *zbf, uint8_t heade
*decl = _z_decl_kexpr_null();
_Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_id, zbf));
_Z_RETURN_IF_ERR(_z_keyexpr_decode(&decl->_keyexpr, zbf, _Z_HAS_FLAG(header, _Z_DECL_KEXPR_FLAG_N)));
_z_keyexpr_set_mapping(&decl->_keyexpr, _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE);

if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x15));
Expand All @@ -208,7 +209,7 @@ int8_t _z_undecl_kexpr_decode(_z_undecl_kexpr_t *decl, _z_zbuf_t *zbf, uint8_t h
int8_t _z_undecl_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
_z_keyexpr_t *ke = (_z_keyexpr_t *)ctx;
switch (extension->_header) {
case _Z_MSG_EXT_ENC_ZBUF | 0x0f: {
case _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f: {
_z_zbuf_t _zbf = _z_zbytes_as_zbuf(extension->_body._zbuf._val);
_z_zbuf_t *zbf = &_zbf;
uint8_t header;
Expand Down
12 changes: 6 additions & 6 deletions src/protocol/iobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ _z_wbuf_t _z_wbuf_make(size_t capacity, _Bool is_expandable) {
}
wbf._w_idx = 0; // This __must__ come after adding ioslices to reset w_idx
wbf._r_idx = 0;
wbf._is_expandable = is_expandable;
wbf._expansion_step = is_expandable ? capacity : 0;
wbf._capacity = capacity;

return wbf;
Expand Down Expand Up @@ -361,8 +361,8 @@ int8_t _z_wbuf_write(_z_wbuf_t *wbf, uint8_t b) {
size_t writable = _z_iosli_writable(ios);
if (writable >= (size_t)1) {
_z_iosli_write(ios, b);
} else if (wbf->_is_expandable == true) {
ios = __z_wbuf_new_iosli(Z_IOSLICE_SIZE);
} else if (wbf->_expansion_step != 0) {
ios = __z_wbuf_new_iosli(wbf->_expansion_step);
_z_wbuf_add_iosli(wbf, ios);
_z_iosli_write(ios, b);
} else {
Expand All @@ -382,12 +382,12 @@ int8_t _z_wbuf_write_bytes(_z_wbuf_t *wbf, const uint8_t *bs, size_t offset, siz
size_t writable = _z_iosli_writable(ios);
if (writable >= llength) {
_z_iosli_write_bytes(ios, bs, loffset, llength);
} else if (wbf->_is_expandable == true) {
} else if (wbf->_expansion_step != 0) {
_z_iosli_write_bytes(ios, bs, loffset, writable);
llength = llength - writable;
loffset = loffset + writable;
while (llength > (size_t)0) {
ios = __z_wbuf_new_iosli(Z_IOSLICE_SIZE);
ios = __z_wbuf_new_iosli(wbf->_expansion_step);
_z_wbuf_add_iosli(wbf, ios);

writable = _z_iosli_writable(ios);
Expand Down Expand Up @@ -530,7 +530,7 @@ void _z_wbuf_copy(_z_wbuf_t *dst, const _z_wbuf_t *src) {
dst->_capacity = src->_capacity;
dst->_r_idx = src->_r_idx;
dst->_w_idx = src->_w_idx;
dst->_is_expandable = src->_is_expandable;
dst->_expansion_step = src->_expansion_step;
_z_iosli_vec_copy(&dst->_ioss, &src->_ioss);
}

Expand Down
6 changes: 4 additions & 2 deletions src/session/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ _z_resource_t *__z_get_resource_by_id(_z_resource_list_t *rl, uint16_t mapping,
_z_resource_list_t *xs = rl;
while (xs != NULL) {
_z_resource_t *r = _z_resource_list_head(xs);
_Z_DEBUG("Checking ressource %d on mapping 0x%x: %d %s\n", r->_id, _z_keyexpr_mapping_id(&r->_key), r->_key._id,
r->_key._suffix);
if (r->_id == id && _z_keyexpr_mapping_id(&r->_key) == mapping) {
ret = r;
break;
Expand Down Expand Up @@ -215,9 +217,9 @@ _z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *
int8_t _z_register_resource(_z_session_t *zn, _z_resource_t *res) {
int8_t ret = _Z_RES_OK;

_Z_DEBUG(">>> Allocating res decl for (%ju,%u:%s)\n", (uintmax_t)res->_id, (unsigned int)res->_key._id,
res->_key._suffix);
uint16_t mapping = _z_keyexpr_mapping_id(&res->_key);
_Z_DEBUG(">>> Allocating res decl for (%ju,%u:%s) on mapping 0x%x\n", (uintmax_t)res->_id,
(unsigned int)res->_key._id, res->_key._suffix, mapping);
#if Z_MULTI_THREAD == 1
_z_mutex_lock(&zn->_mutex_inner);
#endif // Z_MULTI_THREAD == 1
Expand Down
10 changes: 8 additions & 2 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
/*------------------ Handle message ------------------*/
int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16_t local_peer_id) {
int8_t ret = _Z_RES_OK;

switch (msg->_tag) {
case _Z_N_DECLARE: {
_Z_DEBUG("Handling _Z_N_DECLARE\n");
_z_n_msg_declare_t decl = msg->_body._declare;
switch (decl._decl._tag) {
case _Z_DECL_KEXPR: {
_z_resource_t *res = (_z_resource_t *)z_malloc(sizeof(_z_resource_t));
res->_id = decl._decl._body._decl_kexpr._id;
res->_key = decl._decl._body._decl_kexpr._keyexpr;
res->_key = _z_keyexpr_duplicate(decl._decl._body._decl_kexpr._keyexpr);
ret = _z_register_resource(zn, res);
} break;
case _Z_UNDECL_KEXPR: {
Expand Down Expand Up @@ -78,13 +80,15 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
}
} break;
case _Z_N_PUSH: {
_Z_DEBUG("Handling _Z_N_PUSH\n");
_z_n_msg_push_t push = msg->_body._push;
_z_bytes_t payload = push._body._is_put ? push._body._body._put._payload : _z_bytes_empty();
_z_encoding_t encoding = push._body._is_put ? push._body._body._put._encoding : z_encoding_default();
int kind = push._body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE;
ret = _z_trigger_subscriptions(zn, push._key, payload, encoding, kind, push._timestamp);
} break;
case _Z_N_REQUEST: {
_Z_DEBUG("Handling _Z_N_REQUEST\n");
_z_n_msg_request_t req = msg->_body._request;
switch (req._tag) {
case _Z_REQUEST_QUERY: {
Expand Down Expand Up @@ -119,6 +123,7 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
}
} break;
case _Z_N_RESPONSE: {
_Z_DEBUG("Handling _Z_N_RESPONSE\n");
_z_n_msg_response_t response = msg->_body._response;
switch (response._tag) {
case _Z_RESPONSE_BODY_REPLY: {
Expand All @@ -130,7 +135,7 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
// @TODO: expose errors to the user
_z_msg_err_t error = response._body._err;
_z_bytes_t payload = error._ext_value.payload;
_Z_ERROR("Received Err for query %d: code=%d, message=%.*s\n", response._request_id, error._code,
_Z_ERROR("Received Err for query %zu: code=%d, message=%.*s\n", response._request_id, error._code,
payload.len, payload.start);
} break;
case _Z_RESPONSE_BODY_ACK: {
Expand All @@ -149,6 +154,7 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
}
} break;
case _Z_N_RESPONSE_FINAL: {
_Z_DEBUG("Handling _Z_N_RESPONSE_FINAL\n");
_z_zint_t id = msg->_body._response_final._request_id;
_z_trigger_query_reply_final(zn, id);
} break;
Expand Down
5 changes: 5 additions & 0 deletions src/session/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#include <stddef.h>

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/utils/logging.h"

_Bool _z_subscription_eq(const _z_subscription_t *other, const _z_subscription_t *this) {
Expand Down Expand Up @@ -156,7 +158,9 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
_z_mutex_lock(&zn->_mutex_inner);
#endif // Z_MULTI_THREAD == 1

_Z_DEBUG("Resolving %d - %s on mapping 0x%x\n", keyexpr._id, keyexpr._suffix, _z_keyexpr_mapping_id(&keyexpr));
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr);
_Z_DEBUG("Triggering subs for %d - %s\n", key._id, key._suffix);
if (key._suffix != NULL) {
_z_subscription_sptr_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, key);

Expand All @@ -172,6 +176,7 @@ int8_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, co
s.kind = kind;
s.timestamp = timestamp;
_z_subscription_sptr_list_t *xs = subs;
_Z_DEBUG("Triggering %ld subs\n", _z_subscription_sptr_list_len(xs));
while (xs != NULL) {
_z_subscription_sptr_t *sub = _z_subscription_sptr_list_head(xs);
sub->ptr->_callback(&s, sub->ptr->_arg);
Expand Down
30 changes: 30 additions & 0 deletions src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

#include "zenoh-pico/transport/link/tx.h"

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/protocol/codec/transport.h"
#include "zenoh-pico/protocol/definitions/transport.h"
#include "zenoh-pico/utils/logging.h"

/*------------------ Transmission helper ------------------*/
Expand Down Expand Up @@ -101,3 +103,31 @@ int8_t _z_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_m
return ret;
}
#endif // Z_UNICAST_TRANSPORT == 1 || Z_MULTICAST_TRANSPORT == 1
int8_t __unsafe_z_serialize_zenoh_fragment(_z_wbuf_t *dst, _z_wbuf_t *src, z_reliability_t reliability, size_t sn) {
int8_t ret = _Z_RES_OK;

// Assume first that this is not the final fragment
_Bool is_final = false;
do {
size_t w_pos = _z_wbuf_get_wpos(dst); // Mark the buffer for the writing operation

_z_transport_message_t f_hdr = _z_t_msg_make_frame_header(sn, reliability == Z_RELIABILITY_RELIABLE);
ret = _z_transport_message_encode(dst, &f_hdr); // Encode the frame header
if (ret == _Z_RES_OK) {
size_t space_left = _z_wbuf_space_left(dst);
size_t bytes_left = _z_wbuf_len(src);

if ((is_final == false) && (bytes_left <= space_left)) { // Check if it is really the final fragment
_z_wbuf_set_wpos(dst, w_pos); // Revert the buffer
is_final = true; // It is really the finally fragment, reserialize the header
continue;
}

size_t to_copy = (bytes_left <= space_left) ? bytes_left : space_left; // Compute bytes to write
ret = _z_wbuf_siphon(dst, src, to_copy); // Write the fragment
}
break;
} while (1);

return ret;
}
1 change: 1 addition & 0 deletions src/transport/unicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ int8_t _z_unicast_handle_transport_message(_z_transport_unicast_t *ztu, _z_trans

// Handle all the zenoh message, one by one
size_t len = _z_vec_len(&t_msg->_body._frame._messages);
_Z_INFO("Z_FRAME contained %ld messages\n", len);
for (size_t i = 0; i < len; i++) {
_z_handle_zenoh_message(ztu->_session,
(_z_zenoh_message_t *)_z_vec_get(&t_msg->_body._frame._messages, i),
Expand Down
Loading

0 comments on commit 5238f25

Please sign in to comment.