From 0dd89b794f462baa616fb8c0685348f43ae6f3a0 Mon Sep 17 00:00:00 2001 From: Ivan Pleshkov Date: Wed, 10 Jan 2024 09:51:56 +0000 Subject: [PATCH 1/4] copy wal fies using memcpy --- src/lib.rs | 12 ++++++++++++ src/segment.rs | 16 ++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index fe79732..a4b7fd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -471,6 +471,18 @@ impl Wal { pub fn clear(&mut self) -> Result<()> { self.truncate(self.first_index()) } + + /// copy all files to the given path directory + pub fn save_to_path

(&self, path: P) -> Result<()> + where + P: AsRef, + { + self.open_segment.segment.save_to_path(&path)?; + for segment in &self.closed_segments { + segment.segment.save_to_path(&path)?; + } + Ok(()) + } } impl fmt::Debug for Wal { diff --git a/src/segment.rs b/src/segment.rs index 9590aa5..5decdd4 100644 --- a/src/segment.rs +++ b/src/segment.rs @@ -643,6 +643,22 @@ impl Segment { } } } + + pub fn save_to_path

(&self, path: P) -> Result<()> + where + P: AsRef, + { + let file_name = self.path.file_name().unwrap(); + let dst_path = path.as_ref().to_owned().join(file_name); + let mut other = Self::create(dst_path, self.capacity())?; + unsafe { + other + .mmap + .as_mut_slice() + .copy_from_slice(self.mmap.as_slice()); + } + Ok(()) + } } impl fmt::Debug for Segment { From 3148553be224fb9a58b064b417061747c757f213 Mon Sep 17 00:00:00 2001 From: Ivan Pleshkov Date: Mon, 15 Jan 2024 23:55:18 +0000 Subject: [PATCH 2/4] copy files that are not locked by segments --- src/lib.rs | 39 ++++++++++++++++++++++++++++++++++----- src/segment.rs | 6 ++---- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a4b7fd1..55646ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ use crossbeam_channel::{Receiver, Sender}; use fs4::FileExt; use log::{debug, info, trace, warn}; use std::cmp::Ordering; +use std::collections::HashMap; use std::fmt; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Result}; @@ -472,14 +473,42 @@ impl Wal { self.truncate(self.first_index()) } - /// copy all files to the given path directory - pub fn save_to_path

(&self, path: P) -> Result<()> + /// Copy all files to the given path directory. directory should exist and be empty + pub fn copy_to_path

(&self, path: P) -> Result<()> where P: AsRef, { - self.open_segment.segment.save_to_path(&path)?; - for segment in &self.closed_segments { - segment.segment.save_to_path(&path)?; + if fs::read_dir(path.as_ref())?.next().is_some() { + return Err(Error::new( + ErrorKind::AlreadyExists, + format!("path {:?} not empty", path.as_ref()), + )); + }; + + let open_segment_file = self.open_segment.segment.path().file_name().unwrap(); + let close_segment_files: HashMap<_, _> = self + .closed_segments + .iter() + .map(|segment| (segment.segment.path().file_name().unwrap(), &segment.segment)) + .collect(); + + for entry in fs::read_dir(self.path())? { + let entry = entry?; + if !entry.metadata()?.is_file() { + continue; + } + + // if file is locked by any Segment, call copy_to_path on it + let entry_file_name = entry.file_name(); + let dst_path = path.as_ref().to_owned().join(entry_file_name.clone()); + if entry_file_name == open_segment_file { + self.open_segment.segment.copy_to_path(&dst_path)?; + } else if let Some(segment) = close_segment_files.get(entry_file_name.as_os_str()) { + segment.copy_to_path(&dst_path)?; + } else { + // if file is not locked by any Segment, just copy it + fs::copy(&entry.path(), &dst_path)?; + } } Ok(()) } diff --git a/src/segment.rs b/src/segment.rs index 5decdd4..173e145 100644 --- a/src/segment.rs +++ b/src/segment.rs @@ -644,13 +644,11 @@ impl Segment { } } - pub fn save_to_path

(&self, path: P) -> Result<()> + pub(crate) fn copy_to_path

(&self, path: P) -> Result<()> where P: AsRef, { - let file_name = self.path.file_name().unwrap(); - let dst_path = path.as_ref().to_owned().join(file_name); - let mut other = Self::create(dst_path, self.capacity())?; + let mut other = Self::create(path, self.capacity())?; unsafe { other .mmap From 12a9718f6531c782346c666a3703d874aca7a3d1 Mon Sep 17 00:00:00 2001 From: Ivan Pleshkov Date: Tue, 16 Jan 2024 00:01:11 +0000 Subject: [PATCH 3/4] are you happy fmt --- src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 55646ce..a3c7f4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -489,7 +489,12 @@ impl Wal { let close_segment_files: HashMap<_, _> = self .closed_segments .iter() - .map(|segment| (segment.segment.path().file_name().unwrap(), &segment.segment)) + .map(|segment| { + ( + segment.segment.path().file_name().unwrap(), + &segment.segment, + ) + }) .collect(); for entry in fs::read_dir(self.path())? { From 4cae86c8baa0042452e57e693664c4671f296465 Mon Sep 17 00:00:00 2001 From: Ivan Pleshkov Date: Tue, 16 Jan 2024 09:47:59 +0000 Subject: [PATCH 4/4] are you happy clippy --- src/lib.rs | 10 +++++----- src/segment.rs | 7 +++++++ 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a3c7f4a..0a032fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -159,7 +159,7 @@ impl Wal { // Validate the closed segments. They must be non-overlapping, and contiguous. closed_segments.sort_by(|a, b| a.start_index.cmp(&b.start_index)); let mut next_start_index = closed_segments - .get(0) + .first() .map_or(0, |segment| segment.start_index); for &ClosedSegment { start_index, @@ -394,7 +394,7 @@ impl Wal { if until <= self .closed_segments - .get(0) + .first() .map_or(0, |segment| segment.start_index) { // Do nothing, the first entry is already greater than `until`. @@ -450,7 +450,7 @@ impl Wal { self.open_segment_start_index() - self .closed_segments - .get(0) + .first() .map_or(0, |segment| segment.start_index) + self.open_segment.segment.len() as u64 } @@ -458,7 +458,7 @@ impl Wal { /// The index of the first entry. pub fn first_index(&self) -> u64 { self.closed_segments - .get(0) + .first() .map_or(0, |segment| segment.start_index) } @@ -523,7 +523,7 @@ impl fmt::Debug for Wal { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { let start_index = self .closed_segments - .get(0) + .first() .map_or(0, |segment| segment.start_index); let end_index = self.open_segment_start_index() + self.open_segment.segment.len() as u64; write!( diff --git a/src/segment.rs b/src/segment.rs index 173e145..f0e76fb 100644 --- a/src/segment.rs +++ b/src/segment.rs @@ -648,6 +648,13 @@ impl Segment { where P: AsRef, { + if path.as_ref().exists() { + return Err(Error::new( + ErrorKind::AlreadyExists, + format!("Path {:?} already exists", path.as_ref()), + )); + } + let mut other = Self::create(path, self.capacity())?; unsafe { other