Skip to content

Commit

Permalink
Merge pull request #250 from DenisBiryukov91/feature/platform-primitives
Browse files Browse the repository at this point in the history
add synchronization primitives for zenoh-c (similar to zenoh-pico)
  • Loading branch information
milyin authored Feb 28, 2024
2 parents 1e36616 + b75fed0 commit 5d7bb83
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 13 deletions.
25 changes: 12 additions & 13 deletions examples/z_ping.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
Expand All @@ -15,11 +14,11 @@
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)

pthread_cond_t cond;
pthread_mutex_t mutex;
z_condvar_t cond;
z_mutex_t mutex;

void callback(const z_sample_t* sample, void* context) { pthread_cond_signal(&cond); }
void drop(void* context) { pthread_cond_destroy(&cond); }
void callback(const z_sample_t* sample, void* context) { z_condvar_signal(&cond); }
void drop(void* context) { z_condvar_free(&cond); }

struct args_t {
unsigned int size; // -s
Expand All @@ -43,8 +42,8 @@ int main(int argc, char** argv) {
DEFAULT_PKT_SIZE, DEFAULT_PING_NB, DEFAULT_WARMUP_MS);
return 1;
}
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
z_mutex_init(&mutex);
z_condvar_init(&cond);
z_owned_config_t config = args.config_path ? zc_config_from_file(args.config_path) : z_config_default();
z_owned_session_t session = z_open(z_move(config));
z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
Expand All @@ -56,7 +55,7 @@ int main(int argc, char** argv) {
for (int i = 0; i < args.size; i++) {
data[i] = i % 10;
}
pthread_mutex_lock(&mutex);
z_mutex_lock(&mutex);
if (args.warmup_ms) {
printf("Warming up for %dms...\n", args.warmup_ms);
struct timespec wmup_start, wmup_stop, wmup_timeout;
Expand All @@ -66,9 +65,9 @@ int main(int argc, char** argv) {
clock_gettime(CLOCK_REALTIME, &wmup_timeout);
wmup_timeout.tv_sec += PING_TIMEOUT_SEC;
z_publisher_put(z_loan(pub), data, args.size, NULL);
int s = pthread_cond_timedwait(&cond, &mutex, &wmup_timeout);
int s = z_condvar_wait(&cond, &mutex);
if (s != 0) {
handle_error_en(s, "pthread_cond_timedwait");
handle_error_en(s, "z_condvar_wait");
}
clock_gettime(CLOCK_MONOTONIC, &wmup_stop);
elapsed_us =
Expand All @@ -82,17 +81,17 @@ int main(int argc, char** argv) {
t_timeout.tv_sec += PING_TIMEOUT_SEC;
clock_gettime(CLOCK_MONOTONIC, &t_start);
z_publisher_put(z_loan(pub), data, args.size, NULL);
int s = pthread_cond_timedwait(&cond, &mutex, &t_timeout);
int s = z_condvar_wait(&cond, &mutex);
if (s != 0) {
handle_error_en(s, "pthread_cond_timedwait");
handle_error_en(s, "z_condvar_wait");
}
clock_gettime(CLOCK_MONOTONIC, &t_stop);
results[i] = (1000000 * (t_stop.tv_sec - t_start.tv_sec) + (t_stop.tv_nsec - t_start.tv_nsec) / 1000);
}
for (int i = 0; i < args.number_of_pings; i++) {
printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2);
}
pthread_mutex_unlock(&mutex);
z_mutex_unlock(&mutex);
z_free(results);
z_free(data);
z_drop(z_move(sub));
Expand Down
39 changes: 39 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,20 @@ typedef struct z_owned_closure_zid_t {
void (*call)(const struct z_id_t*, void*);
void (*drop)(void*);
} z_owned_closure_zid_t;
/**
* Condvar
*
*/
typedef struct z_condvar_t {
size_t _0;
} z_condvar_t;
/**
* Mutex
*
*/
typedef struct z_mutex_t {
size_t _0;
} z_mutex_t;
/**
* An owned zenoh configuration.
*
Expand Down Expand Up @@ -796,6 +810,16 @@ typedef struct z_owned_scouting_config_t {
typedef struct z_subscriber_t {
const struct z_owned_subscriber_t *_0;
} z_subscriber_t;
/**
* Task
*
*/
typedef struct z_task_t {
size_t _0;
} z_task_t;
typedef struct z_task_attr_t {
size_t _0;
} z_task_attr_t;
/**
* The options for `zc_liveliness_declare_token`
*/
Expand Down Expand Up @@ -1198,6 +1222,10 @@ ZENOHC_API void z_closure_zid_drop(struct z_owned_closure_zid_t *closure);
* Constructs a null safe-to-drop value of 'z_owned_closure_zid_t' type
*/
ZENOHC_API struct z_owned_closure_zid_t z_closure_zid_null(void);
ZENOHC_API int8_t z_condvar_free(struct z_condvar_t *cv);
ZENOHC_API int8_t z_condvar_init(struct z_condvar_t *cv);
ZENOHC_API int8_t z_condvar_signal(struct z_condvar_t *cv);
ZENOHC_API int8_t z_condvar_wait(struct z_condvar_t *cv, struct z_mutex_t *m);
/**
* Returns ``true`` if `config` is valid.
*/
Expand Down Expand Up @@ -1615,6 +1643,11 @@ ZENOHC_API struct z_owned_str_t z_keyexpr_to_string(struct z_keyexpr_t keyexpr);
*/
ZENOHC_API
struct z_keyexpr_t z_keyexpr_unchecked(const char *name);
ZENOHC_API int8_t z_mutex_free(struct z_mutex_t *m);
ZENOHC_API int8_t z_mutex_init(struct z_mutex_t *m);
ZENOHC_API int8_t z_mutex_lock(struct z_mutex_t *m);
ZENOHC_API int8_t z_mutex_try_lock(struct z_mutex_t *m);
ZENOHC_API int8_t z_mutex_unlock(struct z_mutex_t *m);
/**
* Opens a zenoh session. Should the session opening fail, `z_check` ing the returned value will return `false`.
*/
Expand Down Expand Up @@ -2028,6 +2061,12 @@ ZENOHC_API struct z_subscriber_options_t z_subscriber_options_default(void);
* sub: The :c:type:`z_owned_pull_subscriber_t` to pull from.
*/
ZENOHC_API int8_t z_subscriber_pull(struct z_pull_subscriber_t sub);
ZENOHC_API
int8_t z_task_init(struct z_task_t *task,
const struct z_task_attr_t *_attr,
void (*fun)(void *arg),
void *arg);
ZENOHC_API int8_t z_task_join(struct z_task_t *task);
/**
* Returns ``true`` if `ts` is a valid timestamp
*/
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub use publication_cache::*;
mod querying_subscriber;
pub use querying_subscriber::*;
pub mod attachment;
pub use platform::*;
pub mod platform;
#[cfg(feature = "shared-memory")]
mod shm;

Expand Down
16 changes: 16 additions & 0 deletions src/platform/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//
// Copyright (c) 2017, 2023 ZettaScale Technology.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh team, <[email protected]>
//

pub use synchronization::*;
mod synchronization;
Loading

0 comments on commit 5d7bb83

Please sign in to comment.