Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement customizable sleep duration in nanoseconds used by writer during lock acquisition (default 1s). #14

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mmap-sync"
version = "1.0.3"
version = "1.0.4"
edition = "2021"
authors = ["Alex Bocharov <[email protected]>"]
description = "A Rust package allowing sharing of data between processes in a wait-free and zero-copy fashion from mapped memory."
Expand Down
11 changes: 7 additions & 4 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use crate::synchronizer::SynchronizerError;
use crate::synchronizer::SynchronizerError::*;

const STATE_SIZE: usize = mem::size_of::<State>();
const SLEEP_DURATION: Duration = Duration::from_secs(1);

/// State stored in memory for synchronization using atomics
#[repr(C)]
pub(crate) struct State {
pub(crate) struct State<const SD: usize = 1_000_000_000> {
/// Current data instance version
version: AtomicU64,
/// Current number of readers for each data instance
Expand Down Expand Up @@ -48,7 +47,11 @@ impl State {

/// Acquire next `idx` of the state for writing
#[inline]
pub(crate) fn acquire_next_idx(&self, grace_duration: Duration) -> (usize, bool) {
pub(crate) fn acquire_next_idx(
&self,
grace_duration: Duration,
sleep_duration: Duration,
) -> (usize, bool) {
// calculate `next_idx` to acquire, in case of uninitialized version use 0
let next_idx = match InstanceVersion::try_from(self.version.load(Ordering::SeqCst)) {
Ok(version) => (version.idx() + 1) % 2,
Expand All @@ -69,7 +72,7 @@ impl State {
reset = true;
break;
} else {
thread::sleep(SLEEP_DURATION);
thread::sleep(sleep_duration);
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,16 @@ use crate::synchronizer::SynchronizerError::*;
/// `Synchronizer` is a concurrency primitive that manages data access between a single writer process and multiple reader processes.
///
/// It coordinates the access to two data files that store the shared data. A state file, also memory-mapped, stores the index of the current data file and the number of active readers for each index, updated via atomic instructions.
pub struct Synchronizer<H: Hasher + Default = WyHash, const N: usize = 1024> {
///
/// Template parameters:
/// - `H` - hasher used for checksum calculation
/// - `N` - serializer scratch space size
/// - `SD` - sleep duration in nanoseconds used by writer during lock acquisition (default 1s)
pub struct Synchronizer<
H: Hasher + Default = WyHash,
const N: usize = 1024,
const SD: u64 = 1_000_000_000,
> {
/// Container storing state mmap
state_container: StateContainer,
/// Container storing data mmap
Expand Down Expand Up @@ -70,7 +79,7 @@ impl Synchronizer {
}
}

impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
impl<H: Hasher + Default, const N: usize, const SD: u64> Synchronizer<H, N, SD> {
/// Create new instance of `Synchronizer` using given `path_prefix` and template parameters
pub fn with_params(path_prefix: &OsStr) -> Self {
Synchronizer {
Expand Down Expand Up @@ -133,7 +142,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(&data, new_version)?;

Expand Down Expand Up @@ -167,7 +177,8 @@ impl<H: Hasher + Default, const N: usize> Synchronizer<H, N> {
let checksum = hasher.finish();

// acquire next available data file idx and write data to it
let (new_idx, reset) = state.acquire_next_idx(grace_duration);
let acquire_sleep_duration = Duration::from_nanos(SD);
let (new_idx, reset) = state.acquire_next_idx(grace_duration, acquire_sleep_duration);
let new_version = InstanceVersion::new(new_idx, data.len(), checksum)?;
let size = self.data_container.write(data, new_version)?;

Expand Down
Loading