From 511bc67abbb672a0e7edebd0fa54dfeeeb60f457 Mon Sep 17 00:00:00 2001 From: Dmitrii Bannov <104833606+yellowhatter@users.noreply.github.com> Date: Fri, 3 May 2024 16:14:28 +0300 Subject: [PATCH] Shm refine (#986) * [skip ci] SHM Payload API example and test * Add payload_mut to sample for zsliceshmmut deserialization * Improve SHM examples * Fix merge * Query/Reply shared memory examples * rename payload tests to bytes tests * - fix API exports - fix z_payload_shm example * Add attachment_mut to Sample * [skip ci] fix SHM exports in new api export mechanism * Massive renaming for ZSliceShm and ZSliceShmMut * fix ci * [skip ci] z_payload_shm -> z_bytes_shm * Polish SHM examples * fix lints * fix lint * fix after merge * Update z_alloc_shm.rs --------- Co-authored-by: Luca Cominardi --- .../src/api/{slice => buffer}/mod.rs | 4 +- .../src/api/{slice => buffer}/traits.rs | 0 .../{slice/zsliceshm.rs => buffer/zshm.rs} | 76 ++++----- .../zsliceshmmut.rs => buffer/zshmmut.rs} | 86 +++++------ commons/zenoh-shm/src/api/mod.rs | 2 +- .../api/provider/shared_memory_provider.rs | 12 +- commons/zenoh-shm/src/api/provider/types.rs | 4 +- examples/Cargo.toml | 22 ++- examples/examples/z_alloc_shm.rs | 40 ++--- examples/examples/z_bytes_shm.rs | 103 +++++++++++++ examples/examples/z_get_shm.rs | 144 ++++++++++++++++++ examples/examples/z_ping_shm.rs | 39 ++--- examples/examples/z_posix_shm_provider.rs | 44 ++++++ examples/examples/z_pub_shm.rs | 48 ++---- examples/examples/z_pub_shm_thr.rs | 37 ++--- examples/examples/z_queryable.rs | 7 +- examples/examples/z_queryable_shm.rs | 118 ++++++++++++++ examples/examples/z_sub_shm.rs | 39 +++-- zenoh/src/api/bytes.rs | 48 +++--- zenoh/src/api/encoding.rs | 6 +- zenoh/src/api/query.rs | 5 + zenoh/src/api/queryable.rs | 37 +++-- zenoh/src/api/sample.rs | 13 ++ zenoh/src/api/session.rs | 12 +- zenoh/src/lib.rs | 30 +++- zenoh/src/net/runtime/adminspace.rs | 6 +- zenoh/src/prelude.rs | 4 +- zenoh/tests/bytes.rs | 69 +++++++++ zenoh/tests/payload.rs | 86 ----------- 29 files changed, 784 insertions(+), 357 deletions(-) rename commons/zenoh-shm/src/api/{slice => buffer}/mod.rs (92%) rename commons/zenoh-shm/src/api/{slice => buffer}/traits.rs (100%) rename commons/zenoh-shm/src/api/{slice/zsliceshm.rs => buffer/zshm.rs} (59%) rename commons/zenoh-shm/src/api/{slice/zsliceshmmut.rs => buffer/zshmmut.rs} (59%) create mode 100644 examples/examples/z_bytes_shm.rs create mode 100644 examples/examples/z_get_shm.rs create mode 100644 examples/examples/z_posix_shm_provider.rs create mode 100644 examples/examples/z_queryable_shm.rs create mode 100644 zenoh/tests/bytes.rs delete mode 100644 zenoh/tests/payload.rs diff --git a/commons/zenoh-shm/src/api/slice/mod.rs b/commons/zenoh-shm/src/api/buffer/mod.rs similarity index 92% rename from commons/zenoh-shm/src/api/slice/mod.rs rename to commons/zenoh-shm/src/api/buffer/mod.rs index 59c793f94a..8a3e040da9 100644 --- a/commons/zenoh-shm/src/api/slice/mod.rs +++ b/commons/zenoh-shm/src/api/buffer/mod.rs @@ -13,5 +13,5 @@ // pub mod traits; -pub mod zsliceshm; -pub mod zsliceshmmut; +pub mod zshm; +pub mod zshmmut; diff --git a/commons/zenoh-shm/src/api/slice/traits.rs b/commons/zenoh-shm/src/api/buffer/traits.rs similarity index 100% rename from commons/zenoh-shm/src/api/slice/traits.rs rename to commons/zenoh-shm/src/api/buffer/traits.rs diff --git a/commons/zenoh-shm/src/api/slice/zsliceshm.rs b/commons/zenoh-shm/src/api/buffer/zshm.rs similarity index 59% rename from commons/zenoh-shm/src/api/slice/zsliceshm.rs rename to commons/zenoh-shm/src/api/buffer/zshm.rs index b2ba611b3c..d6f34f293a 100644 --- a/commons/zenoh-shm/src/api/slice/zsliceshm.rs +++ b/commons/zenoh-shm/src/api/buffer/zshm.rs @@ -20,44 +20,44 @@ use std::{ use zenoh_buffers::{ZBuf, ZSlice}; -use super::{traits::SHMBuf, zsliceshmmut::zsliceshmmut}; +use super::{traits::SHMBuf, zshmmut::zshmmut}; use crate::SharedMemoryBuf; -/// An immutable SHM slice +/// An immutable SHM buffer #[zenoh_macros::unstable_doc] #[repr(transparent)] #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ZSliceShm(pub(crate) SharedMemoryBuf); +pub struct ZShm(pub(crate) SharedMemoryBuf); -impl SHMBuf for ZSliceShm { +impl SHMBuf for ZShm { fn is_valid(&self) -> bool { self.0.is_valid() } } -impl PartialEq<&zsliceshm> for ZSliceShm { - fn eq(&self, other: &&zsliceshm) -> bool { +impl PartialEq<&zshm> for ZShm { + fn eq(&self, other: &&zshm) -> bool { self.0 == other.0 .0 } } -impl Borrow for ZSliceShm { - fn borrow(&self) -> &zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShm { + fn borrow(&self) -> &zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShm { - fn borrow_mut(&mut self) -> &mut zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShm { + fn borrow_mut(&mut self) -> &mut zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Deref for ZSliceShm { +impl Deref for ZShm { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -65,37 +65,37 @@ impl Deref for ZSliceShm { } } -impl AsRef<[u8]> for ZSliceShm { +impl AsRef<[u8]> for ZShm { fn as_ref(&self) -> &[u8] { self } } -impl From for ZSliceShm { +impl From for ZShm { fn from(value: SharedMemoryBuf) -> Self { Self(value) } } -impl From for ZSlice { - fn from(value: ZSliceShm) -> Self { +impl From for ZSlice { + fn from(value: ZShm) -> Self { value.0.into() } } -impl From for ZBuf { - fn from(value: ZSliceShm) -> Self { +impl From for ZBuf { + fn from(value: ZShm) -> Self { value.0.into() } } -impl TryFrom<&mut ZSliceShm> for &mut zsliceshmmut { +impl TryFrom<&mut ZShm> for &mut zshmmut { type Error = (); - fn try_from(value: &mut ZSliceShm) -> Result { + fn try_from(value: &mut ZShm) -> Result { match value.0.is_unique() && value.0.is_valid() { true => { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } @@ -104,64 +104,64 @@ impl TryFrom<&mut ZSliceShm> for &mut zsliceshmmut { } } -/// A borrowed immutable SHM slice +/// A borrowed immutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[allow(non_camel_case_types)] #[repr(transparent)] -pub struct zsliceshm(ZSliceShm); +pub struct zshm(ZShm); -impl ToOwned for zsliceshm { - type Owned = ZSliceShm; +impl ToOwned for zshm { + type Owned = ZShm; fn to_owned(&self) -> Self::Owned { self.0.clone() } } -impl PartialEq for &zsliceshm { - fn eq(&self, other: &ZSliceShm) -> bool { +impl PartialEq for &zshm { + fn eq(&self, other: &ZShm) -> bool { self.0 .0 == other.0 } } -impl Deref for zsliceshm { - type Target = ZSliceShm; +impl Deref for zshm { + type Target = ZShm; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for zsliceshm { +impl DerefMut for zshm { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl From<&SharedMemoryBuf> for &zsliceshm { +impl From<&SharedMemoryBuf> for &zshm { fn from(value: &SharedMemoryBuf) -> Self { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } -impl From<&mut SharedMemoryBuf> for &mut zsliceshm { +impl From<&mut SharedMemoryBuf> for &mut zshm { fn from(value: &mut SharedMemoryBuf) -> Self { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(value) } } } -impl TryFrom<&mut zsliceshm> for &mut zsliceshmmut { +impl TryFrom<&mut zshm> for &mut zshmmut { type Error = (); - fn try_from(value: &mut zsliceshm) -> Result { + fn try_from(value: &mut zshm) -> Result { match value.0 .0.is_unique() && value.0 .0.is_valid() { true => { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction Ok(unsafe { core::mem::transmute(value) }) } diff --git a/commons/zenoh-shm/src/api/slice/zsliceshmmut.rs b/commons/zenoh-shm/src/api/buffer/zshmmut.rs similarity index 59% rename from commons/zenoh-shm/src/api/slice/zsliceshmmut.rs rename to commons/zenoh-shm/src/api/buffer/zshmmut.rs index d866e4173e..7341b7600c 100644 --- a/commons/zenoh-shm/src/api/slice/zsliceshmmut.rs +++ b/commons/zenoh-shm/src/api/buffer/zshmmut.rs @@ -19,37 +19,37 @@ use zenoh_buffers::{ZBuf, ZSlice}; use super::{ traits::{SHMBuf, SHMBufMut}, - zsliceshm::{zsliceshm, ZSliceShm}, + zshm::{zshm, ZShm}, }; use crate::SharedMemoryBuf; -/// A mutable SHM slice +/// A mutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[repr(transparent)] -pub struct ZSliceShmMut(SharedMemoryBuf); +pub struct ZShmMut(SharedMemoryBuf); -impl SHMBuf for ZSliceShmMut { +impl SHMBuf for ZShmMut { fn is_valid(&self) -> bool { self.0.is_valid() } } -impl SHMBufMut for ZSliceShmMut {} +impl SHMBufMut for ZShmMut {} -impl ZSliceShmMut { +impl ZShmMut { pub(crate) unsafe fn new_unchecked(data: SharedMemoryBuf) -> Self { Self(data) } } -impl PartialEq for &ZSliceShmMut { - fn eq(&self, other: &zsliceshmmut) -> bool { +impl PartialEq for &ZShmMut { + fn eq(&self, other: &zshmmut) -> bool { self.0 == other.0 .0 } } -impl TryFrom for ZSliceShmMut { +impl TryFrom for ZShmMut { type Error = SharedMemoryBuf; fn try_from(value: SharedMemoryBuf) -> Result { @@ -60,10 +60,10 @@ impl TryFrom for ZSliceShmMut { } } -impl TryFrom for ZSliceShmMut { - type Error = ZSliceShm; +impl TryFrom for ZShmMut { + type Error = ZShm; - fn try_from(value: ZSliceShm) -> Result { + fn try_from(value: ZShm) -> Result { match value.0.is_unique() && value.0.is_valid() { true => Ok(Self(value.0)), false => Err(value), @@ -71,39 +71,39 @@ impl TryFrom for ZSliceShmMut { } } -impl Borrow for ZSliceShmMut { - fn borrow(&self) -> &zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShmMut { + fn borrow(&self) -> &zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShmMut { - fn borrow_mut(&mut self) -> &mut zsliceshm { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShmMut { + fn borrow_mut(&mut self) -> &mut zshm { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Borrow for ZSliceShmMut { - fn borrow(&self) -> &zsliceshmmut { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl Borrow for ZShmMut { + fn borrow(&self) -> &zshmmut { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl BorrowMut for ZSliceShmMut { - fn borrow_mut(&mut self) -> &mut zsliceshmmut { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] +impl BorrowMut for ZShmMut { + fn borrow_mut(&mut self) -> &mut zshmmut { + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction unsafe { core::mem::transmute(self) } } } -impl Deref for ZSliceShmMut { +impl Deref for ZShmMut { type Target = [u8]; fn deref(&self) -> &Self::Target { @@ -111,75 +111,75 @@ impl Deref for ZSliceShmMut { } } -impl DerefMut for ZSliceShmMut { +impl DerefMut for ZShmMut { fn deref_mut(&mut self) -> &mut Self::Target { self.0.as_mut() } } -impl AsRef<[u8]> for ZSliceShmMut { +impl AsRef<[u8]> for ZShmMut { fn as_ref(&self) -> &[u8] { self } } -impl AsMut<[u8]> for ZSliceShmMut { +impl AsMut<[u8]> for ZShmMut { fn as_mut(&mut self) -> &mut [u8] { self } } -impl From for ZSliceShm { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZShm { + fn from(value: ZShmMut) -> Self { value.0.into() } } -impl From for ZSlice { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZSlice { + fn from(value: ZShmMut) -> Self { value.0.into() } } -impl From for ZBuf { - fn from(value: ZSliceShmMut) -> Self { +impl From for ZBuf { + fn from(value: ZShmMut) -> Self { value.0.into() } } -/// A borrowed mutable SHM slice +/// A borrowed mutable SHM buffer #[zenoh_macros::unstable_doc] #[derive(Debug, PartialEq, Eq)] #[allow(non_camel_case_types)] #[repr(transparent)] -pub struct zsliceshmmut(ZSliceShmMut); +pub struct zshmmut(ZShmMut); -impl PartialEq for &zsliceshmmut { - fn eq(&self, other: &ZSliceShmMut) -> bool { +impl PartialEq for &zshmmut { + fn eq(&self, other: &ZShmMut) -> bool { self.0 .0 == other.0 } } -impl Deref for zsliceshmmut { - type Target = ZSliceShmMut; +impl Deref for zshmmut { + type Target = ZShmMut; fn deref(&self) -> &Self::Target { &self.0 } } -impl DerefMut for zsliceshmmut { +impl DerefMut for zshmmut { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } -impl TryFrom<&mut SharedMemoryBuf> for &mut zsliceshmmut { +impl TryFrom<&mut SharedMemoryBuf> for &mut zshmmut { type Error = (); fn try_from(value: &mut SharedMemoryBuf) -> Result { match value.is_unique() && value.is_valid() { - // SAFETY: ZSliceShm, ZSliceShmMut, zsliceshm and zsliceshmmut are #[repr(transparent)] + // SAFETY: ZShm, ZShmMut, zshm and zshmmut are #[repr(transparent)] // to SharedMemoryBuf type, so it is safe to transmute them in any direction true => Ok(unsafe { core::mem::transmute(value) }), false => Err(()), diff --git a/commons/zenoh-shm/src/api/mod.rs b/commons/zenoh-shm/src/api/mod.rs index 08a5678fa8..a87188da29 100644 --- a/commons/zenoh-shm/src/api/mod.rs +++ b/commons/zenoh-shm/src/api/mod.rs @@ -12,9 +12,9 @@ // ZettaScale Zenoh Team, // +pub mod buffer; pub mod client; pub mod client_storage; pub mod common; pub mod protocol_implementations; pub mod provider; -pub mod slice; diff --git a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs b/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs index 58109a699d..1ca560f07e 100644 --- a/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs +++ b/commons/zenoh-shm/src/api/provider/shared_memory_provider.rs @@ -28,7 +28,7 @@ use super::{ types::{AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError}, }; use crate::{ - api::{common::types::ProtocolID, slice::zsliceshmmut::ZSliceShmMut}, + api::{buffer::zshmmut::ZShmMut, common::types::ProtocolID}, header::{ allocated_descriptor::AllocatedHeaderDescriptor, descriptor::HeaderDescriptor, storage::GLOBAL_HEADER_STORAGE, @@ -712,11 +712,11 @@ where self.backend.defragment() } - /// Map externally-allocated chunk into ZSliceShmMut. + /// Map externally-allocated chunk into ZShmMut. /// This method is designed to be used with push data sources. /// Remember that chunk's len may be >= len! #[zenoh_macros::unstable_doc] - pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { + pub fn map(&self, chunk: AllocatedChunk, len: usize) -> ZResult { // allocate resources for SHM buffer let (allocated_header, allocated_watchdog, confirmed_watchdog) = Self::alloc_resources()?; @@ -728,7 +728,7 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } /// Try to collect free chunks. @@ -805,7 +805,7 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } fn alloc_resources() -> ZResult<( @@ -910,6 +910,6 @@ where allocated_watchdog, confirmed_watchdog, ); - Ok(unsafe { ZSliceShmMut::new_unchecked(wrapped) }) + Ok(unsafe { ZShmMut::new_unchecked(wrapped) }) } } diff --git a/commons/zenoh-shm/src/api/provider/types.rs b/commons/zenoh-shm/src/api/provider/types.rs index ddf949ee75..beae24bfb7 100644 --- a/commons/zenoh-shm/src/api/provider/types.rs +++ b/commons/zenoh-shm/src/api/provider/types.rs @@ -17,7 +17,7 @@ use std::fmt::Display; use zenoh_result::{bail, ZResult}; use super::chunk::AllocatedChunk; -use crate::api::slice::zsliceshmmut::ZSliceShmMut; +use crate::api::buffer::zshmmut::ZShmMut; /// Allocation errors /// @@ -169,4 +169,4 @@ pub type ChunkAllocResult = Result; /// SHM buffer allocation result #[zenoh_macros::unstable_doc] -pub type BufAllocResult = Result; +pub type BufAllocResult = Result; diff --git a/examples/Cargo.toml b/examples/Cargo.toml index e117507ae9..90281ae558 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -100,6 +100,11 @@ path = "examples/z_pull.rs" name = "z_queryable" path = "examples/z_queryable.rs" +[[example]] +name = "z_queryable_shm" +path = "examples/z_queryable_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_storage" path = "examples/z_storage.rs" @@ -108,6 +113,11 @@ path = "examples/z_storage.rs" name = "z_get" path = "examples/z_get.rs" +[[example]] +name = "z_get_shm" +path = "examples/z_get_shm.rs" +required-features = ["unstable", "shared-memory"] + [[example]] name = "z_forward" path = "examples/z_forward.rs" @@ -156,4 +166,14 @@ path = "examples/z_pong.rs" [[example]] name = "z_alloc_shm" path = "examples/z_alloc_shm.rs" -required-features = ["unstable", "shared-memory"] \ No newline at end of file +required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_bytes_shm" +path = "examples/z_bytes_shm.rs" +required-features = ["unstable", "shared-memory"] + +[[example]] +name = "z_posix_shm_provider" +path = "examples/z_posix_shm_provider.rs" +required-features = ["unstable", "shared-memory"] diff --git a/examples/examples/z_alloc_shm.rs b/examples/examples/z_alloc_shm.rs index acff39379c..93df5d821d 100644 --- a/examples/examples/z_alloc_shm.rs +++ b/examples/examples/z_alloc_shm.rs @@ -21,29 +21,15 @@ async fn main() { } async fn run() -> ZResult<()> { - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(65536, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(65536) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); @@ -54,14 +40,10 @@ async fn run() -> ZResult<()> { // This layout is reusable and can handle series of similar allocations let buffer_layout = { // OPTION 1: Simple (default) configuration: - let simple_layout = shared_memory_provider - .alloc_layout() - .size(512) - .res() - .unwrap(); + let simple_layout = provider.alloc_layout().size(512).res().unwrap(); // OPTION 2: Comprehensive configuration: - let _comprehensive_layout = shared_memory_provider + let _comprehensive_layout = provider .alloc_layout() .size(512) .alignment(AllocAlignment::new(2)) diff --git a/examples/examples/z_bytes_shm.rs b/examples/examples/z_bytes_shm.rs new file mode 100644 index 0000000000..5c582e56e6 --- /dev/null +++ b/examples/examples/z_bytes_shm.rs @@ -0,0 +1,103 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::{ + bytes::ZBytes, + shm::{ + zshm, zshmmut, PosixSharedMemoryProviderBackend, SharedMemoryProviderBuilder, ZShm, + ZShmMut, POSIX_PROTOCOL_ID, + }, +}; + +fn main() { + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(4096) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let mut owned_shm_buf_mut = provider + .alloc_layout() + .size(1024) + .res() + .unwrap() + .alloc() + .res() + .unwrap(); + + // mutable and immutable API + let _data: &[u8] = &owned_shm_buf_mut; + let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; + + // convert into immutable owned buffer (ZShmMut -> ZSlceShm) + let owned_shm_buf: ZShm = owned_shm_buf_mut.into(); + + // immutable API + let _data: &[u8] = &owned_shm_buf; + + // convert again into mutable owned buffer (ZShm -> ZSlceShmMut) + let mut owned_shm_buf_mut: ZShmMut = owned_shm_buf.try_into().unwrap(); + + // mutable and immutable API + let _data: &[u8] = &owned_shm_buf_mut; + let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; + + // build a ZBytes from an SHM buffer (ZShmMut -> ZBytes) + let mut payload: ZBytes = owned_shm_buf_mut.into(); + + // branch to illustrate immutable access to SHM data + { + // deserialize ZBytes as an immutably borrowed zshm (ZBytes -> &zshm) + let borrowed_shm_buf: &zshm = payload.deserialize().unwrap(); + + // immutable API + let _data: &[u8] = borrowed_shm_buf; + + // construct owned buffer from borrowed type (&zshm -> ZShm) + let owned = borrowed_shm_buf.to_owned(); + + // immutable API + let _data: &[u8] = &owned; + + // try to construct mutable ZShmMut (ZShm -> ZShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZShm has two existing references ('owned' and inside 'payload') + assert!(owned_mut.is_err()) + } + + // branch to illustrate mutable access to SHM data + { + // deserialize ZBytes as mutably borrowed zshm (ZBytes -> &mut zshm) + let borrowed_shm_buf: &mut zshm = payload.deserialize_mut().unwrap(); + + // immutable API + let _data: &[u8] = borrowed_shm_buf; + + // convert zshm to zshmmut (&mut zshm -> &mut zshmmut) + let borrowed_shm_buf_mut: &mut zshmmut = borrowed_shm_buf.try_into().unwrap(); + + // mutable and immutable API + let _data: &[u8] = borrowed_shm_buf_mut; + let _data_mut: &mut [u8] = borrowed_shm_buf_mut; + } +} diff --git a/examples/examples/z_get_shm.rs b/examples/examples/z_get_shm.rs new file mode 100644 index 0000000000..39caf3a101 --- /dev/null +++ b/examples/examples/z_get_shm.rs @@ -0,0 +1,144 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use std::time::Duration; + +use clap::Parser; +use zenoh::prelude::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, selector, mut value, target, timeout) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + println!("Allocating Shared Memory Buffer..."); + let mut sbuf = provider + .alloc_layout() + .size(1024) + .res() + .unwrap() + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + let content = value + .take() + .unwrap_or_else(|| "Get from SharedMemory Rust!".to_string()); + sbuf[0..content.len()].copy_from_slice(content.as_bytes()); + + println!("Sending Query '{selector}'..."); + let replies = session + .get(&selector) + .value(sbuf) + .target(target) + .timeout(timeout) + .await + .unwrap(); + + while let Ok(reply) = replies.recv_async().await { + match reply.result() { + Ok(sample) => { + print!(">> Received ('{}': ", sample.key_expr().as_str()); + match sample.payload().deserialize::<&zshm>() { + Ok(payload) => println!("'{}')", String::from_utf8_lossy(payload),), + Err(e) => println!("'Not a SharedMemoryBuf: {:?}')", e), + } + } + Err(err) => { + let payload = err + .payload() + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + println!(">> Received (ERROR: '{}')", payload); + } + } + } +} + +#[derive(clap::ValueEnum, Clone, Copy, Debug)] +#[value(rename_all = "SCREAMING_SNAKE_CASE")] +enum Qt { + BestMatching, + All, + AllComplete, +} + +#[derive(Parser, Clone, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/**")] + /// The selection of resources to query + selector: Selector<'static>, + /// The value to publish. + value: Option, + #[arg(short, long, default_value = "BEST_MATCHING")] + /// The target queryables of the query. + target: Qt, + #[arg(short = 'o', long, default_value = "10000")] + /// The query timeout in milliseconds. + timeout: u64, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> ( + Config, + Selector<'static>, + Option, + QueryTarget, + Duration, +) { + let args = Args::parse(); + ( + args.common.into(), + args.selector, + args.value, + match args.target { + Qt::BestMatching => QueryTarget::BestMatching, + Qt::All => QueryTarget::All, + Qt::AllComplete => QueryTarget::AllComplete, + }, + Duration::from_millis(args.timeout), + ) +} diff --git a/examples/examples/z_ping_shm.rs b/examples/examples/z_ping_shm.rs index d4c5b4f162..4c3ad4ed40 100644 --- a/examples/examples/z_ping_shm.rs +++ b/examples/examples/z_ping_shm.rs @@ -45,34 +45,23 @@ fn main() { let mut samples = Vec::with_capacity(n); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(size) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); - let buf = shared_memory_provider + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let buf = provider .alloc_layout() .size(size) .res() @@ -81,7 +70,7 @@ fn main() { .res() .unwrap(); - // convert ZSliceShmMut into ZSlice as ZSliceShmMut does not support Clone + // convert ZShmMut into ZSlice as ZShmMut does not support Clone let buf: ZSlice = buf.into(); // -- warmup -- diff --git a/examples/examples/z_posix_shm_provider.rs b/examples/examples/z_posix_shm_provider.rs new file mode 100644 index 0000000000..cdf502bc61 --- /dev/null +++ b/examples/examples/z_posix_shm_provider.rs @@ -0,0 +1,44 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use zenoh::prelude::*; + +fn main() { + // Construct an SHM backend + let backend = { + // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. + + // Total amount of shared memory to allocate + let size = 4096; + + // An alignment for POSIX SHM provider + // Due to internal optimization, all allocations will be aligned corresponding to this alignment, + // so the provider will be able to satisfy allocation layouts with alignment <= provider_alignment + let provider_alignment = AllocAlignment::default(); + + // A layout for POSIX Provider's memory + let provider_layout = MemoryLayout::new(size, provider_alignment).unwrap(); + + // Build a provider backend + PosixSharedMemoryProviderBackend::builder() + .with_layout(provider_layout) + .res() + .unwrap() + }; + + // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID + let _shared_memory_provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); +} diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index 92d19b6b06..d2a87a59cc 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -16,7 +16,6 @@ use zenoh::prelude::*; use zenoh_examples::CommonArgs; const N: usize = 10; -const K: u32 = 3; #[tokio::main] async fn main() -> Result<(), ZError> { @@ -33,46 +32,31 @@ async fn main() -> Result<(), ZError> { println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); - println!("Creating POSIX SHM backend..."); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(N * 1024, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - println!("Creating SHM Provider with POSIX backend..."); - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); let publisher = session.declare_publisher(&path).await.unwrap(); + // Create allocation layout for series of similar allocations println!("Allocating Shared Memory Buffer..."); - let layout = shared_memory_provider - .alloc_layout() - .size(1024) - .res() - .unwrap(); + let layout = provider.alloc_layout().size(1024).res().unwrap(); println!("Press CTRL-C to quit..."); - for idx in 0..(K * N as u32) { + for idx in 0..u32::MAX { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Allocate particular SHM buffer using pre-created layout let mut sbuf = layout .alloc() .with_policy::>() diff --git a/examples/examples/z_pub_shm_thr.rs b/examples/examples/z_pub_shm_thr.rs index 0b94304321..0d44fbe6ee 100644 --- a/examples/examples/z_pub_shm_thr.rs +++ b/examples/examples/z_pub_shm_thr.rs @@ -28,34 +28,23 @@ async fn main() { let z = zenoh::open(config).await.unwrap(); - // Construct an SHM backend - let backend = { - // NOTE: code in this block is a specific PosixSharedMemoryProviderBackend API. - // The initialisation of SHM backend is completely backend-specific and user is free to do - // anything reasonable here. This code is execuated at the provider's first use - - // Alignment for POSIX SHM provider - // All allocations will be aligned corresponding to this alignment - - // that means that the provider will be able to satisfy allocation layouts - // with alignment <= provider_alignment - let provider_alignment = AllocAlignment::default(); - - // Create layout for POSIX Provider's memory - let provider_layout = MemoryLayout::new(sm_size, provider_alignment).unwrap(); - - PosixSharedMemoryProviderBackend::builder() - .with_layout(provider_layout) - .res() - .unwrap() - }; - - // Construct an SHM provider for particular backend and POSIX_PROTOCOL_ID - let shared_memory_provider = SharedMemoryProviderBuilder::builder() + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(sm_size) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() .protocol_id::() .backend(backend) .res(); - let mut buf = shared_memory_provider + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + let mut buf = provider .alloc_layout() .size(size) .res() diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index e24b8e80cb..dcdca82c09 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -20,7 +20,12 @@ async fn main() { // initiate logging zenoh_util::try_init_log_from_env(); - let (config, key_expr, value, complete) = parse_args(); + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs new file mode 100644 index 0000000000..ed2320d2c5 --- /dev/null +++ b/examples/examples/z_queryable_shm.rs @@ -0,0 +1,118 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::prelude::*; +use zenoh_examples::CommonArgs; + +const N: usize = 10; + +#[tokio::main] +async fn main() { + // initiate logging + zenoh_util::try_init_log_from_env(); + + let (mut config, key_expr, value, complete) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_get_shm` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + println!("Opening session..."); + let session = zenoh::open(config).await.unwrap(); + + println!("Creating POSIX SHM provider..."); + // create an SHM backend... + // NOTE: For extended PosixSharedMemoryProviderBackend API please check z_posix_shm_provider.rs + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(N * 1024) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + println!("Declaring Queryable on '{key_expr}'..."); + let queryable = session + .declare_queryable(&key_expr) + .complete(complete) + .await + .unwrap(); + + println!("Press CTRL-C to quit..."); + while let Ok(query) = queryable.recv_async().await { + print!( + ">> [Queryable] Received Query '{}' ('{}'", + query.selector(), + query.key_expr().as_str(), + ); + if let Some(payload) = query.payload() { + match payload.deserialize::<&zshm>() { + Ok(payload) => print!(": '{}'", String::from_utf8_lossy(payload)), + Err(e) => print!(": 'Not a SharedMemoryBuf: {:?}'", e), + } + } + println!(")"); + + // Allocate an SHM buffer + // NOTE: For allocation API please check z_alloc_shm.rs example + // NOTE: For buf's API please check z_bytes_shm.rs example + println!("Allocating Shared Memory Buffer..."); + let mut sbuf = provider + .alloc_layout() + .size(1024) + .res() + .unwrap() + .alloc() + .with_policy::>() + .res_async() + .await + .unwrap(); + + sbuf[0..value.len()].copy_from_slice(value.as_bytes()); + + println!( + ">> [Queryable] Responding ('{}': '{}')", + key_expr.as_str(), + value, + ); + query + .reply(key_expr.clone(), sbuf) + .await + .unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}")); + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-rs-queryable")] + /// The key expression matching queries to reply to. + key: KeyExpr<'static>, + #[arg(short, long, default_value = "Queryable from SharedMemory Rust!")] + /// The value to reply to queries. + value: String, + #[arg(long)] + /// Declare the queryable as complete w.r.t. the key expression. + complete: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, KeyExpr<'static>, String, bool) { + let args = Args::parse(); + (args.common.into(), args.key, args.value, args.complete) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index 9914539ed5..bab31d4a2a 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -35,18 +35,37 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { - match sample.payload().deserialize::<&zsliceshm>() { - Ok(payload) => println!( - ">> [Subscriber] Received {} ('{}': '{:02x?}')", - sample.kind(), - sample.key_expr().as_str(), - payload - ), - Err(e) => { - println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); - } + print!( + ">> [Subscriber] Received {} ('{}': ", + sample.kind(), + sample.key_expr().as_str(), + ); + match sample.payload().deserialize::<&zshm>() { + Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), + Err(e) => print!("'Not a SharedMemoryBuf: {:?}'", e), } + println!(")"); } + + // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber + // // holding a reference to the SHM buffer, then it will be able to get a mutable reference to it. + // // With the mutable reference at hand, it's possible to mutate in place the SHM buffer content. + // + // use zenoh::shm::zshmmut; + + // while let Ok(mut sample) = subscriber.recv_async().await { + // let kind = sample.kind(); + // let key_expr = sample.key_expr().to_string(); + // match sample.payload_mut().deserialize_mut::<&mut zshmmut>() { + // Ok(payload) => println!( + // ">> [Subscriber] Received {} ('{}': '{:02x?}')", + // kind, key_expr, payload + // ), + // Err(e) => { + // println!(">> [Subscriber] Not a SharedMemoryBuf: {:?}", e); + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index fb32910b54..98afd1a3c3 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -30,9 +30,9 @@ use zenoh_protocol::{core::Properties, zenoh::ext::AttachmentType}; use zenoh_result::{ZError, ZResult}; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh_shm::{ - api::slice::{ - zsliceshm::{zsliceshm, ZSliceShm}, - zsliceshmmut::{zsliceshmmut, ZSliceShmMut}, + api::buffer::{ + zshm::{zshm, ZShm}, + zshmmut::{zshmmut, ZShmMut}, }, SharedMemoryBuf, }; @@ -1526,47 +1526,47 @@ impl TryFrom<&mut ZBytes> for serde_pickle::Value { // Shared memory conversion #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl Serialize for ZSerde { +impl Serialize for ZSerde { type Output = ZBytes; - fn serialize(self, t: ZSliceShm) -> Self::Output { + fn serialize(self, t: ZShm) -> Self::Output { let slice: ZSlice = t.into(); ZBytes::new(slice) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl From for ZBytes { - fn from(t: ZSliceShm) -> Self { +impl From for ZBytes { + fn from(t: ZShm) -> Self { ZSerde.serialize(t) } } // Shared memory conversion #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl Serialize for ZSerde { +impl Serialize for ZSerde { type Output = ZBytes; - fn serialize(self, t: ZSliceShmMut) -> Self::Output { + fn serialize(self, t: ZShmMut) -> Self::Output { let slice: ZSlice = t.into(); ZBytes::new(slice) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl From for ZBytes { - fn from(t: ZSliceShmMut) -> Self { +impl From for ZBytes { + fn from(t: ZShmMut) -> Self { ZSerde.serialize(t) } } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a zsliceshm> for ZSerde { +impl<'a> Deserialize<'a, &'a zshm> for ZSerde { type Input = &'a ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a zsliceshm, Self::Error> { - // A ZSliceShm is expected to have only one slice + fn deserialize(self, v: Self::Input) -> Result<&'a zshm, Self::Error> { + // A ZShm is expected to have only one slice let mut zslices = v.0.zslices(); if let Some(zs) = zslices.next() { if let Some(shmb) = zs.downcast_ref::() { @@ -1578,7 +1578,7 @@ impl<'a> Deserialize<'a, &'a zsliceshm> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a ZBytes> for &'a zsliceshm { +impl<'a> TryFrom<&'a ZBytes> for &'a zshm { type Error = ZDeserializeError; fn try_from(value: &'a ZBytes) -> Result { @@ -1587,7 +1587,7 @@ impl<'a> TryFrom<&'a ZBytes> for &'a zsliceshm { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshm { +impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zshm { type Error = ZDeserializeError; fn try_from(value: &'a mut ZBytes) -> Result { @@ -1596,11 +1596,11 @@ impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshm { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a mut zsliceshm> for ZSerde { +impl<'a> Deserialize<'a, &'a mut zshm> for ZSerde { type Input = &'a mut ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a mut zsliceshm, Self::Error> { + fn deserialize(self, v: Self::Input) -> Result<&'a mut zshm, Self::Error> { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { @@ -1613,11 +1613,11 @@ impl<'a> Deserialize<'a, &'a mut zsliceshm> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> Deserialize<'a, &'a mut zsliceshmmut> for ZSerde { +impl<'a> Deserialize<'a, &'a mut zshmmut> for ZSerde { type Input = &'a mut ZBytes; type Error = ZDeserializeError; - fn deserialize(self, v: Self::Input) -> Result<&'a mut zsliceshmmut, Self::Error> { + fn deserialize(self, v: Self::Input) -> Result<&'a mut zshmmut, Self::Error> { // A ZSliceShmBorrowMut is expected to have only one slice let mut zslices = v.0.zslices_mut(); if let Some(zs) = zslices.next() { @@ -1630,7 +1630,7 @@ impl<'a> Deserialize<'a, &'a mut zsliceshmmut> for ZSerde { } #[cfg(all(feature = "shared-memory", feature = "unstable"))] -impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zsliceshmmut { +impl<'a> TryFrom<&'a mut ZBytes> for &'a mut zshmmut { type Error = ZDeserializeError; fn try_from(value: &'a mut ZBytes) -> Result { @@ -1834,12 +1834,12 @@ mod tests { use zenoh_protocol::core::Properties; #[cfg(all(feature = "shared-memory", feature = "unstable"))] use zenoh_shm::api::{ + buffer::zshm::{zshm, ZShm}, protocol_implementations::posix::{ posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, protocol_id::POSIX_PROTOCOL_ID, }, provider::shared_memory_provider::SharedMemoryProviderBuilder, - slice::zsliceshm::{zsliceshm, ZSliceShm}, }; use super::ZBytes; @@ -1967,9 +1967,9 @@ mod tests { let mutable_shm_buf = layout.alloc().res().unwrap(); // convert to immutable SHM buffer - let immutable_shm_buf: ZSliceShm = mutable_shm_buf.into(); + let immutable_shm_buf: ZShm = mutable_shm_buf.into(); - serialize_deserialize!(&zsliceshm, immutable_shm_buf); + serialize_deserialize!(&zshm, immutable_shm_buf); } // Properties diff --git a/zenoh/src/api/encoding.rs b/zenoh/src/api/encoding.rs index f1be92c7ac..29c65f837e 100644 --- a/zenoh/src/api/encoding.rs +++ b/zenoh/src/api/encoding.rs @@ -17,7 +17,7 @@ use phf::phf_map; use zenoh_buffers::{ZBuf, ZSlice}; use zenoh_protocol::core::EncodingId; #[cfg(feature = "shared-memory")] -use zenoh_shm::api::slice::{zsliceshm::ZSliceShm, zsliceshmmut::ZSliceShmMut}; +use zenoh_shm::api::buffer::{zshm::ZShm, zshmmut::ZShmMut}; use super::bytes::ZBytes; @@ -837,10 +837,10 @@ impl EncodingMapping for serde_pickle::Value { // - Zenoh SHM #[cfg(feature = "shared-memory")] -impl EncodingMapping for ZSliceShm { +impl EncodingMapping for ZShm { const ENCODING: Encoding = Encoding::ZENOH_BYTES; } #[cfg(feature = "shared-memory")] -impl EncodingMapping for ZSliceShmMut { +impl EncodingMapping for ZShmMut { const ENCODING: Encoding = Encoding::ZENOH_BYTES; } diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index e344237087..66de2e5700 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -93,6 +93,11 @@ impl Reply { self.result.as_ref() } + /// Gets the a mutable borrowed result of this `Reply`. Use [`Reply::into_result`] to take ownership of the result. + pub fn result_mut(&mut self) -> Result<&mut Sample, &mut Value> { + self.result.as_mut() + } + /// Converts this `Reply` into the its result. Use [`Reply::result`] it you don't want to take ownership. pub fn into_result(self) -> Result { self.result diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index e2343811db..0653c4433d 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -51,18 +51,11 @@ use super::{ use crate::net::primitives::Primitives; pub(crate) struct QueryInner { - /// The key expression of this Query. pub(crate) key_expr: KeyExpr<'static>, - /// This Query's selector parameters. pub(crate) parameters: Parameters<'static>, - /// This Query's body. - pub(crate) value: Option, - pub(crate) qid: RequestId, pub(crate) zid: ZenohId, pub(crate) primitives: Arc, - #[cfg(feature = "unstable")] - pub(crate) attachment: Option, } impl Drop for QueryInner { @@ -80,6 +73,9 @@ impl Drop for QueryInner { pub struct Query { pub(crate) inner: Arc, pub(crate) eid: EntityId, + pub(crate) value: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, } impl Query { @@ -107,24 +103,43 @@ impl Query { /// This Query's value. #[inline(always)] pub fn value(&self) -> Option<&Value> { - self.inner.value.as_ref() + self.value.as_ref() + } + + /// This Query's value. + #[inline(always)] + pub fn value_mut(&mut self) -> Option<&mut Value> { + self.value.as_mut() } /// This Query's payload. #[inline(always)] pub fn payload(&self) -> Option<&ZBytes> { - self.inner.value.as_ref().map(|v| &v.payload) + self.value.as_ref().map(|v| &v.payload) + } + + /// This Query's payload. + #[inline(always)] + pub fn payload_mut(&mut self) -> Option<&mut ZBytes> { + self.value.as_mut().map(|v| &mut v.payload) } /// This Query's encoding. #[inline(always)] pub fn encoding(&self) -> Option<&Encoding> { - self.inner.value.as_ref().map(|v| &v.encoding) + self.value.as_ref().map(|v| &v.encoding) } + /// This Query's attachment. #[zenoh_macros::unstable] pub fn attachment(&self) -> Option<&ZBytes> { - self.inner.attachment.as_ref() + self.attachment.as_ref() + } + + /// This Query's attachment. + #[zenoh_macros::unstable] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() } /// Sends a reply in the form of [`Sample`] to this Query. diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 2551a2a0d9..f70f024677 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -306,6 +306,12 @@ impl Sample { &self.payload } + /// Gets the payload of this Sample. + #[inline] + pub fn payload_mut(&mut self) -> &mut ZBytes { + &mut self.payload + } + /// Gets the kind of this Sample. #[inline] pub fn kind(&self) -> SampleKind { @@ -352,6 +358,13 @@ impl Sample { pub fn attachment(&self) -> Option<&ZBytes> { self.attachment.as_ref() } + + /// Gets the sample attachment: a map of key-value pairs, where each key and value are byte-slices. + #[zenoh_macros::unstable] + #[inline] + pub fn attachment_mut(&mut self) -> Option<&mut ZBytes> { + self.attachment.as_mut() + } } impl From for Value { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 2e718ecccb..018a3a085e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1806,10 +1806,6 @@ impl Session { let query_inner = Arc::new(QueryInner { key_expr, parameters: parameters.to_owned().into(), - value: body.map(|b| Value { - payload: b.payload.into(), - encoding: b.encoding.into(), - }), qid, zid, primitives: if local { @@ -1817,13 +1813,17 @@ impl Session { } else { primitives }, - #[cfg(feature = "unstable")] - attachment, }); for (eid, callback) in queryables { callback(Query { inner: query_inner.clone(), eid, + value: body.as_ref().map(|b| Value { + payload: b.payload.clone().into(), + encoding: b.encoding.clone().into(), + }), + #[cfg(feature = "unstable")] + attachment: attachment.clone(), }); } } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 58e17fc2ea..caf961984b 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -379,20 +379,34 @@ pub mod internal { #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub mod shm { pub use zenoh_shm::api::{ - client_storage::SharedMemoryClientStorage, + buffer::{ + zshm::{zshm, ZShm}, + zshmmut::{zshmmut, ZShmMut}, + }, + client::{ + shared_memory_client::SharedMemoryClient, shared_memory_segment::SharedMemorySegment, + }, + client_storage::{SharedMemoryClientStorage, GLOBAL_CLIENT_STORAGE}, + common::types::{ChunkID, ProtocolID, SegmentID}, protocol_implementations::posix::{ - posix_shared_memory_provider_backend::PosixSharedMemoryProviderBackend, + posix_shared_memory_client::PosixSharedMemoryClient, + posix_shared_memory_provider_backend::{ + LayoutedPosixSharedMemoryProviderBackendBuilder, PosixSharedMemoryProviderBackend, + PosixSharedMemoryProviderBackendBuilder, + }, protocol_id::POSIX_PROTOCOL_ID, }, provider::{ shared_memory_provider::{ - BlockOn, Deallocate, Defragment, GarbageCollect, SharedMemoryProviderBuilder, + AllocBuilder, AllocLayout, AllocLayoutAlignedBuilder, AllocLayoutBuilder, + AllocLayoutSizedBuilder, AllocPolicy, AsyncAllocPolicy, BlockOn, DeallocEldest, + DeallocOptimal, DeallocYoungest, Deallocate, Defragment, DynamicProtocolID, + ForceDeallocPolicy, GarbageCollect, JustAlloc, ProtocolIDSource, + SharedMemoryProvider, SharedMemoryProviderBuilder, + SharedMemoryProviderBuilderBackendID, SharedMemoryProviderBuilderID, + StaticProtocolID, }, - types::{AllocAlignment, MemoryLayout}, - }, - slice::{ - zsliceshm::{zsliceshm, ZSliceShm}, - zsliceshmmut::{zsliceshmmut, ZSliceShmMut}, + types::{AllocAlignment, BufAllocResult, ChunkAllocResult, MemoryLayout, ZAllocError}, }, }; } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 8b53692ead..62f6b7c8b4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -466,14 +466,14 @@ impl Primitives for AdminSpace { inner: Arc::new(QueryInner { key_expr: key_expr.clone(), parameters: query.parameters.into(), - value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), qid: msg.id, zid, primitives, - #[cfg(feature = "unstable")] - attachment: query.ext_attachment.map(Into::into), }), eid: self.queryable_id, + value: query.ext_body.map(|b| Value::new(b.payload, b.encoding)), + #[cfg(feature = "unstable")] + attachment: query.ext_attachment.map(Into::into), }; for (key, handler) in &self.handlers { diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 54418d9f78..2ed94e6f47 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -26,7 +26,7 @@ // Reexport API in flat namespace pub(crate) mod flat { - #[cfg(feature = "shared-memory")] + #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub use crate::shm::*; pub use crate::{ buffers::*, @@ -51,7 +51,7 @@ pub(crate) mod flat { // Reexport API in hierarchical namespace pub(crate) mod mods { - #[cfg(feature = "shared-memory")] + #[cfg(all(feature = "unstable", feature = "shared-memory"))] pub use crate::shm; pub use crate::{ buffers, bytes, config, core, encoding, handlers, key_expr, publication, query, queryable, diff --git a/zenoh/tests/bytes.rs b/zenoh/tests/bytes.rs new file mode 100644 index 0000000000..6de12ab63f --- /dev/null +++ b/zenoh/tests/bytes.rs @@ -0,0 +1,69 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#[test] +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +fn shm_bytes_single_buf() { + use zenoh::prelude::*; + + // create an SHM backend... + let backend = PosixSharedMemoryProviderBackend::builder() + .with_size(4096) + .unwrap() + .res() + .unwrap(); + // ...and an SHM provider + let provider = SharedMemoryProviderBuilder::builder() + .protocol_id::() + .backend(backend) + .res(); + + // Prepare a layout for allocations + let layout = provider.alloc_layout().size(1024).res().unwrap(); + + // allocate an SHM buffer (ZShmMut) + let owned_shm_buf_mut = layout.alloc().res().unwrap(); + + // convert into immutable owned buffer (ZShmMut -> ZSlceShm) + let owned_shm_buf: ZShm = owned_shm_buf_mut.into(); + + // convert again into mutable owned buffer (ZShm -> ZSlceShmMut) + let owned_shm_buf_mut: ZShmMut = owned_shm_buf.try_into().unwrap(); + + // build a ZBytes from an SHM buffer (ZShmMut -> ZBytes) + let mut payload: ZBytes = owned_shm_buf_mut.into(); + + // branch to illustrate immutable access to SHM data + { + // deserialize ZBytes as an immutably borrowed zshm (ZBytes -> &zshm) + let borrowed_shm_buf: &zshm = payload.deserialize().unwrap(); + + // construct owned buffer from borrowed type (&zshm -> ZShm) + let owned = borrowed_shm_buf.to_owned(); + + // try to construct mutable ZShmMut (ZShm -> ZShmMut) + let owned_mut: Result = owned.try_into(); + // the attempt fails because ZShm has two existing references ('owned' and inside 'payload') + assert!(owned_mut.is_err()) + } + + // branch to illustrate mutable access to SHM data + { + // deserialize ZBytes as mutably borrowed zshm (ZBytes -> &mut zshm) + let borrowed_shm_buf: &mut zshm = payload.deserialize_mut().unwrap(); + + // convert zshm to zshmmut (&mut zshm -> &mut zshmmut) + let _borrowed_shm_buf_mut: &mut zshmmut = borrowed_shm_buf.try_into().unwrap(); + } +} diff --git a/zenoh/tests/payload.rs b/zenoh/tests/payload.rs deleted file mode 100644 index fecf10a608..0000000000 --- a/zenoh/tests/payload.rs +++ /dev/null @@ -1,86 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#[test] -#[cfg(all(feature = "shared-memory", feature = "unstable"))] -fn shm_payload_single_buf() { - use zenoh::prelude::*; - - // create an SHM backend... - let backend = PosixSharedMemoryProviderBackend::builder() - .with_size(4096) - .unwrap() - .res() - .unwrap(); - // ...and an SHM provider - let provider = SharedMemoryProviderBuilder::builder() - .protocol_id::() - .backend(backend) - .res(); - - // Prepare a layout for allocations - let layout = provider.alloc_layout().size(1024).res().unwrap(); - - // allocate an SHM buffer - let mut owned_shm_buf_mut = layout.alloc().res().unwrap(); - - // get data - let _data: &[u8] = &owned_shm_buf_mut; - let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; - - // convert into immutable owned buffer - let owned_shm_buf: ZSliceShm = owned_shm_buf_mut.into(); - - // get data - let _data: &[u8] = &owned_shm_buf; - - // convert again into mutable owned buffer - let mut owned_shm_buf_mut: ZSliceShmMut = owned_shm_buf.try_into().unwrap(); - - // get data - let _data: &[u8] = &owned_shm_buf_mut; - let _data_mut: &mut [u8] = &mut owned_shm_buf_mut; - - // build a ZBytes from an SHM buffer - let mut payload: ZBytes = owned_shm_buf_mut.into(); - - { - // deserialize ZBytes as borrowed zsliceshm - let borrowed_shm_buf: &zsliceshm = payload.deserialize().unwrap(); - - // get data - let _data: &[u8] = borrowed_shm_buf; - - // construct owned buffer from borrowed type - let owned = borrowed_shm_buf.to_owned(); - - // get data - let _data: &[u8] = &owned; - } - - { - // deserialize ZBytes as mutably borrowed zsliceshm - let borrowed_shm_buf: &mut zsliceshm = payload.deserialize_mut().unwrap(); - - // get data - let _data: &[u8] = borrowed_shm_buf; - - // convert zsliceshm to zsliceshmmut - let borrowed_shm_buf_mut: &mut zsliceshmmut = borrowed_shm_buf.try_into().unwrap(); - - // get data - let _data: &[u8] = borrowed_shm_buf_mut; - let _data_mut: &mut [u8] = borrowed_shm_buf_mut; - } -}