From dab1275acbdb81079340aa13964632ab81f607ea Mon Sep 17 00:00:00 2001 From: Alex Bocharov Date: Tue, 13 Aug 2024 14:54:22 -0500 Subject: [PATCH] Implement customizable sleep duration in nanoseconds used by writer during lock acquisition (default 1s). --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/state.rs | 11 +++++++---- src/synchronizer.rs | 19 +++++++++++++++---- 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc53d2b..4dbcfec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -586,7 +586,7 @@ dependencies = [ [[package]] name = "mmap-sync" -version = "1.0.3" +version = "1.0.4" dependencies = [ "bytecheck", "criterion", diff --git a/Cargo.toml b/Cargo.toml index 33dc2c7..c00de5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mmap-sync" -version = "1.0.3" +version = "1.0.4" edition = "2021" authors = ["Alex Bocharov "] description = "A Rust package allowing sharing of data between processes in a wait-free and zero-copy fashion from mapped memory." diff --git a/src/state.rs b/src/state.rs index 2cce51f..3efd7b4 100644 --- a/src/state.rs +++ b/src/state.rs @@ -14,11 +14,10 @@ use crate::synchronizer::SynchronizerError; use crate::synchronizer::SynchronizerError::*; const STATE_SIZE: usize = mem::size_of::(); -const SLEEP_DURATION: Duration = Duration::from_secs(1); /// State stored in memory for synchronization using atomics #[repr(C)] -pub(crate) struct State { +pub(crate) struct State { /// Current data instance version version: AtomicU64, /// Current number of readers for each data instance @@ -48,7 +47,11 @@ impl State { /// Acquire next `idx` of the state for writing #[inline] - pub(crate) fn acquire_next_idx(&self, grace_duration: Duration) -> (usize, bool) { + pub(crate) fn acquire_next_idx( + &self, + grace_duration: Duration, + sleep_duration: Duration, + ) -> (usize, bool) { // calculate `next_idx` to acquire, in case of uninitialized version use 0 let next_idx = match InstanceVersion::try_from(self.version.load(Ordering::SeqCst)) { Ok(version) => (version.idx() + 1) % 2, @@ -69,7 +72,7 @@ impl State { reset = true; break; } else { - thread::sleep(SLEEP_DURATION); + thread::sleep(sleep_duration); } } diff --git a/src/synchronizer.rs b/src/synchronizer.rs index f6d8890..0bc3c66 100644 --- a/src/synchronizer.rs +++ b/src/synchronizer.rs @@ -24,7 +24,16 @@ use crate::synchronizer::SynchronizerError::*; /// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes. /// /// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions. -pub struct Synchronizer { +/// +/// Template parameters: +/// - `H` - hasher used for checksum calculation +/// - `N` - serializer scratch space size +/// - `SD` - sleep duration in nanoseconds used by writer during lock acquisition (default 1s) +pub struct Synchronizer< + H: Hasher + Default = WyHash, + const N: usize = 1024, + const SD: u64 = 1_000_000_000, +> { /// Container storing state mmap state_container: StateContainer, /// Container storing data mmap @@ -70,7 +79,7 @@ impl Synchronizer { } } -impl Synchronizer { +impl Synchronizer { /// Create new instance of `Synchronizer` using given `path_prefix` and template parameters pub fn with_params(path_prefix: &OsStr) -> Self { Synchronizer { @@ -133,7 +142,8 @@ impl Synchronizer { let checksum = hasher.finish(); // acquire next available data file idx and write data to it - let (new_idx, reset) = state.acquire_next_idx(grace_duration); + let acquire_sleep_duration = Duration::from_nanos(SD); + let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration); let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?; let size = self.data_container.write(&data, new_version)?; @@ -167,7 +177,8 @@ impl Synchronizer { let checksum = hasher.finish(); // acquire next available data file idx and write data to it - let (new_idx, reset) = state.acquire_next_idx(grace_duration); + let acquire_sleep_duration = Duration::from_nanos(SD); + let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration); let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?; let size = self.data_container.write(data, new_version)?;