Skip to content

Commit

Permalink
fix declarations, including prefixes for subs
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Aug 24, 2023
1 parent c5f879a commit 82f060f
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 49 deletions.
3 changes: 2 additions & 1 deletion BSDmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAK

all: make

$(BUILD_DIR)/Makefile: CMakeLists.txt
$(BUILD_DIR)/Makefile:
mkdir -p $(BUILD_DIR)
echo $(CMAKE_OPT)
cmake $(CMAKE_OPT) -B$(BUILD_DIR)

make: $(BUILD_DIR)/Makefile
Expand Down
3 changes: 2 additions & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAK

all: make

$(BUILD_DIR)/Makefile: CMakeLists.txt
$(BUILD_DIR)/Makefile:
mkdir -p $(BUILD_DIR)
echo $(CMAKE_OPT)
cmake $(CMAKE_OPT) -B $(BUILD_DIR)

make: $(BUILD_DIR)/Makefile
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/keyexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ _Bool _z_keyexpr_intersects(const char *lstart, const size_t llen, const char *r
/*------------------ clone/Copy/Free helpers ------------------*/
void _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src);
_z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src);
_z_keyexpr_t _z_keyexpr_to_owned(_z_keyexpr_t src);
_z_keyexpr_t _z_keyexpr_alias(_z_keyexpr_t src);
_z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src);
static inline _z_keyexpr_t _z_keyexpr_null(void) { return (_z_keyexpr_t){._id = 0, ._mapping = {0}, ._suffix = NULL}; }
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ uint16_t _z_get_resource_id(_z_session_t *zn);
_z_resource_t *_z_get_resource_by_id(_z_session_t *zn, uint16_t mapping, _z_zint_t rid);
_z_resource_t *_z_get_resource_by_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr);
_z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *keyexpr);
int8_t _z_register_resource(_z_session_t *zn, _z_resource_t *res);
int16_t _z_register_resource(_z_session_t *zn, const _z_keyexpr_t key, uint16_t id, uint16_t register_to_mapping);
void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping);
void _z_unregister_resources_for_peer(_z_session_t *zn, uint16_t mapping);
void _z_flush_resources(_z_session_t *zn);
Expand Down
13 changes: 12 additions & 1 deletion src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/system/platform.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/result.h"
#include "zenoh-pico/utils/uuid.h"

Expand Down Expand Up @@ -818,8 +820,17 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z
#endif // Z_MULTICAST_TRANSPORT == 1
_z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr);
if (r == NULL) {
char *wild = strpbrk(keyexpr._suffix, "*$");
if (wild != NULL) {
size_t len = wild - keyexpr._suffix;
char *suffix = z_malloc(len + 1);
memcpy(suffix, keyexpr._suffix, len);
suffix[len] = 0;
keyexpr._suffix = suffix;
_z_keyexpr_set_owns_suffix(&keyexpr, true);
}
uint16_t id = _z_declare_resource(zs._val, keyexpr);
key = _z_rid_with_suffix(id, NULL);
key = _z_rid_with_suffix(id, wild);
}
#if Z_MULTICAST_TRANSPORT == 1
}
Expand Down
27 changes: 10 additions & 17 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,18 @@ uint16_t _z_declare_resource(_z_session_t *zn, _z_keyexpr_t keyexpr) {

if (zn->_tp._type ==
_Z_TRANSPORT_UNICAST_TYPE) { // FIXME: remove when resource declaration is implemented for multicast transport

_z_resource_t *r = (_z_resource_t *)z_malloc(sizeof(_z_resource_t));
if (r != NULL) {
r->_id = _z_get_resource_id(zn);
r->_key = _z_keyexpr_duplicate(keyexpr);
if (_z_register_resource(zn, r) == _Z_RES_OK) {
// Build the declare message to send on the wire
_z_keyexpr_t alias = _z_keyexpr_alias(r->_key);
_z_declaration_t declaration = _z_make_decl_keyexpr(r->_id, &alias);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
ret = r->_id;
} else {
_z_unregister_resource(zn, r->_id, _z_keyexpr_mapping_id(&keyexpr));
}
_z_n_msg_clear(&n_msg);
uint16_t id = _z_register_resource(zn, keyexpr, 0, _Z_KEYEXPR_MAPPING_LOCAL);
if (id != 0) {
// Build the declare message to send on the wire
_z_keyexpr_t alias = _z_keyexpr_alias(keyexpr);
_z_declaration_t declaration = _z_make_decl_keyexpr(id, &alias);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) == _Z_RES_OK) {
ret = id;
} else {
_z_resource_free(&r);
_z_unregister_resource(zn, id, _Z_KEYEXPR_MAPPING_LOCAL);
}
_z_n_msg_clear(&n_msg);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/protocol/keyexpr.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ _z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src) {
return dst;
}

_z_keyexpr_t _z_keyexpr_to_owned(_z_keyexpr_t src) {
return _z_keyexpr_owns_suffix(&src) ? src : _z_keyexpr_duplicate(src);
}

_z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) {
_z_keyexpr_t stolen = *src;
*src = _z_keyexpr_null();
Expand Down
60 changes: 39 additions & 21 deletions src/session/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/system/platform.h"
#include "zenoh-pico/utils/logging.h"

_Bool _z_resource_eq(const _z_resource_t *other, const _z_resource_t *this) { return this->_id == other->_id; }
Expand Down Expand Up @@ -214,29 +216,39 @@ _z_keyexpr_t _z_get_expanded_key_from_key(_z_session_t *zn, const _z_keyexpr_t *
return res;
}

int8_t _z_register_resource(_z_session_t *zn, _z_resource_t *res) {
int8_t ret = _Z_RES_OK;
/// Returns the ID of the registered keyexpr. Returns 0 if registration failed.
int16_t _z_register_resource(_z_session_t *zn, _z_keyexpr_t key, uint16_t id, uint16_t register_to_mapping) {
int16_t ret = Z_RESOURCE_ID_NONE;
key = _z_keyexpr_alias(key);
uint16_t mapping = register_to_mapping;
uint16_t parent_mapping = _z_keyexpr_mapping_id(&key);

uint16_t mapping = _z_keyexpr_mapping_id(&res->_key);
_Z_DEBUG(">>> Allocating res decl for (%ju,%u:%s) on mapping 0x%x\n", (uintmax_t)res->_id,
(unsigned int)res->_key._id, res->_key._suffix, mapping);
#if Z_MULTI_THREAD == 1
_z_mutex_lock(&zn->_mutex_inner);
#endif // Z_MULTI_THREAD == 1

// FIXME: check by keyexpr instead
_z_resource_t *r = __unsafe_z_get_resource_by_id(zn, mapping, res->_id);
if (r == NULL) {
if (key._id != Z_RESOURCE_ID_NONE) {
if (parent_mapping == mapping || parent_mapping == _Z_KEYEXPR_MAPPING_LOCAL) {
_z_resource_t *parent = __unsafe_z_get_resource_by_id(zn, mapping, key._id);
parent->_refcount++;
} else {
key = __unsafe_z_get_expanded_key_from_key(zn, &key);
}
}
_z_resource_t *res = z_malloc(sizeof(_z_resource_t));
if ((key._suffix != NULL || mapping != _Z_KEYEXPR_MAPPING_LOCAL) && res != NULL) {
res->_refcount = 1;
res->_key = _z_keyexpr_to_owned(key);
ret = id == Z_RESOURCE_ID_NONE ? _z_get_resource_id(zn) : id;
res->_id = ret;
// Register the resource
if (mapping == _Z_KEYEXPR_MAPPING_LOCAL) {
zn->_local_resources = _z_resource_list_push(zn->_local_resources, res);
} else {
zn->_remote_resources = _z_resource_list_push(zn->_remote_resources, res);
}
} else {
r->_refcount++;
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
ret = Z_RESOURCE_ID_NONE;
}

#if Z_MULTI_THREAD == 1
Expand All @@ -251,20 +263,26 @@ void _z_unregister_resource(_z_session_t *zn, uint16_t id, uint16_t mapping) {
#if Z_MULTI_THREAD == 1
_z_mutex_lock(&zn->_mutex_inner);
#endif // Z_MULTI_THREAD == 1
_z_resource_list_t *list = is_local ? zn->_local_resources : zn->_remote_resources;
while (list) {
_z_resource_t *head = _z_resource_list_head(list);
if (head && head->_id == id && _z_keyexpr_mapping_id(&head->_key) == mapping) {
head->_refcount--;
if (head->_refcount == 0) {
_z_resource_list_pop(list, &head);
_z_resource_free(&head);
while (id != 0) {
_z_resource_list_t **list = is_local ? &zn->_local_resources : &zn->_remote_resources;
_z_resource_list_t *tail = *list;
while (list) {
_z_resource_t *head = _z_resource_list_head(tail);
if (head && head->_id == id && _z_keyexpr_mapping_id(&head->_key) == mapping) {
head->_refcount--;
if (head->_refcount == 0) {
*list = _z_resource_list_pop(*list, &head);
id = head->_key._id;
mapping = _z_keyexpr_mapping_id(&head->_key);
_z_resource_free(&head);
} else {
id = 0;
}
break;
}
break;
tail = _z_resource_list_tail(tail);
}
list = _z_resource_list_tail(list);
}

#if Z_MULTI_THREAD == 1
_z_mutex_unlock(&zn->_mutex_inner);
#endif // Z_MULTI_THREAD == 1
Expand Down
10 changes: 5 additions & 5 deletions src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
_z_n_msg_declare_t decl = msg->_body._declare;
switch (decl._decl._tag) {
case _Z_DECL_KEXPR: {
_z_resource_t *res = (_z_resource_t *)z_malloc(sizeof(_z_resource_t));
res->_id = decl._decl._body._decl_kexpr._id;
res->_key = _z_keyexpr_duplicate(decl._decl._body._decl_kexpr._keyexpr);
ret = _z_register_resource(zn, res);
if (_z_register_resource(zn, decl._decl._body._decl_kexpr._keyexpr,
decl._decl._body._decl_kexpr._id, local_peer_id) == 0) {
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}
} break;
case _Z_UNDECL_KEXPR: {
_z_unregister_resource(zn, decl._decl._body._undecl_kexpr._id, local_peer_id);
Expand Down Expand Up @@ -136,7 +136,7 @@ int8_t _z_handle_zenoh_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint16
_z_msg_err_t error = response._body._err;
_z_bytes_t payload = error._ext_value.payload;
_Z_ERROR("Received Err for query %zu: code=%d, message=%.*s\n", response._request_id, error._code,
payload.len, payload.start);
(int)payload.len, payload.start);
} break;
case _Z_RESPONSE_BODY_ACK: {
// @TODO: implement ACKs for puts/dels
Expand Down
3 changes: 2 additions & 1 deletion src/transport/multicast/link/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "zenoh-pico/transport/link/rx.h"

#include <stddef.h>
#include <stdint.h>

#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/codec/network.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me
} while (false); // The 1-iteration loop to use continue to break the entire loop on error

if (ret == _Z_RES_OK) {
_Z_DEBUG(">> \t transport_message_decode: %ld\n", _z_zbuf_len(&ztm->_zbuf));
_Z_DEBUG(">> \t transport_message_decode: %ju\n", (uintmax_t)_z_zbuf_len(&ztm->_zbuf));
ret = _z_transport_message_decode(t_msg, &ztm->_zbuf);
}

Expand Down
1 change: 1 addition & 0 deletions tests/z_api_alignment_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ int main(int argc, char **argv) {

printf("Undeclaring Keyexpr...");
_ret_int8 = z_undeclare_keyexpr(z_loan(s1), z_move(_ret_expr));
printf(" %02x\n", _ret_int8);
assert(_ret_int8 == 0);
assert(!z_check(_ret_expr));
printf("Ok\n");
Expand Down
2 changes: 1 addition & 1 deletion zenohpico.pc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ prefix=/usr/local
Name: zenohpico
Description:
URL:
Version: 0.10.20230821dev
Version: 0.10.20230824dev
Cflags: -I${prefix}/
Libs: -L${prefix}/ -lzenohpico

0 comments on commit 82f060f

Please sign in to comment.