Skip to content

Commit

Permalink
No need to put Oracle into Arc (#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
gsserge authored Feb 5, 2025
1 parent fc61443 commit 5f7bc7f
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 41 deletions.
7 changes: 3 additions & 4 deletions src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ impl Store {
// Acquire compaction guard
let _guard = CompactionGuard::new(&self.is_compacting);

// Lock the oracle to prevent operations during compaction
let oracle = self.core.oracle.clone();
let oracle_lock = oracle.write_lock.lock();
// Lock the commit lock to prevent operations during compaction
let commit_lock = self.core.commit_write_lock.lock();

// Rotate the commit log and get the new segment ID
let mut clog = self.core.clog.as_ref().unwrap().write();
Expand Down Expand Up @@ -96,7 +95,7 @@ impl Store {
let snapshot = snapshot_lock.snapshot();
let snapshot_versioned_iter = snapshot.iter_with_versions();
drop(snapshot_lock); // Explicitly drop the lock
drop(oracle_lock); // Release the oracle lock
drop(commit_lock); // Release the commit lock

// Do compaction and write

Expand Down
17 changes: 5 additions & 12 deletions src/oracle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use bytes::Bytes;
use parking_lot::Mutex;
use std::ops::{Bound, RangeBounds};
use std::sync::atomic::{AtomicU64, Ordering};
use vart::VariableSizeKey;
Expand All @@ -10,11 +9,8 @@ use crate::snapshot::Snapshot;
use crate::transaction::Transaction;

/// Oracle is responsible for managing transaction timestamps and isolation levels.
/// It uses a write lock to ensure that only one transaction can commit at a time.
/// It supports two isolation levels: SnapshotIsolation and SerializableSnapshotIsolation.
pub(crate) struct Oracle {
/// Write lock to ensure that only one transaction can commit at a time.
pub(crate) write_lock: Mutex<()>,
/// Isolation level of the transactions.
isolation: IsolationLevel,
}
Expand All @@ -32,15 +28,12 @@ impl Oracle {
}
};

Self {
write_lock: Mutex::new(()),
isolation,
}
Self { isolation }
}

/// Generates a new commit timestamp for the given transaction.
/// It delegates to the isolation level to generate the timestamp.
pub(crate) fn new_commit_ts(&self, txn: &mut Transaction) -> Result<u64> {
pub(crate) fn new_commit_ts(&self, txn: &Transaction) -> Result<u64> {
self.isolation.new_commit_ts(txn)
}

Expand Down Expand Up @@ -77,7 +70,7 @@ macro_rules! isolation_level_method {
impl IsolationLevel {
/// Generates a new commit timestamp for the given transaction.
/// It delegates to the specific isolation level to generate the timestamp.
pub(crate) fn new_commit_ts(&self, txn: &mut Transaction) -> Result<u64> {
pub(crate) fn new_commit_ts(&self, txn: &Transaction) -> Result<u64> {
isolation_level_method!(self, new_commit_ts, txn)
}

Expand Down Expand Up @@ -123,7 +116,7 @@ impl SnapshotIsolation {
/// It performs optimistic concurrency control (OCC) by checking if the read keys in the transaction
/// are still valid in the latest snapshot, and if the timestamp of the read keys matches the timestamp
/// of the latest snapshot. If the timestamp does not match, then there is a conflict.
pub(crate) fn new_commit_ts(&self, txn: &mut Transaction) -> Result<u64> {
pub(crate) fn new_commit_ts(&self, txn: &Transaction) -> Result<u64> {
let current_snapshot = Snapshot::take(&txn.core)?;

// Check write conflicts
Expand Down Expand Up @@ -207,7 +200,7 @@ impl SerializableSnapshotIsolation {
/// It performs optimistic concurrency control (OCC) by checking if the read keys in the transaction
/// are still valid in the latest snapshot, and if the timestamp of the read keys matches the timestamp
/// of the latest snapshot. If the timestamp does not match, then there is a conflict.
pub(crate) fn new_commit_ts(&self, txn: &mut Transaction) -> Result<u64> {
pub(crate) fn new_commit_ts(&self, txn: &Transaction) -> Result<u64> {
let current_snapshot = Snapshot::take(&txn.core)?;

// Check read conflicts
Expand Down
9 changes: 6 additions & 3 deletions src/store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ahash::{HashMap, HashMapExt};
use bytes::{Bytes, BytesMut};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use quick_cache::sync::Cache;
use revision::Revisioned;
use std::path::Path;
Expand Down Expand Up @@ -112,14 +112,16 @@ pub struct Core {
/// Manifest for store to track Store state.
pub(crate) manifest: Option<RwLock<Aol>>,
/// Transaction ID Oracle for store.
pub(crate) oracle: Arc<Oracle>,
pub(crate) oracle: Oracle,
/// Value cache for store.
/// The assumption for this cache is that it should be useful for
/// storing offsets that are frequently accessed (especially in
/// the case of range scans)
pub(crate) value_cache: Cache<(u64, u64), Bytes>,
/// Flag to indicate if the store is closed.
is_closed: AtomicBool,
/// Write lock to ensure that only one transaction can commit at a time.
pub(crate) commit_write_lock: Mutex<()>,
}

impl Core {
Expand Down Expand Up @@ -205,9 +207,10 @@ impl Core {
opts,
manifest: manifest.map(RwLock::new),
clog: clog.map(|c| Arc::new(RwLock::new(c))),
oracle: Arc::new(oracle),
oracle,
value_cache,
is_closed: AtomicBool::new(false),
commit_write_lock: Mutex::new(()),
})
}

Expand Down
37 changes: 15 additions & 22 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,8 @@ impl Transaction {
return Ok(()); // Return when there's nothing to commit
}

// Lock the oracle to serialize commits to the transaction log.
let oracle = self.core.oracle.clone();
let write_ch_lock = oracle.write_lock.lock();
// Serialize commits to the transaction log.
let write_ch_lock = self.core.commit_write_lock.lock();

// Prepare for the commit by getting a transaction ID.
let (tx_id, commit_ts) = self.prepare_commit()?;
Expand All @@ -482,7 +481,14 @@ impl Transaction {
latest_writes.sort_by(|a, b| a.seqno.cmp(&b.seqno));
let entries: Vec<Entry> = latest_writes
.into_iter()
.map(|ws_entry| ws_entry.e)
.map(|ws_entry| {
let mut e = ws_entry.e;
// Assigns commit timestamps to transaction entries.
if e.ts == 0 {
e.ts = commit_ts;
}
e
})
.collect();

// Commit the changes to the store index.
Expand All @@ -500,25 +506,12 @@ impl Transaction {
Ok(())
}

/// Prepares for the commit by assigning commit timestamps and preparing records.
fn prepare_commit(&mut self) -> Result<(u64, u64)> {
let oracle = self.core.oracle.clone();
let tx_id = oracle.new_commit_ts(self)?;
let commit_ts = self.assign_commit_ts();
Ok((tx_id, commit_ts))
}

/// Assigns commit timestamps to transaction entries.
fn assign_commit_ts(&mut self) -> u64 {
/// Prepares for the commit by checking for conflicts
/// and providing commit timestamp.
fn prepare_commit(&self) -> Result<(u64, u64)> {
let tx_id = self.core.oracle.new_commit_ts(self)?;
let commit_ts = now();
for entries in self.write_set.values_mut() {
if let Some(entry) = entries.last_mut() {
if entry.e.ts == 0 {
entry.e.ts = commit_ts;
}
}
}
commit_ts
Ok((tx_id, commit_ts))
}

/// Rolls back the transaction by removing all updated entries.
Expand Down

0 comments on commit 5f7bc7f

Please sign in to comment.