Skip to content

Commit

Permalink
Merge pull request #192 from ZettaScaleLabs/pointer_to_optional_refac…
Browse files Browse the repository at this point in the history
…toring

Options processing refactoring
  • Loading branch information
p-avital authored Nov 17, 2023
2 parents 193fe45 + fe120c8 commit 9dd8fc7
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 177 deletions.
50 changes: 0 additions & 50 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -935,31 +935,6 @@ struct z_owned_publisher_t z_declare_publisher(struct z_session_t session,
*
* z_subscriber_options_t opts = z_subscriber_options_default();
* z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
*
* Passing custom arguments to the **callback** can be done by defining a custom structure:
*
* .. code-block:: C
*
* typedef struct {
* z_keyexpr_t forward;
* z_session_t session;
* } myargs_t;
*
* void callback(const z_sample_t sample, const void *arg)
* {
* myargs_t *myargs = (myargs_t *)arg;
* z_put(myargs->session, myargs->forward, sample->value, NULL);
* }
*
* int main() {
* myargs_t cargs = {
* forward = z_keyexpr("forward"),
* session = s,
* };
* z_pull_subscriber_options_t opts = z_pull_subscriber_options_default();
* opts.cargs = (void *)&cargs;
* z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
* }
*/
ZENOHC_API
struct z_owned_pull_subscriber_t z_declare_pull_subscriber(struct z_session_t session,
Expand Down Expand Up @@ -1015,31 +990,6 @@ struct z_owned_queryable_t z_declare_queryable(struct z_session_t session,
*
* z_subscriber_options_t opts = z_subscriber_options_default();
* z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
*
* Passing custom arguments to the **callback** can be done by defining a custom structure:
*
* .. code-block:: C
*
* typedef struct {
* z_keyexpr_t forward;
* z_session_t session;
* } myargs_t;
*
* void callback(const z_sample_t sample, const void *arg)
* {
* myargs_t *myargs = (myargs_t *)arg;
* z_put(myargs->session, myargs->forward, sample->value, NULL);
* }
*
* int main() {
* myargs_t cargs = {
* forward = z_keyexpr("forward"),
* session = s,
* };
* z_subscriber_options_t opts = z_subscriber_options_default();
* opts.cargs = (void *)&cargs;
* z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
* }
*/
ZENOHC_API
struct z_owned_subscriber_t z_declare_subscriber(struct z_session_t session,
Expand Down
46 changes: 8 additions & 38 deletions src/pull_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::LOG_INVALID_SESSION;
use zenoh::prelude::sync::SyncResolve;
use zenoh::prelude::SessionDeclarations;
use zenoh::prelude::SplitBuffer;
use zenoh::subscriber::Reliability;
use zenoh_protocol::core::SubInfo;
use zenoh_util::core::zresult::ErrNo;

Expand Down Expand Up @@ -148,50 +147,20 @@ pub extern "C" fn z_pull_subscriber_options_default() -> z_pull_subscriber_optio
///
/// z_subscriber_options_t opts = z_subscriber_options_default();
/// z_owned_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
///
/// Passing custom arguments to the **callback** can be done by defining a custom structure:
///
/// .. code-block:: C
///
/// typedef struct {
/// z_keyexpr_t forward;
/// z_session_t session;
/// } myargs_t;
///
/// void callback(const z_sample_t sample, const void *arg)
/// {
/// myargs_t *myargs = (myargs_t *)arg;
/// z_put(myargs->session, myargs->forward, sample->value, NULL);
/// }
///
/// int main() {
/// myargs_t cargs = {
/// forward = z_keyexpr("forward"),
/// session = s,
/// };
/// z_pull_subscriber_options_t opts = z_pull_subscriber_options_default();
/// opts.cargs = (void *)&cargs;
/// z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
/// }
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn z_declare_pull_subscriber(
pub extern "C" fn z_declare_pull_subscriber(
session: z_session_t,
keyexpr: z_keyexpr_t,
callback: &mut z_owned_closure_sample_t,
mut opts: *const z_pull_subscriber_options_t,
opts: Option<&z_pull_subscriber_options_t>,
) -> z_owned_pull_subscriber_t {
let mut closure = z_owned_closure_sample_t::empty();
std::mem::swap(callback, &mut closure);

match session.upgrade() {
Some(s) => {
if opts.is_null() {
let default = z_pull_subscriber_options_default();
opts = &default;
}
let reliability: Reliability = (*opts).reliability.into();
let res = s
let mut res = s
.declare_subscriber(keyexpr)
.callback(move |sample| {
let payload = sample.payload.contiguous();
Expand All @@ -202,10 +171,11 @@ pub unsafe extern "C" fn z_declare_pull_subscriber(
let sample = z_sample_t::new(&sample, &owner);
z_closure_sample_call(&closure, &sample)
})
.reliability(reliability)
.pull_mode()
.res();
match res {
.pull_mode();
if let Some(opts) = opts {
res = res.reliability(opts.reliability.into())
}
match res.res() {
Ok(sub) => z_owned_pull_subscriber_t::new(sub),
Err(e) => {
log::debug!("{}", e);
Expand Down
81 changes: 37 additions & 44 deletions src/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ pub struct z_put_options_t {
/// Constructs the default value for :c:type:`z_put_options_t`.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn z_put_options_default() -> z_put_options_t {
pub extern "C" fn z_put_options_default() -> z_put_options_t {
z_put_options_t {
encoding: z_encoding_default(),
congestion_control: CongestionControl::default().into(),
Expand All @@ -144,22 +144,20 @@ pub unsafe extern "C" fn z_put(
keyexpr: z_keyexpr_t,
payload: *const u8,
len: size_t,
mut opts: *const z_put_options_t,
opts: Option<&z_put_options_t>,
) -> i8 {
match session.upgrade() {
Some(s) => {
let default = z_put_options_default();
if opts.is_null() {
opts = &default;
}
match s
let mut res = s
.put(keyexpr, std::slice::from_raw_parts(payload, len))
.encoding((*opts).encoding)
.kind(SampleKind::Put)
.congestion_control((*opts).congestion_control.into())
.priority((*opts).priority.into())
.res_sync()
{
.kind(SampleKind::Put);
if let Some(opts) = opts {
res = res
.encoding(opts.encoding)
.congestion_control(opts.congestion_control.into())
.priority(opts.priority.into());
}
match res.res_sync() {
Err(e) => {
log::error!("{}", e);
e.errno().get()
Expand Down Expand Up @@ -191,27 +189,23 @@ pub unsafe extern "C" fn z_put(
/// ``0`` in case of success, negative values in case of failure.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn zc_put_owned(
pub extern "C" fn zc_put_owned(
session: z_session_t,
keyexpr: z_keyexpr_t,
payload: Option<&mut zc_owned_payload_t>,
mut opts: *const z_put_options_t,
opts: Option<&z_put_options_t>,
) -> i8 {
match session.upgrade() {
Some(s) => {
let default = z_put_options_default();
if opts.is_null() {
opts = &default;
}
if let Some(payload) = payload.and_then(|p| p.take()) {
match s
.put(keyexpr, payload)
.encoding((*opts).encoding)
.kind(SampleKind::Put)
.congestion_control((*opts).congestion_control.into())
.priority((*opts).priority.into())
.res_sync()
{
let mut res = s.put(keyexpr, payload).kind(SampleKind::Put);
if let Some(opts) = opts {
res = res
.encoding(opts.encoding)
.congestion_control(opts.congestion_control.into())
.priority(opts.priority.into());
}
match res.res_sync() {
Err(e) => {
log::error!("{}", e);
e.errno().get()
Expand Down Expand Up @@ -258,28 +252,27 @@ pub unsafe extern "C" fn z_delete_options_default() -> z_delete_options_t {
/// ``0`` in case of success, negative values in case of failure.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn z_delete(
pub extern "C" fn z_delete(
session: z_session_t,
keyexpr: z_keyexpr_t,
mut opts: *const z_delete_options_t,
opts: Option<&z_delete_options_t>,
) -> i8 {
let default = z_delete_options_default();
if opts.is_null() {
opts = &default;
}
match session.upgrade() {
Some(s) => match s
.delete(keyexpr)
.congestion_control((*opts).congestion_control.into())
.priority((*opts).priority.into())
.res_sync()
{
Err(e) => {
log::error!("{}", e);
e.errno().get()
Some(s) => {
let mut res = s.delete(keyexpr);
if let Some(opts) = opts {
res = res
.congestion_control(opts.congestion_control.into())
.priority(opts.priority.into());
}
Ok(()) => 0,
},
match res.res_sync() {
Err(e) => {
log::error!("{}", e);
e.errno().get()
}
Ok(()) => 0,
}
}
None => {
log::debug!("{}", LOG_INVALID_SESSION);
i8::MIN
Expand Down
59 changes: 14 additions & 45 deletions src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,63 +185,32 @@ pub extern "C" fn z_subscriber_options_default() -> z_subscriber_options_t {
///
/// z_subscriber_options_t opts = z_subscriber_options_default();
/// z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
///
/// Passing custom arguments to the **callback** can be done by defining a custom structure:
///
/// .. code-block:: C
///
/// typedef struct {
/// z_keyexpr_t forward;
/// z_session_t session;
/// } myargs_t;
///
/// void callback(const z_sample_t sample, const void *arg)
/// {
/// myargs_t *myargs = (myargs_t *)arg;
/// z_put(myargs->session, myargs->forward, sample->value, NULL);
/// }
///
/// int main() {
/// myargs_t cargs = {
/// forward = z_keyexpr("forward"),
/// session = s,
/// };
/// z_subscriber_options_t opts = z_subscriber_options_default();
/// opts.cargs = (void *)&cargs;
/// z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
/// }
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn z_declare_subscriber(
pub extern "C" fn z_declare_subscriber(
session: z_session_t,
keyexpr: z_keyexpr_t,
callback: &mut z_owned_closure_sample_t,
mut opts: *const z_subscriber_options_t,
opts: Option<&z_subscriber_options_t>,
) -> z_owned_subscriber_t {
let mut closure = z_owned_closure_sample_t::empty();
std::mem::swap(callback, &mut closure);

match session.upgrade() {
Some(s) => {
if opts.is_null() {
let default = z_subscriber_options_default();
opts = &default;
let mut res = s.declare_subscriber(keyexpr).callback(move |sample| {
let payload = sample.payload.contiguous();
let owner = match payload {
std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v),
_ => sample.payload.clone(),
};
let sample = z_sample_t::new(&sample, &owner);
z_closure_sample_call(&closure, &sample)
});
if let Some(opts) = opts {
res = res.reliability(opts.reliability.into())
}
let reliability: Reliability = (*opts).reliability.into();
let res = s
.declare_subscriber(keyexpr)
.callback(move |sample| {
let payload = sample.payload.contiguous();
let owner = match payload {
std::borrow::Cow::Owned(v) => zenoh::buffers::ZBuf::from(v),
_ => sample.payload.clone(),
};
let sample = z_sample_t::new(&sample, &owner);
z_closure_sample_call(&closure, &sample)
})
.reliability(reliability)
.res();
match res {
match res.res() {
Ok(sub) => z_owned_subscriber_t::new(sub),
Err(e) => {
log::debug!("{}", e);
Expand Down

0 comments on commit 9dd8fc7

Please sign in to comment.