Skip to content

Commit

Permalink
Move Aol mutex to Segment and make it conditional
Browse files Browse the repository at this point in the history
  • Loading branch information
gsserge committed Jan 23, 2025
1 parent 60e6cbb commit 1e23f70
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
18 changes: 4 additions & 14 deletions src/log/aol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};

use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use parking_lot::Mutex;

use crate::log::{get_segment_range, Error, IOError, Options, Result, Segment};

Expand All @@ -32,11 +32,8 @@ pub struct Aol {
/// A flag indicating whether the AOL instance is closed or not.
closed: bool,

/// A read-write lock used to synchronize concurrent access to the AOL instance.
mutex: Mutex<()>,

/// A cache used to store recently used segments to avoid opening and closing the files.
segment_cache: RwLock<LruCache<u64, Segment>>,
segment_cache: Mutex<LruCache<u64, Segment>>,

/// A flag indicating whether the AOL instance has encountered an IO error or not.
fsync_failed: AtomicBool,
Expand Down Expand Up @@ -70,8 +67,7 @@ impl Aol {
dir: dir.to_path_buf(),
opts: opts.clone(),
closed: false,
mutex: Mutex::new(()),
segment_cache: RwLock::new(cache),
segment_cache: Mutex::new(cache),
fsync_failed: Default::default(),
})
}
Expand Down Expand Up @@ -123,8 +119,6 @@ impl Aol {
return Err(Error::RecordTooLarge);
}

let _lock = self.mutex.lock();

// Get options and initialize variables
let opts = &self.opts;

Expand Down Expand Up @@ -217,10 +211,9 @@ impl Aol {
// During read, we acquire a lock to not allow concurrent writes and reads
// to the active segment file to avoid seek errors.
if segment_id == self.active_segment.id {
let _lock = self.mutex.lock();
self.active_segment.read_at(buf, read_offset)
} else {
let mut cache = self.segment_cache.write();
let mut cache = self.segment_cache.lock();
match cache.get(&segment_id) {
Some(segment) => segment.read_at(buf, read_offset),
None => {
Expand All @@ -234,22 +227,19 @@ impl Aol {
}

pub fn close(&mut self) -> Result<()> {
let _lock = self.mutex.lock();
self.active_segment.close()?;
self.closed = true;
Ok(())
}

pub fn rotate(&mut self) -> Result<u64> {
let _lock = self.mutex.lock();
self.active_segment.close()?;
self.active_segment_id += 1;
self.active_segment = Segment::open(&self.dir, self.active_segment_id, &self.opts)?;
Ok(self.active_segment_id)
}

pub fn size(&self) -> Result<u64> {
let _lock = self.mutex.lock();
let cur_segment_size = self.active_segment.file_offset;
let total_size = (self.active_segment_id * self.opts.max_file_size) + cur_segment_size;
Ok(total_size)
Expand Down
11 changes: 11 additions & 0 deletions src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ mod aol;
pub use aol::Aol;

use ahash::{HashMap, HashMapExt};

#[cfg(not(unix))]
use parking_lot::Mutex;
use std::fmt;
use std::fs::File;
use std::fs::{read_dir, OpenOptions};
Expand Down Expand Up @@ -656,6 +659,11 @@ pub(crate) struct Segment {
/// The maximum size of the segment file.
pub(crate) file_size: u64,

/// A lock used to synchronize concurrent read access to the segment
/// for the platforms that don't have FileExt::read_at().
#[cfg(not(unix))]
mutex: Mutex<()>,

/// A flag indicating whether the segment is closed or not.
closed: bool,
}
Expand Down Expand Up @@ -709,6 +717,8 @@ impl Segment {
file_path,
id,
closed: false,
#[cfg(not(unix))]
mutex: Mutex::new(()),
file_size: opts.max_file_size,
})
}
Expand Down Expand Up @@ -835,6 +845,7 @@ impl Segment {
#[cfg(not(unix))]
{
let mut file = &self.file;
let _lock = self.mutex.lock();
file.seek(SeekFrom::Start(self.file_header_offset + off))?;
bytes_read = file.read(bs)?;
}
Expand Down

0 comments on commit 1e23f70

Please sign in to comment.