diff --git a/cli/src/analyzer.rs b/cli/src/analyzer.rs index f35fb1fa..f3a3f4b1 100644 --- a/cli/src/analyzer.rs +++ b/cli/src/analyzer.rs @@ -187,7 +187,7 @@ impl Analyzer { .with_file_extension("clog".to_string()); // Open the commit log - let mut destination_commit_log = + let destination_commit_log = Aol::open(&destination_clog_subdir, &copts).map_err(Error::from)?; // Write latest versions to new location diff --git a/src/compaction.rs b/src/compaction.rs index cb4bc443..c3689872 100644 --- a/src/compaction.rs +++ b/src/compaction.rs @@ -66,7 +66,7 @@ impl Store { let oracle_lock = oracle.write_lock.lock(); // Rotate the commit log and get the new segment ID - let mut clog = self.core.clog.as_ref().unwrap().write(); + let clog = self.core.clog.as_ref().unwrap().write(); let new_segment_id = clog.rotate()?; let last_updated_segment_id = new_segment_id - 1; drop(clog); // Explicitly drop the lock @@ -75,7 +75,7 @@ impl Store { fs::create_dir_all(&tmp_merge_dir)?; // Initialize a new manifest in the temporary directory - let mut manifest = Core::initialize_manifest(&tmp_merge_dir)?; + let manifest = Core::initialize_manifest(&tmp_merge_dir)?; // Add the last updated segment ID to the manifest let changeset = Manifest::with_compacted_up_to_segment(last_updated_segment_id); manifest.append(&changeset.serialize()?)?; @@ -86,7 +86,7 @@ impl Store { let tm_opts = LogOptions::default() .with_max_file_size(self.core.opts.max_compaction_segment_size) .with_file_extension("clog".to_string()); - let mut temp_writer = Aol::open(&temp_clog_dir, &tm_opts)?; + let temp_writer = Aol::open(&temp_clog_dir, &tm_opts)?; // TODO: Check later to add a new way for compaction by reading from the files first and then // check in files for the keys that are not found in memory to handle deletion @@ -101,7 +101,7 @@ impl Store { // Do compaction and write // Define a closure for writing entries to the temporary commit log - let mut write_entry = |key: &[u8], + let write_entry = |key: &[u8], value: Vec, version: u64, ts: u64, diff --git a/src/log/aol.rs b/src/log/aol.rs index 02b230e2..03a157f5 100644 --- a/src/log/aol.rs +++ b/src/log/aol.rs @@ -1,26 +1,34 @@ use std::{ - fs, io, mem, + fs, io, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, Ordering}, - sync::Arc, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, }; +use parking_lot::Mutex; use quick_cache::sync::Cache; use crate::log::{ fd::SegmentReaderPool, get_segment_range, Error, IOError, Options, Result, Segment, }; +/// Writer state that needs exclusive access +pub(crate) struct WriterState { + /// The currently active segment where data is being written. + pub(crate) active_segment: Segment, + /// The ID of the currently active segment. + pub(crate) active_segment_id: AtomicU64, +} + /// Append-Only Log (Aol) is a data structure used to sequentially store records /// in a series of segments. It provides efficient write operations, /// making it suitable for use cases like storing large amounts of data and /// writing data in a sequential manner. pub struct Aol { - /// The currently active segment where data is being written. - pub(crate) active_segment: Segment, - - /// The ID of the currently active segment. - pub(crate) active_segment_id: u64, + /// The write state protected by a mutex + pub(crate) writer_state: Mutex, /// The directory where the segment files are located. pub(crate) dir: PathBuf, @@ -29,7 +37,7 @@ pub struct Aol { pub(crate) opts: Options, /// A flag indicating whether the AOL instance is closed or not. - closed: bool, + closed: AtomicBool, /// A lock-free cache used to store recently used segments. /// Cache stores pools of segment readers @@ -57,16 +65,17 @@ impl Aol { // Open the active segment let active_segment = Segment::open(dir, active_segment_id, opts, false)?; - // Create the lock-free cache with specified capacity - let segment_cache = Cache::new(opts.max_cached_segments); + let writer_state = WriterState { + active_segment, + active_segment_id: AtomicU64::new(active_segment_id), + }; Ok(Self { - active_segment, - active_segment_id, + writer_state: Mutex::new(writer_state), dir: dir.to_path_buf(), opts: opts.clone(), - closed: false, - segment_cache, + closed: AtomicBool::new(false), + segment_cache: Cache::new(opts.max_cached_segments), fsync_failed: Default::default(), }) } @@ -102,8 +111,8 @@ impl Aol { } /// Appends a record to the active segment. - pub fn append(&mut self, rec: &[u8]) -> Result<(u64, u64, usize)> { - if self.closed { + pub fn append(&self, rec: &[u8]) -> Result<(u64, u64, usize)> { + if self.closed.load(Ordering::Acquire) { return Err(Error::SegmentClosed); } @@ -118,52 +127,52 @@ impl Aol { return Err(Error::RecordTooLarge); } + // Take write lock only when needed + let mut writer = self.writer_state.lock(); + + // Check if we need to rotate + let current_offset = writer.active_segment.offset(); // Calculate available space in the active segment - let current_offset = self.active_segment.offset(); let available = self.opts.max_file_size as i64 - current_offset as i64; // If the entire record can't fit into the remaining space of the current segment, // close the current segment and create a new one if available < rec.len() as i64 { - // Rotate to a new segment - let current_id = self.active_segment_id; - let new_id = current_id + 1; - + // Rotate to new segment // Sync and close the active segment // Note that closing the segment will // not close the underlying file until // it is dropped. - self.active_segment.close()?; + writer.active_segment.close()?; - // Increment the active segment id - self.active_segment_id = new_id; + // Increment active segment id and get the new id + let new_id = writer.active_segment_id.fetch_add(1, Ordering::AcqRel); - // Open a new segment for writing - let new_segment = Segment::open(&self.dir, new_id, &self.opts, false)?; - - // Retrieve the previous active segment and replace it with the new one - let _ = mem::replace(&mut self.active_segment, new_segment); + // Open new segment with the incremented id + let new_segment = Segment::open(&self.dir, new_id + 1, &self.opts, false)?; + writer.active_segment = new_segment; } // Write the record to the segment - let result = self.active_segment.append(rec); - let offset = match result { - Ok(off) => off, + match writer.active_segment.append(rec) { + Ok(offset) => { + let segment_id = writer.active_segment_id.load(Ordering::Acquire); + Ok((segment_id, offset, rec.len())) + } Err(e) => { if let Error::IO(_) = e { self.set_fsync_failed(true); } - return Err(e); + Err(e) } - }; - - Ok((self.active_segment_id, offset, rec.len())) + } } /// Flushes and syncs the active segment. - pub fn sync(&mut self) -> Result<()> { + pub fn sync(&self) -> Result<()> { self.check_if_fsync_failed()?; - self.active_segment.sync() + let writer = self.writer_state.lock(); + writer.active_segment.sync() } /// Reads data from the segment at the specified offset into the provided buffer. @@ -182,76 +191,60 @@ impl Aol { ))); } - let mut r = 0; - - // Read data from the appropriate segment - match self.read_segment_data(&mut buf[r..], segment_id, read_offset) { - Ok(bytes_read) => { - r += bytes_read; - } - Err(e) => match e { - Error::Eof => { - return Err(Error::Eof); - } - _ => return Err(e), - }, - } - - Ok((segment_id, r)) - } + // Use segment cache for all reads, including active segment + let pool = self + .segment_cache + .get_or_insert_with(&segment_id, || { + Ok::<_, std::io::Error>(Arc::new( + SegmentReaderPool::new( + self.dir.clone(), + segment_id, + self.opts.clone(), + self.opts.max_file_descriptor_per_segment, + ) + .unwrap(), + )) + }) + .unwrap(); - // Helper function to read data from the appropriate segment - fn read_segment_data( - &self, - buf: &mut [u8], - segment_id: u64, - read_offset: u64, - ) -> Result { - if segment_id == self.active_segment.id { - self.active_segment.read_at(buf, read_offset) - } else { - // Get or create the segment reader pool - let pool = self - .segment_cache - .get_or_insert_with(&segment_id, || { - Ok::<_, std::io::Error>(Arc::new( - SegmentReaderPool::new( - self.dir.clone(), - segment_id, - self.opts.clone(), - self.opts.max_file_descriptor_per_segment, - ) - .unwrap(), - )) - }) - .unwrap(); - - // Acquire reader from pool - let reader = pool.acquire_reader()?; - - // Use reader and return it to pool on drop - reader.segment.as_ref().unwrap().read_at(buf, read_offset) + let reader = pool.acquire_reader()?; + match reader.segment.as_ref().unwrap().read_at(buf, read_offset) { + Ok(bytes_read) => Ok((segment_id, bytes_read)), + Err(Error::Eof) => Err(Error::Eof), + Err(e) => Err(e), } } - pub fn close(&mut self) -> Result<()> { - self.active_segment.close()?; + pub fn close(&self) -> Result<()> { + let writer = self.writer_state.lock(); + writer.active_segment.close()?; // Clear segment cache to ensure all pools are dropped self.segment_cache.clear(); - self.closed = true; + self.closed.store(true, Ordering::Release); Ok(()) } - pub fn rotate(&mut self) -> Result { - self.active_segment.close()?; - self.active_segment_id += 1; - self.active_segment = Segment::open(&self.dir, self.active_segment_id, &self.opts, false)?; - Ok(self.active_segment_id) + pub fn rotate(&self) -> Result { + let mut writer = self.writer_state.lock(); + + // Close current segment + writer.active_segment.close()?; + + // Increment active segment id and get the new id + let new_id = writer.active_segment_id.fetch_add(1, Ordering::AcqRel); + + // Open new segment with the incremented id + writer.active_segment = Segment::open(&self.dir, new_id + 1, &self.opts, false)?; + + Ok(new_id + 1) } pub fn size(&self) -> Result { - let cur_segment_size = self.active_segment.file_offset(); - let total_size = (self.active_segment_id * self.opts.max_file_size) + cur_segment_size; + let writer = self.writer_state.lock(); + let cur_segment_size = writer.active_segment.file_offset(); + let total_size = (writer.active_segment_id.load(Ordering::Acquire) + * self.opts.max_file_size) + + cur_segment_size; Ok(total_size) } @@ -293,6 +286,14 @@ mod tests { TempDir::new("test").unwrap() } + fn get_writer_state(aol: &Aol) -> (u64, u64) { + let writer = aol.writer_state.lock(); + ( + writer.active_segment_id.load(Ordering::Acquire), + writer.active_segment.offset(), + ) + } + #[test] fn append() { // Create a temporary directory @@ -300,10 +301,10 @@ mod tests { // Create aol options and open a aol file let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Test initial offset - let sz = (a.active_segment_id, a.active_segment.offset()); + let sz = get_writer_state(&a); assert_eq!(0, sz.0); assert_eq!(0, sz.1); @@ -321,7 +322,7 @@ mod tests { assert!(r.is_ok()); assert_eq!(7, r.unwrap().2); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); // Validate offset after appending // 4 + 7 = 11 @@ -341,15 +342,16 @@ mod tests { // Test reading beyond segment's current size let mut bs = vec![0; 15]; - let r = a.read_at(&mut bs, segment_id, 4097); - assert!(r.is_err()); + let r = a.read_at(&mut bs, segment_id, 4097).unwrap(); + assert_eq!(r, (0, 0)); + assert_eq!(bs, vec![0; 15]); // Test appending another buffer let r = a.append(&[11, 12, 13, 14]); assert!(r.is_ok()); assert_eq!(4, r.unwrap().2); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); // Validate offset after appending // 11 + 4 = 15 assert_eq!(offset, 15); @@ -371,7 +373,7 @@ mod tests { // Create aol options and open a aol file let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Create two slices of bytes of different sizes let data1 = vec![1; 31 * 1024]; @@ -387,7 +389,7 @@ mod tests { assert!(r2.is_ok()); assert_eq!(2 * 1024, r2.unwrap().2); - let (segment_id, _) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, _) = get_writer_state(&a); // Read the first data slice back from the aol let mut read_data1 = vec![0; 31 * 1024]; @@ -416,7 +418,7 @@ mod tests { // Create aol options and open a aol file let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Create two slices of bytes of different sizes let data1 = vec![1; 31 * 1024]; @@ -436,7 +438,7 @@ mod tests { // Read the first data slice back from the aol let mut read_data1 = vec![0; 31 * 1024]; - let (segment_id, _) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, _) = get_writer_state(&a); let n1 = a .read_at(&mut read_data1, segment_id, 0) .expect("should read"); @@ -453,7 +455,7 @@ mod tests { assert!(r4.is_ok()); assert_eq!(1024, r4.unwrap().2); - let (segment_id, _) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, _) = get_writer_state(&a); // Read the first data slice back from the aol let mut read_data1 = vec![0; 31 * 1024]; @@ -485,19 +487,19 @@ mod tests { max_file_size: 1024, ..Default::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); let large_record = vec![1; 1025]; let small_record = vec![1; 1024]; let r = a.append(&small_record); assert!(r.is_ok()); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); assert_eq!(0, segment_id); assert_eq!(1024, offset); let r = a.append(&large_record); assert!(r.is_err()); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); assert_eq!(0, segment_id); assert_eq!(1024, offset); } @@ -512,12 +514,12 @@ mod tests { max_file_size: 1024, ..Default::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); let small_record = vec![1; 1024]; let r = a.append(&small_record); assert!(r.is_ok()); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); assert_eq!(1024, offset); // Simulate fsync failure @@ -543,29 +545,29 @@ mod tests { max_file_size: 1024, ..Default::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); let large_record = vec![1; 1024]; let small_record = vec![1; 512]; let r = a.append(&large_record); assert!(r.is_ok()); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); assert_eq!(0, segment_id); assert_eq!(1024, offset); - assert_eq!(0, a.active_segment_id); + assert_eq!(0, get_writer_state(&a).0); a.close().expect("should close"); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); - assert_eq!(0, a.active_segment_id); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + assert_eq!(0, get_writer_state(&a).0); let r = a.append(&small_record); assert!(r.is_ok()); - let (segment_id, offset) = (a.active_segment_id, a.active_segment.offset()); + let (segment_id, offset) = get_writer_state(&a); assert_eq!(1, segment_id); assert_eq!(512, offset); - assert_eq!(1, a.active_segment_id); + assert_eq!(1, get_writer_state(&a).0); } #[test] @@ -578,13 +580,13 @@ mod tests { max_file_size: 1024, // Small enough to ensure the second append creates a new file ..Default::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Append a record that fits within the first file let first_record = vec![1; 512]; a.append(&first_record) .expect("first append should succeed"); - let (first_segment_id, first_offset) = (a.active_segment_id, a.active_segment.offset()); + let (first_segment_id, first_offset) = get_writer_state(&a); assert_eq!(0, first_segment_id); assert_eq!(512, first_offset); @@ -592,7 +594,7 @@ mod tests { let second_record = vec![2; 1024]; // This will exceed the max_file_size a.append(&second_record) .expect("second append should succeed"); - let (second_segment_id, second_offset) = (a.active_segment_id, a.active_segment.offset()); + let (second_segment_id, second_offset) = get_writer_state(&a); assert_eq!(1, second_segment_id); // Expecting a new segment/file assert_eq!(1024, second_offset); @@ -610,11 +612,11 @@ mod tests { } #[test] - fn read_beyond_current_offset_should_fail() { + fn read_beyond_current_offset_should_not_fail() { // Setup: Create a temporary directory and initialize the log with default options let temp_dir = create_temp_directory(); let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Append a single record to ensure there is some data in the log let record = vec![1; 512]; @@ -625,10 +627,8 @@ mod tests { let result = a.read_at(&mut read_buf, 0, 1024); // Attempt to read at an offset beyond the single appended record // Verify: The read operation should fail or return an error indicating the offset is out of bounds - assert!( - result.is_err(), - "Reading beyond the current offset should fail" - ); + assert_eq!(result.unwrap(), (0, 0)); + assert_eq!(read_buf, vec![0; 1024]); } #[test] @@ -640,13 +640,13 @@ mod tests { // Step 1: Open the log, append a record, and then close it { - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); a.append(&record).expect("append should succeed"); } // Log is closed here as `a` goes out of scope // Step 2: Reopen the log and append another record { - let mut a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol"); a.append(&record) .expect("append after reopen should succeed"); @@ -674,14 +674,14 @@ mod tests { // Step 1: Open the log, append a record, and then close it { - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); a.append(&record).expect("first append should succeed"); a.append(&record).expect("first append should succeed"); } // Log is closed here as `a` goes out of scope // Step 2: Reopen the log and append another record, which should create a new file { - let mut a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should reopen aol"); // Verify: Ensure the first record is in a new file by reading it back let mut read_buf = vec![0; 512]; @@ -713,7 +713,7 @@ mod tests { fn sequential_read_performance() { let temp_dir = create_temp_directory(); let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Append 1000 records to ensure we have enough data let record = vec![1; 512]; // Each record is 512 bytes @@ -740,7 +740,7 @@ mod tests { fn random_access_read_performance() { let temp_dir = create_temp_directory(); let opts = Options::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Append 1000 records to ensure we have enough data let record = vec![1; 512]; // Each record is 512 bytes @@ -773,20 +773,20 @@ mod tests { fn test_rotate_functionality() { let temp_dir = create_temp_directory(); let opts = Options::default(); - let mut aol = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let aol = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Ensure there's data in the current segment to necessitate a rotation aol.append(b"data 1").unwrap(); // Capture the current active_segment_id before rotation - let current_segment_id = aol.active_segment_id; + let current_segment_id = get_writer_state(&aol).0; // Perform the rotation aol.rotate().unwrap(); // Verify the active_segment_id is incremented assert_eq!( - aol.active_segment_id, + get_writer_state(&aol).0, current_segment_id + 1, "Segment ID should be incremented after rotation" ); @@ -796,7 +796,7 @@ mod tests { let (_, record_offset_start, _) = aol.append(data_to_append).unwrap(); // Use the offset method to verify the append operation is in the new segment - let (segment_id, current_offset) = (aol.active_segment_id, aol.active_segment.offset()); + let (segment_id, current_offset) = get_writer_state(&aol); assert!( current_offset > 0, "Offset should be greater than 0 after appending to a new segment" @@ -902,7 +902,7 @@ mod tests { let dir = create_temp_directory(); let opts = Options::default(); - let mut aol = Aol::open(dir.path(), &opts).unwrap(); + let aol = Aol::open(dir.path(), &opts).unwrap(); let data = b"test data"; aol.append(data).unwrap(); @@ -928,7 +928,7 @@ mod tests { ..Default::default() }; - let mut aol = Aol::open(dir.path(), &opts).unwrap(); + let aol = Aol::open(dir.path(), &opts).unwrap(); for i in 0..10 { let data = vec![i as u8; 50]; @@ -958,7 +958,7 @@ mod tests { ..Default::default() }; - let mut aol = Aol::open(dir.path(), &opts).unwrap(); + let aol = Aol::open(dir.path(), &opts).unwrap(); let data = vec![1u8; 100]; let (segment_id, offset, _) = aol.append(&data).unwrap(); @@ -997,7 +997,7 @@ mod tests { ..Default::default() }; - let mut aol = Aol::open(dir.path(), &opts).unwrap(); + let aol = Aol::open(dir.path(), &opts).unwrap(); for i in 0..5 { let data = vec![i as u8; 50]; @@ -1039,7 +1039,7 @@ mod tests { ..Default::default() }; - let mut aol = Aol::open(dir.path(), &opts).unwrap(); + let aol = Aol::open(dir.path(), &opts).unwrap(); let large_data = vec![0u8; 200]; let _ = aol.append(&large_data); // Expected to fail diff --git a/src/log/fd.rs b/src/log/fd.rs index c56fbf54..0463e74e 100644 --- a/src/log/fd.rs +++ b/src/log/fd.rs @@ -19,8 +19,17 @@ pub struct SegmentReaderPool { impl SegmentReaderPool { pub fn new(dir: PathBuf, id: u64, opts: Options, pool_size: usize) -> Result { + let initial_size = pool_size / 2; + let queue = ArrayQueue::new(pool_size); + + // Pre-create initial readers + for _ in 0..initial_size { + if let Ok(segment) = Segment::open(&dir, id, &opts, true) { + let _ = queue.push(segment); + } + } Ok(Self { - readers: ArrayQueue::new(pool_size), + readers: queue, dir, id, opts, diff --git a/src/log/mod.rs b/src/log/mod.rs index a9e87ea8..7f087c55 100644 --- a/src/log/mod.rs +++ b/src/log/mod.rs @@ -647,10 +647,11 @@ impl SegmentRef { +------+------+------+------+------+------+------+------+ */ pub struct Segment { + #[allow(unused)] /// The unique identifier of the segment. pub(crate) id: u64, - #[allow(dead_code)] + #[allow(unused)] /// The path where the segment file is located. pub(crate) file_path: PathBuf, @@ -846,14 +847,6 @@ impl Segment { ))); } - let current_offset = self.file_offset.load(Ordering::Acquire); - if off > current_offset { - return Err(Error::IO(IOError::new( - io::ErrorKind::Other, - "Offset beyond current position", - ))); - } - // Read from the file let actual_read_offset = self.file_header_offset + off; @@ -1367,7 +1360,8 @@ mod tests { // Test reading beyond segment's current size let mut bs = vec![0; 14]; let r = segment.read_at(&mut bs, 11 + 1); - assert!(r.is_err()); + assert_eq!(r.unwrap(), 0); + assert_eq!(bs, vec![0; 14]); // Test appending another buffer after syncing let r = segment.append(&[11, 12, 13, 14]); @@ -1475,7 +1469,8 @@ mod tests { // Test reading beyond segment's current size let mut bs = vec![0; 14]; let r = segment.read_at(&mut bs, READ_BUF_SIZE as u64 + 1); - assert!(r.is_err()); + assert_eq!(r.unwrap(), 0); + assert_eq!(bs, vec![0; 14]); // Test appending another buffer after syncing let r = segment.append(&[11, 12, 13, 14]); diff --git a/src/manifest.rs b/src/manifest.rs index b57ec4a9..27ae2741 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -134,7 +134,7 @@ mod tests { // Create a temporary directory let temp_dir = create_temp_directory(); let opts = LogOptions::default(); - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); let manifest = Manifest { changes: vec![ManifestChangeType::Options(Options::default())], @@ -157,7 +157,7 @@ mod tests { // Step 1: Create a temporary directory let temp_dir = create_temp_directory(); let log_opts = LogOptions::default(); - let mut a = Aol::open(temp_dir.path(), &log_opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &log_opts).expect("should create aol"); // Step 2: Create the first Manifest instance and append it to the file let first_manifest = Manifest { diff --git a/src/reader.rs b/src/reader.rs index 35069a4b..30aa9e68 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -269,6 +269,8 @@ impl RecordReader { #[cfg(test)] mod tests { + use std::sync::atomic::Ordering; + use super::*; use crate::log::{Aol, Options, SegmentRef}; use tempdir::TempDir; @@ -277,6 +279,14 @@ mod tests { TempDir::new("test").unwrap() } + fn get_writer_state(aol: &Aol) -> (u64, u64) { + let writer = aol.writer_state.lock(); + ( + writer.active_segment_id.load(Ordering::Acquire), + writer.active_segment.offset(), + ) + } + #[test] fn reader() { // Create a temporary directory @@ -287,10 +297,10 @@ mod tests { max_file_size: 4, ..Options::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Test initial offset - let sz = (a.active_segment_id, a.active_segment.offset()); + let sz = get_writer_state(&a); assert_eq!(0, sz.0); assert_eq!(0, sz.1); @@ -344,7 +354,7 @@ mod tests { ..Options::default() }; - let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); + let a = Aol::open(temp_dir.path(), &opts).expect("should create aol"); // Append 10 records for i in 0..num_items { diff --git a/src/repair.rs b/src/repair.rs index 6df24b42..32117200 100644 --- a/src/repair.rs +++ b/src/repair.rs @@ -1,6 +1,7 @@ use std::fs; use std::path::Path; use std::path::PathBuf; +use std::sync::atomic::Ordering; use crate::error::{Error, Result}; use crate::log::{Aol, Error as LogError, MultiSegmentReader, Segment, SegmentRef}; @@ -123,9 +124,13 @@ fn repair_segment( corrupted_segment_file_path: PathBuf, corrupted_segment_file_header_offset: u64, ) -> Result<()> { + // Get writer lock to check and manipulate active segment + let mut writer = aol.writer_state.lock(); + let current_active_id = writer.active_segment_id.load(Ordering::Acquire); + // Close the active segment if its ID matches - if aol.active_segment_id == corrupted_segment_id { - aol.active_segment.close()?; + if current_active_id == corrupted_segment_id { + writer.active_segment.close()?; } // Prepare the repaired segment path @@ -171,8 +176,12 @@ fn repair_segment( println!("deleting empty file {:?}", corrupted_segment_file_path); std::fs::remove_file(&corrupted_segment_file_path)?; } - let new_segment = Segment::open(&aol.dir, aol.active_segment_id, &aol.opts, false)?; - aol.active_segment = new_segment; + + // If we were repairing the active segment, update it + if current_active_id == corrupted_segment_id { + let new_active_segment = Segment::open(&aol.dir, corrupted_segment_id, &aol.opts, false)?; + writer.active_segment = new_active_segment; + } Ok(()) } diff --git a/src/store.rs b/src/store.rs index 77cffdb8..a600a4e0 100644 --- a/src/store.rs +++ b/src/store.rs @@ -478,7 +478,7 @@ impl Core { } fn append_log(&self, tx_record: &BytesMut, durability: Durability) -> Result<(u64, u64)> { - let mut clog = self.clog.as_ref().unwrap().write(); + let clog = self.clog.as_ref().unwrap().write(); let (segment_id, offset) = match durability { Durability::Immediate => {