diff --git a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp index 0b314d35..78308a86 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_data_types.hpp @@ -52,8 +52,9 @@ class rmw_context_impl_s final // An owned session. z_owned_session_t session; - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY std::optional shm_provider; +#endif z_owned_subscriber_t graph_subscriber; diff --git a/rmw_zenoh_cpp/src/rmw_init.cpp b/rmw_zenoh_cpp/src/rmw_init.cpp index da45fc50..5a4182ec 100644 --- a/rmw_zenoh_cpp/src/rmw_init.cpp +++ b/rmw_zenoh_cpp/src/rmw_init.cpp @@ -41,9 +41,11 @@ extern "C" { +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Megabytes of SHM to reserve. // TODO(clalancette): Make this configurable, or get it from the configuration #define SHM_BUFFER_SIZE_MB 10 +#endif namespace { @@ -177,13 +179,14 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { return ret; } - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY z_owned_string_t shm_enabled; zc_config_get_from_str(z_loan(config), Z_CONFIG_SHARED_MEMORY_KEY, &shm_enabled); auto free_shm_= rcpputils::make_scope_exit( [&shm_enabled]() { z_drop(z_move(shm_enabled)); }); +#endif // Initialize the zenoh session. if(z_open(&context->impl->session, z_move(config))) { @@ -221,7 +224,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { } } - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Initialize the shm manager if shared_memory is enabled in the config. if (strncmp(z_string_data(z_loan(shm_enabled)), "true", z_string_len(z_loan(shm_enabled))) == 0) { printf(">>> SHM is enabled\n"); @@ -245,6 +248,7 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { z_drop(z_move(context->impl->shm_provider.value())); } }); +#endif // Initialize the guard condition. context->impl->graph_guard_condition = @@ -380,7 +384,9 @@ rmw_ret_t rmw_init(const rmw_init_options_t *options, rmw_context_t *context) { free_options.cancel(); impl_destructor.cancel(); free_impl.cancel(); +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY free_shm_provider.cancel(); +#endif restore_context.cancel(); return RMW_RET_OK; @@ -397,9 +403,12 @@ rmw_ret_t rmw_shutdown(rmw_context_t *context) { return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); z_undeclare_subscriber(z_move(context->impl->graph_subscriber)); +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY + // Drop SHM provider if (context->impl->shm_provider.has_value()) { z_drop(z_move(context->impl->shm_provider.value())); } +#endif // Close the zenoh session if (z_close(z_move(context->impl->session)) < 0) { RMW_SET_ERROR_MSG("Error while closing zenoh session"); diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index efded20a..8588e23b 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -161,8 +161,10 @@ find_service_type_support(const rosidl_service_type_support_t *type_supports) { extern "C" { +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // TODO(yuyuan): SHM, make this configurable #define SHM_BUF_OK_SIZE 2621440 +#endif //============================================================================== /// Get the name of the rmw implementation being used @@ -885,22 +887,31 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, // To store serialized message byte array. char *msg_bytes = nullptr; - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY std::optional shmbuf = std::nullopt; auto always_free_shmbuf = rcpputils::make_scope_exit([&shmbuf]() { if (shmbuf.has_value()) { - // TODO(yuyuan): do we need to drop z_owned_shm_mut_t? z_drop(z_move(shmbuf.value())); } }); +#endif + auto free_msg_bytes = - rcpputils::make_scope_exit([&msg_bytes, allocator, &shmbuf]() { - if (msg_bytes && !shmbuf.has_value()) { + rcpputils::make_scope_exit([&msg_bytes, allocator +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY + , &shmbuf +#endif + ]() { + if (msg_bytes +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY + && !shmbuf.has_value() +#endif + ) { allocator->deallocate(msg_bytes, allocator->state); } }); - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY // Get memory from SHM buffer if available. if (publisher_data->context->impl->shm_provider.has_value()) { // printf(">>> rmw_publish(), SHM enabled\n"); @@ -923,7 +934,9 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, return RMW_RET_ERROR; } - } else { + } else +#endif + { // printf(">>> rmw_publish(), SHM disabled\n"); // Get memory from the allocator. @@ -966,11 +979,13 @@ rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message, z_owned_bytes_t payload; - // TODO(yuyuan): SHM +#ifdef RMW_ZENOH_BUILD_WITH_SHARED_MEMORY if (shmbuf.has_value()) { z_bytes_serialize_from_shm_mut(&payload, z_move(shmbuf.value())); z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options); - } else { + } else +#endif + { z_bytes_serialize_from_buf( &payload, reinterpret_cast(msg_bytes), data_length); if (z_publisher_put(z_loan(publisher_data->pub), z_move(payload), &options)) {