Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add querier matching support #862

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,9 @@ Functions
.. autocfunction:: primitives.h::z_undeclare_querier
.. autocfunction:: primitives.h::z_querier_get
.. autocfunction:: primitives.h::z_querier_keyexpr
.. autocfunction:: primitives.h::z_querier_get_matching_status
.. autocfunction:: primitives.h::z_querier_declare_matching_listener
.. autocfunction:: primitives.h::z_querier_declare_background_matching_listener

.. autocfunction:: primitives.h::z_querier_options_default
.. autocfunction:: primitives.h::z_querier_get_options_default
Expand Down
28 changes: 27 additions & 1 deletion examples/unix/c11/z_querier.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 && defined Z_FEATURE_UNSTABLE_API

#if Z_FEATURE_MATCHING == 1
void matching_status_handler(const z_matching_status_t *matching_status, void *arg) {
(void)arg;
if (matching_status->matching) {
printf("Querier has matching queryable.\n");
} else {
printf("Querier has NO MORE matching queryables.\n");
}
}
#endif

int main(int argc, char **argv) {
const char *selector = "demo/example/**";
const char *mode = "client";
Expand All @@ -27,9 +38,10 @@ int main(int argc, char **argv) {
const char *value = NULL;
int n = INT_MAX;
int timeout_ms = 0;
bool add_matching_listener = false;

int opt;
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:")) != -1) {
while ((opt = getopt(argc, argv, "s:e:m:v:l:n:t:a")) != -1) {
switch (opt) {
case 's':
selector = optarg;
Expand All @@ -52,6 +64,9 @@ int main(int argc, char **argv) {
case 't':
timeout_ms = atoi(optarg);
break;
case 'a':
add_matching_listener = true;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n' || optopt == 't') {
Expand Down Expand Up @@ -115,6 +130,17 @@ int main(int argc, char **argv) {
exit(-1);
}

if (add_matching_listener) {
#if Z_FEATURE_MATCHING == 1
z_owned_closure_matching_status_t callback;
z_closure(&callback, matching_status_handler, NULL, NULL);
z_querier_declare_background_matching_listener(z_loan(querier), z_move(callback));
#else
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_MATCHING but this example requires it.\n");
return -2;
#endif
}

printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx != n; ++idx) {
Expand Down
49 changes: 49 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,55 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier);

#if Z_FEATURE_MATCHING == 1
/**
* Declares a matching listener, registering a callback for notifying queryables matching the given querier key
* expression and target. The callback will be run in the background until the corresponding querier is dropped.
*
* Parameters:
* querier: A querier to associate with matching listener.
* callback: A closure that will be called every time the matching status of the querier changes (If last
* queryable disconnects or when the first queryable connects).
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier,
z_moved_closure_matching_status_t *callback);
/**
* Constructs matching listener, registering a callback for notifying queryables matching with a given querier's
* key expression and target.
*
* Parameters:
* querier: A querier to associate with matching listener.
* matching_listener: An uninitialized memory location where matching listener will be constructed. The matching
* listener's callback will be automatically dropped when the querier is dropped.
* callback: A closure that will be called every time the matching status of the querier changes (If last
* queryable disconnects or when the first queryable connects).
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback);
/**
* Gets querier matching status - i.e. if there are any queryables matching its key expression and target.
*
* Return:
* ``0`` if put operation is successful, ``negative value`` otherwise.
*
* .. warning:: This API has been marked as unstable: it works as advertised, but it may be changed in a future release.
*/
z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status);

#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_UNSTABLE_API

/**
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/net/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ typedef struct _z_matching_listener_t {

#if Z_FEATURE_MATCHING == 1
_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback);
uint8_t interest_type_flag, _z_closure_matching_status_t callback);
z_result_t _z_matching_listener_entity_undeclare(_z_session_t *zn, _z_zint_t entity_id);
z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener);
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }
static inline bool _z_matching_listener_check(const _z_matching_listener_t *matching_listener) {
return !_Z_RC_IS_NULL(&matching_listener->_zn);
}
void _z_matching_listener_clear(_z_matching_listener_t *pub);
void _z_matching_listener_free(_z_matching_listener_t **pub);
void _z_matching_listener_clear(_z_matching_listener_t *listener);
void _z_matching_listener_free(_z_matching_listener_t **listener);
#endif // Z_FEATURE_MATCHING == 1

#ifdef __cplusplus
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/matching.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ typedef struct {
} _z_closure_matching_status_t;

#if Z_FEATURE_MATCHING == 1

#define _Z_MATCHING_LISTENER_CTX_NULL_ID 0xFFFFFFFF

typedef struct _z_matching_listener_ctx_t {
uint32_t decl_id;
_z_closure_matching_status_t callback;
Expand Down
37 changes: 33 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1122,8 +1122,8 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn);
_z_matching_listener_t listener =
_z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id, callback->_this._val);
_z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &publisher->_key, publisher->_id,
_Z_INTEREST_FLAG_SUBSCRIBERS, callback->_this._val);
_z_session_rc_drop(&sess_rc);

z_internal_closure_matching_status_null(&callback->_this);
Expand All @@ -1135,8 +1135,6 @@ z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *pub

z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
z_matching_status_t *matching_status) {
// Ideally this should be implemented as a real request to the router, but this works much faster.
// And it works as long as filtering is enabled along with interest
matching_status->matching = publisher->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}
Expand Down Expand Up @@ -1353,6 +1351,37 @@ z_result_t z_querier_get(const z_loaned_querier_t *querier, const char *paramete
const z_loaned_keyexpr_t *z_querier_keyexpr(const z_loaned_querier_t *querier) {
return (const z_loaned_keyexpr_t *)&querier->_key;
}

#if Z_FEATURE_MATCHING == 1
z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier,
z_moved_closure_matching_status_t *callback) {
z_owned_matching_listener_t listener;
_Z_RETURN_IF_ERR(z_querier_declare_matching_listener(querier, &listener, callback));
_z_matching_listener_clear(&listener._val);
return _Z_RES_OK;
}
z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier,
z_owned_matching_listener_t *matching_listener,
z_moved_closure_matching_status_t *callback) {
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
_z_matching_listener_t listener = _z_matching_listener_declare(&sess_rc, &querier->_key, querier->_id,
_Z_INTEREST_FLAG_QUERYABLES, callback->_this._val);
_z_session_rc_drop(&sess_rc);

z_internal_closure_matching_status_null(&callback->_this);

matching_listener->_val = listener;

return _z_matching_listener_check(&listener) ? _Z_RES_OK : _Z_ERR_GENERIC;
}

z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) {
matching_status->matching = querier->_filter.ctx->state != WRITE_FILTER_ACTIVE;
return _Z_RES_OK;
}

#endif // Z_FEATURE_MATCHING == 1

#endif // Z_FEATURE_UNSTABLE_API

bool z_reply_is_ok(const z_loaned_reply_t *reply) { return reply->data._tag != _Z_REPLY_TAG_ERROR; }
Expand Down
24 changes: 14 additions & 10 deletions src/net/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/session/matching.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/utils/logging.h"
Expand All @@ -28,18 +29,21 @@
#if Z_FEATURE_MATCHING == 1
static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *arg) {
_z_matching_listener_ctx_t *ctx = (_z_matching_listener_ctx_t *)arg;

switch (msg->type) {
case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: {
ctx->decl_id = msg->id;
z_matching_status_t status = {.matching = true};
z_closure_matching_status_call(&ctx->callback, &status);
case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER:
case _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE: {
if (ctx->decl_id == _Z_MATCHING_LISTENER_CTX_NULL_ID) {
ctx->decl_id = msg->id;
z_matching_status_t status = {.matching = true};
z_closure_matching_status_call(&ctx->callback, &status);
}
break;
}

case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER: {
case _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER:
case _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE: {
if (ctx->decl_id == msg->id) {
ctx->decl_id = 0;
ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID;
z_matching_status_t status = {.matching = false};
z_closure_matching_status_call(&ctx->callback, &status);
}
Expand All @@ -52,9 +56,9 @@ static void _z_matching_listener_callback(const _z_interest_msg_t *msg, void *ar
}

_z_matching_listener_t _z_matching_listener_declare(_z_session_rc_t *zn, const _z_keyexpr_t *key, _z_zint_t entity_id,
_z_closure_matching_status_t callback) {
uint8_t flags = _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE |
_Z_INTEREST_FLAG_AGGREGATE;
uint8_t interest_type_flag, _z_closure_matching_status_t callback) {
uint8_t flags = interest_type_flag | _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_FUTURE |
_Z_INTEREST_FLAG_AGGREGATE | _Z_INTEREST_FLAG_CURRENT;
_z_matching_listener_t ret = _z_matching_listener_null();

_z_matching_listener_ctx_t *ctx = _z_matching_listener_ctx_new(callback);
Expand Down
3 changes: 3 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,9 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier) {
if (querier == NULL || _Z_RC_IS_NULL(&querier->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}
#if Z_FEATURE_MATCHING == 1
_z_matching_listener_entity_undeclare(_Z_RC_IN_VAL(&querier->_zn), querier->_id);
#endif
_z_write_filter_destroy(_Z_RC_IN_VAL(&querier->_zn), &querier->_filter);
_z_undeclare_resource(_Z_RC_IN_VAL(&querier->_zn), querier->_key._id);
return _Z_RES_OK;
Expand Down
2 changes: 1 addition & 1 deletion src/session/matching.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
_z_matching_listener_ctx_t *_z_matching_listener_ctx_new(_z_closure_matching_status_t callback) {
_z_matching_listener_ctx_t *ctx = z_malloc(sizeof(_z_matching_listener_ctx_t));

ctx->decl_id = 0;
ctx->decl_id = _Z_MATCHING_LISTENER_CTX_NULL_ID;
ctx->callback = callback;

return ctx;
Expand Down
Loading
Loading