From ed630542b6427550fd24a064db8427bfc8b1cf97 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 22 Nov 2023 15:49:28 +0100 Subject: [PATCH] Add ze_querying_subscriber_get method --- include/zenoh_commons.h | 9 ++++---- src/querying_subscriber.rs | 43 ++++++++++++++++++++++++-------------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index 1f05153d7..377b2d7fc 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -2091,10 +2091,11 @@ ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_de * Returns ``true`` if `sub` is valid. */ ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub); -/** - * Returns ``true`` if `sub` is valid. - */ -ZENOHC_API int8_t ze_querying_subscriber_fetch(const struct ze_owned_querying_subscriber_t *sub); +ZENOHC_API +int8_t ze_querying_subscriber_get(const struct ze_owned_querying_subscriber_t *sub, + struct z_keyexpr_t selector, + struct z_owned_closure_reply_t *callback, + const struct z_get_options_t *options); /** * Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type */ diff --git a/src/querying_subscriber.rs b/src/querying_subscriber.rs index 1d48df93c..e4ac2f8a2 100644 --- a/src/querying_subscriber.rs +++ b/src/querying_subscriber.rs @@ -20,13 +20,19 @@ use zenoh_protocol::core::SubInfo; use zenoh_util::core::zresult::ErrNo; use crate::{ - impl_guarded_transmute, z_closure_sample_call, z_keyexpr_t, z_owned_closure_sample_t, - z_query_consolidation_none, z_query_consolidation_t, z_query_target_default, z_query_target_t, - z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, zcu_locality_t, - zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION, + impl_guarded_transmute, z_closure_sample_call, z_get, z_get_options_t, z_keyexpr_t, + z_owned_closure_reply_t, z_owned_closure_sample_t, z_query_consolidation_none, + z_query_consolidation_t, z_query_target_default, z_query_target_t, z_reliability_t, z_sample_t, + z_session_t, zcu_locality_default, zcu_locality_t, zcu_reply_keyexpr_default, + zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION, }; -type FetchingSubscriber = Option>>; +struct FetchingSubscriberWrapper { + fetching_subscriber: zenoh_ext::FetchingSubscriber<'static, ()>, + session: z_session_t, +} +type FetchingSubscriber = Option>; +//type FetchingSubscriber = Option>>; /// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription. /// @@ -72,8 +78,12 @@ impl AsMut for ze_owned_querying_subscriber_t { } impl ze_owned_querying_subscriber_t { - pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>) -> Self { - Some(Box::new(sub)).into() + pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>, session: z_session_t) -> Self { + Some(Box::new(FetchingSubscriberWrapper { + fetching_subscriber: sub, + session: session, + })) + .into() } pub fn null() -> Self { None.into() @@ -203,7 +213,7 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber( }) .res() { - Ok(sub) => ze_owned_querying_subscriber_t::new(sub), + Ok(sub) => ze_owned_querying_subscriber_t::new(sub, session), Err(e) => { log::debug!("{}", e); ze_owned_querying_subscriber_t::null() @@ -217,15 +227,16 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber( } } -/// Returns ``true`` if `sub` is valid. #[allow(clippy::missing_safety_doc)] #[no_mangle] -pub extern "C" fn ze_querying_subscriber_fetch(sub: &ze_owned_querying_subscriber_t) -> i8 { - if let Some(s) = sub.as_mut().take() { - if let Err(e) = s.fetch(s.callback).res_sync() { - log::warn!("{}", e); - return e.errno().get(); - } +pub unsafe extern "C" fn ze_querying_subscriber_get( + sub: &ze_owned_querying_subscriber_t, + selector: z_keyexpr_t, + callback: &mut z_owned_closure_reply_t, + options: Option<&z_get_options_t>, +) -> i8 { + if let Some(s) = sub.as_ref() { + return z_get(s.session, selector, std::ptr::null(), callback, options); } 0 } @@ -235,7 +246,7 @@ pub extern "C" fn ze_querying_subscriber_fetch(sub: &ze_owned_querying_subscribe #[no_mangle] pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 { if let Some(s) = sub.as_mut().take() { - if let Err(e) = s.close().res_sync() { + if let Err(e) = s.fetching_subscriber.close().res_sync() { log::warn!("{}", e); return e.errno().get(); }