diff --git a/examples/z_ping.c b/examples/z_ping.c index 45393aaec..bb061baee 100644 --- a/examples/z_ping.c +++ b/examples/z_ping.c @@ -1,5 +1,4 @@ #include -#include #include #include #include @@ -15,11 +14,11 @@ #define handle_error_en(en, msg) \ do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0) -pthread_cond_t cond; -pthread_mutex_t mutex; +z_condvar_t cond; +z_mutex_t mutex; -void callback(const z_sample_t* sample, void* context) { pthread_cond_signal(&cond); } -void drop(void* context) { pthread_cond_destroy(&cond); } +void callback(const z_sample_t* sample, void* context) { z_condvar_signal(&cond); } +void drop(void* context) { z_condvar_free(&cond); } struct args_t { unsigned int size; // -s @@ -43,8 +42,8 @@ int main(int argc, char** argv) { DEFAULT_PKT_SIZE, DEFAULT_PING_NB, DEFAULT_WARMUP_MS); return 1; } - pthread_mutex_init(&mutex, NULL); - pthread_cond_init(&cond, NULL); + z_mutex_init(&mutex); + z_condvar_init(&cond); z_owned_config_t config = args.config_path ? zc_config_from_file(args.config_path) : z_config_default(); z_owned_session_t session = z_open(z_move(config)); z_keyexpr_t ping = z_keyexpr_unchecked("test/ping"); @@ -56,7 +55,7 @@ int main(int argc, char** argv) { for (int i = 0; i < args.size; i++) { data[i] = i % 10; } - pthread_mutex_lock(&mutex); + z_mutex_lock(&mutex); if (args.warmup_ms) { printf("Warming up for %dms...\n", args.warmup_ms); struct timespec wmup_start, wmup_stop, wmup_timeout; @@ -66,9 +65,9 @@ int main(int argc, char** argv) { clock_gettime(CLOCK_REALTIME, &wmup_timeout); wmup_timeout.tv_sec += PING_TIMEOUT_SEC; z_publisher_put(z_loan(pub), data, args.size, NULL); - int s = pthread_cond_timedwait(&cond, &mutex, &wmup_timeout); + int s = z_condvar_wait(&cond, &mutex); if (s != 0) { - handle_error_en(s, "pthread_cond_timedwait"); + handle_error_en(s, "z_condvar_wait"); } clock_gettime(CLOCK_MONOTONIC, &wmup_stop); elapsed_us = @@ -82,9 +81,9 @@ int main(int argc, char** argv) { t_timeout.tv_sec += PING_TIMEOUT_SEC; clock_gettime(CLOCK_MONOTONIC, &t_start); z_publisher_put(z_loan(pub), data, args.size, NULL); - int s = pthread_cond_timedwait(&cond, &mutex, &t_timeout); + int s = z_condvar_wait(&cond, &mutex); if (s != 0) { - handle_error_en(s, "pthread_cond_timedwait"); + handle_error_en(s, "z_condvar_wait"); } clock_gettime(CLOCK_MONOTONIC, &t_stop); results[i] = (1000000 * (t_stop.tv_sec - t_start.tv_sec) + (t_stop.tv_nsec - t_start.tv_nsec) / 1000); @@ -92,7 +91,7 @@ int main(int argc, char** argv) { for (int i = 0; i < args.number_of_pings; i++) { printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2); } - pthread_mutex_unlock(&mutex); + z_mutex_unlock(&mutex); z_free(results); z_free(data); z_drop(z_move(sub)); diff --git a/include/zenoh_commons.h b/include/zenoh_commons.h index a93304fd6..9bf02fe81 100644 --- a/include/zenoh_commons.h +++ b/include/zenoh_commons.h @@ -470,6 +470,20 @@ typedef struct z_owned_closure_zid_t { void (*call)(const struct z_id_t*, void*); void (*drop)(void*); } z_owned_closure_zid_t; +/** + * Condvar + * + */ +typedef struct z_condvar_t { + size_t _0; +} z_condvar_t; +/** + * Mutex + * + */ +typedef struct z_mutex_t { + size_t _0; +} z_mutex_t; /** * An owned zenoh configuration. * @@ -796,6 +810,16 @@ typedef struct z_owned_scouting_config_t { typedef struct z_subscriber_t { const struct z_owned_subscriber_t *_0; } z_subscriber_t; +/** + * Task + * + */ +typedef struct z_task_t { + size_t _0; +} z_task_t; +typedef struct z_task_attr_t { + size_t _0; +} z_task_attr_t; /** * The options for `zc_liveliness_declare_token` */ @@ -1198,6 +1222,10 @@ ZENOHC_API void z_closure_zid_drop(struct z_owned_closure_zid_t *closure); * Constructs a null safe-to-drop value of 'z_owned_closure_zid_t' type */ ZENOHC_API struct z_owned_closure_zid_t z_closure_zid_null(void); +ZENOHC_API int8_t z_condvar_free(struct z_condvar_t *cv); +ZENOHC_API int8_t z_condvar_init(struct z_condvar_t *cv); +ZENOHC_API int8_t z_condvar_signal(struct z_condvar_t *cv); +ZENOHC_API int8_t z_condvar_wait(struct z_condvar_t *cv, struct z_mutex_t *m); /** * Returns ``true`` if `config` is valid. */ @@ -1615,6 +1643,11 @@ ZENOHC_API struct z_owned_str_t z_keyexpr_to_string(struct z_keyexpr_t keyexpr); */ ZENOHC_API struct z_keyexpr_t z_keyexpr_unchecked(const char *name); +ZENOHC_API int8_t z_mutex_free(struct z_mutex_t *m); +ZENOHC_API int8_t z_mutex_init(struct z_mutex_t *m); +ZENOHC_API int8_t z_mutex_lock(struct z_mutex_t *m); +ZENOHC_API int8_t z_mutex_try_lock(struct z_mutex_t *m); +ZENOHC_API int8_t z_mutex_unlock(struct z_mutex_t *m); /** * Opens a zenoh session. Should the session opening fail, `z_check` ing the returned value will return `false`. */ @@ -2028,6 +2061,12 @@ ZENOHC_API struct z_subscriber_options_t z_subscriber_options_default(void); * sub: The :c:type:`z_owned_pull_subscriber_t` to pull from. */ ZENOHC_API int8_t z_subscriber_pull(struct z_pull_subscriber_t sub); +ZENOHC_API +int8_t z_task_init(struct z_task_t *task, + const struct z_task_attr_t *_attr, + void (*fun)(void *arg), + void *arg); +ZENOHC_API int8_t z_task_join(struct z_task_t *task); /** * Returns ``true`` if `ts` is a valid timestamp */ diff --git a/src/lib.rs b/src/lib.rs index b508855e0..718475444 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,8 @@ pub use publication_cache::*; mod querying_subscriber; pub use querying_subscriber::*; pub mod attachment; +pub use platform::*; +pub mod platform; #[cfg(feature = "shared-memory")] mod shm; diff --git a/src/platform/mod.rs b/src/platform/mod.rs new file mode 100644 index 000000000..c9f87a9c7 --- /dev/null +++ b/src/platform/mod.rs @@ -0,0 +1,16 @@ +// +// Copyright (c) 2017, 2023 ZettaScale Technology. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh team, +// + +pub use synchronization::*; +mod synchronization; diff --git a/src/platform/synchronization.rs b/src/platform/synchronization.rs new file mode 100644 index 000000000..dbd9d94b8 --- /dev/null +++ b/src/platform/synchronization.rs @@ -0,0 +1,293 @@ +use std::{ + sync::{Condvar, Mutex, MutexGuard}, + thread::{self, JoinHandle}, +}; + +use libc::c_void; + +use crate::{impl_guarded_transmute, GuardedTransmute}; + +pub struct ZMutex<'a> { + mutex: Mutex<()>, + lock: Option>, +} + +pub struct ZMutexPtr { + data: Option>>, +} + +/// Mutex +/// +#[repr(C)] +#[derive(Clone, Copy)] +pub struct z_mutex_t(usize); + +impl_guarded_transmute!(z_mutex_t, ZMutexPtr); +impl_guarded_transmute!(ZMutexPtr, z_mutex_t); + +// using the same error codes as in GNU pthreads, but with negative sign +// due to convention to return negative values on error +const EBUSY: i8 = -16; +const EINVAL: i8 = -22; +const EAGAIN: i8 = -11; +const EPOISON: i8 = -22; // same as EINVAL + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_mutex_init(m: *mut z_mutex_t) -> i8 { + if m.is_null() { + return EINVAL; + } + let t = ZMutexPtr { + data: Some(Box::new(ZMutex { + mutex: Mutex::new(()), + lock: None, + })), + }; + *m = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_mutex_free(m: *mut z_mutex_t) -> i8 { + if m.is_null() { + return EINVAL; + } + let mut t = (*m).transmute(); + + t.data.take(); + *m = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_mutex_lock(m: *mut z_mutex_t) -> i8 { + if m.is_null() { + return EINVAL; + } + let mut t = (*m).transmute(); + if t.data.is_none() { + return EINVAL; + } + let mut_data = t.data.as_mut().unwrap(); + match mut_data.mutex.lock() { + Ok(new_lock) => { + let old_lock = mut_data.lock.replace(std::mem::transmute(new_lock)); + std::mem::forget(old_lock); + } + Err(_) => { + return EPOISON; + } + } + + *m = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_mutex_unlock(m: *mut z_mutex_t) -> i8 { + if m.is_null() { + return EINVAL; + } + let mut t = (*m).transmute(); + if t.data.is_none() { + return EINVAL; + } + let mut_data = t.data.as_mut().unwrap(); + if mut_data.lock.is_none() { + return EINVAL; + } else { + mut_data.lock.take(); + } + *m = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_mutex_try_lock(m: *mut z_mutex_t) -> i8 { + if m.is_null() { + return EINVAL; + } + let mut t = (*m).transmute(); + if t.data.is_none() { + return EINVAL; + } + let mut_data = t.data.as_mut().unwrap(); + let mut ret: i8 = 0; + match mut_data.mutex.try_lock() { + Ok(new_lock) => { + let old_lock = mut_data.lock.replace(std::mem::transmute(new_lock)); + std::mem::forget(old_lock); + } + Err(_) => { + ret = EBUSY; + } + } + *m = t.transmute(); + ret +} + +struct ZCondvarPtr { + data: Option>, +} + +/// Condvar +/// +#[repr(C)] +#[derive(Clone, Copy)] +pub struct z_condvar_t(usize); + +impl_guarded_transmute!(z_condvar_t, ZCondvarPtr); +impl_guarded_transmute!(ZCondvarPtr, z_condvar_t); + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_condvar_init(cv: *mut z_condvar_t) -> i8 { + if cv.is_null() { + return EINVAL; + } + let t: ZCondvarPtr = ZCondvarPtr { + data: Some(Box::new(Condvar::new())), + }; + *cv = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_condvar_free(cv: *mut z_condvar_t) -> i8 { + if cv.is_null() { + return EINVAL; + } + let mut t = (*cv).transmute(); + if t.data.is_none() { + return EINVAL; + } + t.data.take(); + *cv = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_condvar_signal(cv: *mut z_condvar_t) -> i8 { + if cv.is_null() { + return EINVAL; + } + let t = (*cv).transmute(); + if t.data.is_none() { + return EINVAL; + } + t.data.as_ref().unwrap().notify_one(); + *cv = t.transmute(); + 0 +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_condvar_wait(cv: *mut z_condvar_t, m: *mut z_mutex_t) -> i8 { + if cv.is_null() { + return EINVAL; + } + let tcv = (*cv).transmute(); + if tcv.data.is_none() { + return EINVAL; + } + if m.is_null() { + return EINVAL; + } + let mut tm = (*m).transmute(); + if tm.data.is_none() || tm.data.as_ref().unwrap().lock.is_none() { + return EINVAL; + } + let mut_data = tm.data.as_mut().unwrap(); + let lock = mut_data.lock.take().unwrap(); + match tcv.data.as_ref().unwrap().wait(lock) { + Ok(new_lock) => mut_data.lock = Some(std::mem::transmute(new_lock)), + Err(_) => return EPOISON, + } + *cv = tcv.transmute(); + *m = tm.transmute(); + 0 +} + +struct ZTask { + join_handle: JoinHandle<()>, +} + +struct ZTaskPtr { + data: Option>, +} + +/// Task +/// +#[repr(C)] +#[derive(Clone, Copy)] +pub struct z_task_t(usize); + +#[repr(C)] +#[derive(Clone, Copy)] +pub struct z_task_attr_t(usize); + +impl_guarded_transmute!(z_task_t, ZTaskPtr); +impl_guarded_transmute!(ZTaskPtr, z_task_t); + +struct FunArgPair { + fun: unsafe extern "C" fn(arg: *mut c_void), + arg: *mut c_void, +} + +impl FunArgPair { + unsafe fn call(self) { + (self.fun)(self.arg); + } +} + +unsafe impl Send for FunArgPair {} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_task_init( + task: *mut z_task_t, + _attr: *const z_task_attr_t, + fun: unsafe extern "C" fn(arg: *mut c_void), + arg: *mut c_void, +) -> i8 { + if task.is_null() { + return EINVAL; + } + + let mut ttask = ZTaskPtr { data: None }; + let fun_arg_pair = FunArgPair { fun, arg }; + + let mut ret = 0; + match thread::Builder::new().spawn(move || fun_arg_pair.call()) { + Ok(join_handle) => ttask.data = Some(Box::new(ZTask { join_handle })), + Err(_) => ret = EAGAIN, + } + *task = ttask.transmute(); + ret +} + +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn z_task_join(task: *mut z_task_t) -> i8 { + if task.is_null() { + return EINVAL; + } + let mut ttask = (*task).transmute(); + if ttask.data.is_none() { + return EINVAL; + } + let data = ttask.data.take(); + let ret = match data.unwrap().join_handle.join() { + Ok(_) => 0, + Err(_) => EINVAL, + }; + *task = ttask.transmute(); + ret +}