Skip to content

Commit

Permalink
Hide all shm stuff behind #ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Aug 20, 2024
1 parent 8f36772 commit 9fac5ee
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 11 deletions.
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_data_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class rmw_context_impl_s final
// An owned session.
z_owned_session_t session;

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_provider_t> shm_provider;
#endif

z_owned_subscriber_t graph_subscriber;

Expand Down
13 changes: 11 additions & 2 deletions rmw_zenoh_cpp/src/rmw_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@


extern "C" {
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Megabytes of SHM to reserve.
// TODO(clalancette): Make this configurable, or get it from the configuration
#define SHM_BUFFER_SIZE_MB 10
#endif

namespace
{
Expand Down Expand Up @@ -177,13 +179,14 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
return ret;
}

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
z_owned_string_t shm_enabled;
zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled);
auto free_shm_= rcpputils::make_scope_exit(
[&shm_enabled]() {
z_drop(z_move(shm_enabled));
});
#endif

// Initialize the zenoh session.
if(z_open(&context->impl->session, z_move(config))) {
Expand Down Expand Up @@ -221,7 +224,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
}
}

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Initialize the shm manager if shared_memory is enabled in the config.
if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) {
printf(">>> SHM is enabled\n");
Expand All @@ -245,6 +248,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
z_drop(z_move(context->impl->shm_provider.value()));
}
});
#endif

// Initialize the guard condition.
context->impl->graph_guard_condition =
Expand Down Expand Up @@ -380,7 +384,9 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) {
free_options.cancel();
impl_destructor.cancel();
free_impl.cancel();
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
free_shm_provider.cancel();
#endif
restore_context.cancel();

return RMW_RET_OK;
Expand All @@ -397,9 +403,12 @@ rmw_ret_t rmw_shutdown(rmw_context_t *context) {
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

z_undeclare_subscriber(z_move(context->impl->graph_subscriber));
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Drop SHM provider
if (context->impl->shm_provider.has_value()) {
z_drop(z_move(context->impl->shm_provider.value()));
}
#endif
// Close the zenoh session
if (z_close(z_move(context->impl->session)) < 0) {
RMW_SET_ERROR_MSG("Error while closing zenoh session");
Expand Down
31 changes: 23 additions & 8 deletions rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ find_service_type_support(const rosidl_service_type_support_t *type_supports) {

extern "C" {

#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// TODO(yuyuan): SHM, make this configurable
#define SHM_BUF_OK_SIZE 2621440
#endif

//==============================================================================
/// Get the name of the rmw implementation being used
Expand Down Expand Up @@ -885,22 +887,31 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,
// To store serialized message byte array.
char *msg_bytes = nullptr;

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
std::optional<z_owned_shm_mut_t> shmbuf = std::nullopt;
auto always_free_shmbuf = rcpputils::make_scope_exit([&shmbuf]() {
if (shmbuf.has_value()) {
// TODO(yuyuan): do we need to drop z_owned_shm_mut_t?
z_drop(z_move(shmbuf.value()));
}
});
#endif

auto free_msg_bytes =
rcpputils::make_scope_exit([&msg_bytes, allocator, &shmbuf]() {
if (msg_bytes && !shmbuf.has_value()) {
rcpputils::make_scope_exit([&msg_bytes, allocator
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
, &shmbuf
#endif
]() {
if (msg_bytes
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
&& !shmbuf.has_value()
#endif
) {
allocator->deallocate(msg_bytes, allocator->state);
}
});

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
// Get memory from SHM buffer if available.
if (publisher_data->context->impl->shm_provider.has_value()) {
// printf(">>> rmw_publish(), SHM enabled\n");
Expand All @@ -923,7 +934,9 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,
return RMW_RET_ERROR;
}

} else {
} else
#endif
{
// printf(">>> rmw_publish(), SHM disabled\n");

// Get memory from the allocator.
Expand Down Expand Up @@ -966,11 +979,13 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message,

z_owned_bytes_t payload;

// TODO(yuyuan): SHM
#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY
if (shmbuf.has_value()) {
z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value()));
z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options);
} else {
} else
#endif
{
z_bytes_serialize_from_buf(
&payload, reinterpret_cast<const uint8_t *>(msg_bytes), data_length);
if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options)) {
Expand Down

0 comments on commit 9fac5ee

Please sign in to comment.