Skip to content

Commit

Permalink
Add ze_querying_subscriber_get method
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 22, 2023
1 parent 94bb045 commit ed63054
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
9 changes: 5 additions & 4 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -2091,10 +2091,11 @@ ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_de
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub);
/**
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API int8_t ze_querying_subscriber_fetch(const struct ze_owned_querying_subscriber_t *sub);
ZENOHC_API
int8_t ze_querying_subscriber_get(const struct ze_owned_querying_subscriber_t *sub,
struct z_keyexpr_t selector,
struct z_owned_closure_reply_t *callback,
const struct z_get_options_t *options);
/**
* Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type
*/
Expand Down
43 changes: 27 additions & 16 deletions src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@ use zenoh_protocol::core::SubInfo;
use zenoh_util::core::zresult::ErrNo;

use crate::{
impl_guarded_transmute, z_closure_sample_call, z_keyexpr_t, z_owned_closure_sample_t,
z_query_consolidation_none, z_query_consolidation_t, z_query_target_default, z_query_target_t,
z_reliability_t, z_sample_t, z_session_t, zcu_locality_default, zcu_locality_t,
zcu_reply_keyexpr_default, zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION,
impl_guarded_transmute, z_closure_sample_call, z_get, z_get_options_t, z_keyexpr_t,
z_owned_closure_reply_t, z_owned_closure_sample_t, z_query_consolidation_none,
z_query_consolidation_t, z_query_target_default, z_query_target_t, z_reliability_t, z_sample_t,
z_session_t, zcu_locality_default, zcu_locality_t, zcu_reply_keyexpr_default,
zcu_reply_keyexpr_t, GuardedTransmute, LOG_INVALID_SESSION,
};

type FetchingSubscriber = Option<Box<zenoh_ext::FetchingSubscriber<'static, ()>>>;
struct FetchingSubscriberWrapper {
fetching_subscriber: zenoh_ext::FetchingSubscriber<'static, ()>,
session: z_session_t,
}
type FetchingSubscriber = Option<Box<FetchingSubscriberWrapper>>;
//type FetchingSubscriber = Option<Box<zenoh_ext::FetchingSubscriber<'static, ()>>>;

/// An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription.
///
Expand Down Expand Up @@ -72,8 +78,12 @@ impl AsMut<FetchingSubscriber> for ze_owned_querying_subscriber_t {
}

impl ze_owned_querying_subscriber_t {
pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>) -> Self {
Some(Box::new(sub)).into()
pub fn new(sub: zenoh_ext::FetchingSubscriber<'static, ()>, session: z_session_t) -> Self {
Some(Box::new(FetchingSubscriberWrapper {
fetching_subscriber: sub,
session: session,
}))
.into()
}
pub fn null() -> Self {
None.into()
Expand Down Expand Up @@ -203,7 +213,7 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber(
})
.res()
{
Ok(sub) => ze_owned_querying_subscriber_t::new(sub),
Ok(sub) => ze_owned_querying_subscriber_t::new(sub, session),
Err(e) => {
log::debug!("{}", e);
ze_owned_querying_subscriber_t::null()
Expand All @@ -217,15 +227,16 @@ pub unsafe extern "C" fn ze_declare_querying_subscriber(
}
}

/// Returns ``true`` if `sub` is valid.
#[allow(clippy::missing_safety_doc)]
#[no_mangle]
pub extern "C" fn ze_querying_subscriber_fetch(sub: &ze_owned_querying_subscriber_t) -> i8 {
if let Some(s) = sub.as_mut().take() {
if let Err(e) = s.fetch(s.callback).res_sync() {
log::warn!("{}", e);
return e.errno().get();
}
pub unsafe extern "C" fn ze_querying_subscriber_get(
sub: &ze_owned_querying_subscriber_t,
selector: z_keyexpr_t,
callback: &mut z_owned_closure_reply_t,
options: Option<&z_get_options_t>,
) -> i8 {
if let Some(s) = sub.as_ref() {
return z_get(s.session, selector, std::ptr::null(), callback, options);
}
0
}
Expand All @@ -235,7 +246,7 @@ pub extern "C" fn ze_querying_subscriber_fetch(sub: &ze_owned_querying_subscribe
#[no_mangle]
pub extern "C" fn ze_undeclare_querying_subscriber(sub: &mut ze_owned_querying_subscriber_t) -> i8 {
if let Some(s) = sub.as_mut().take() {
if let Err(e) = s.close().res_sync() {
if let Err(e) = s.fetching_subscriber.close().res_sync() {
log::warn!("{}", e);
return e.errno().get();
}
Expand Down

0 comments on commit ed63054

Please sign in to comment.