Skip to content

Commit

Permalink
Merge pull request #22 from rustbase/feature/rwlock
Browse files Browse the repository at this point in the history
feat: replacing mutex to rwlock
  • Loading branch information
peeeuzin authored Jan 24, 2023
2 parents 7d7c736 + a199882 commit 4cea1a9
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
65 changes: 34 additions & 31 deletions src/storage/lsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::mem;
use std::ops::Deref;
use std::path;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};

use crate::bloom::BloomFilter;
use crate::dustdata::{Error, ErrorCode, Result};
Expand All @@ -23,12 +23,12 @@ pub struct LsmConfig {

#[derive(Clone)]
pub struct Lsm {
pub memtable: Arc<Mutex<BTreeMap<String, bson::Bson>>>,
pub memtable: Arc<RwLock<BTreeMap<String, bson::Bson>>>,
pub memtable_size: usize,
pub lsm_config: LsmConfig,
pub snapshots: snapshots::SnapshotManager,
pub dense_index: Arc<Mutex<HashMap<String, String>>>,
pub bloom_filter: Arc<Mutex<BloomFilter>>,
pub dense_index: Arc<RwLock<HashMap<String, String>>>,
pub bloom_filter: Arc<RwLock<BloomFilter>>,
}

impl Lsm {
Expand Down Expand Up @@ -56,9 +56,9 @@ impl Lsm {
);

Lsm {
memtable: Arc::new(Mutex::new(BTreeMap::new())),
bloom_filter: Arc::new(Mutex::new(bloom_filter)),
dense_index: Arc::new(Mutex::new(index)),
memtable: Arc::new(RwLock::new(BTreeMap::new())),
bloom_filter: Arc::new(RwLock::new(bloom_filter)),
dense_index: Arc::new(RwLock::new(index)),
lsm_config: config,
memtable_size: 0, // The current memtable size (in bytes)
snapshots,
Expand All @@ -75,8 +75,11 @@ impl Lsm {

self.memtable_size += mem::size_of_val(&value);

self.memtable.lock().unwrap().insert(key.to_string(), value);
self.bloom_filter.lock().unwrap().insert(key);
self.memtable
.write()
.unwrap()
.insert(key.to_string(), value);
self.bloom_filter.write().unwrap().insert(key);

if self.memtable_size >= self.lsm_config.flush_threshold {
self.flush().unwrap();
Expand All @@ -90,12 +93,12 @@ impl Lsm {
return Ok(None);
}

let memtable = self.memtable.lock().unwrap();
let memtable = self.memtable.read().unwrap();

match memtable.get(&key.to_string()) {
Some(document) => Ok(Some(document.clone())),
None => {
let dense_index = self.dense_index.lock().unwrap();
let dense_index = self.dense_index.read().unwrap();
let offset = dense_index.get(&key.to_string()).unwrap();
Ok(sstable::Segment::read_with_offset(
offset.to_string(),
Expand All @@ -113,17 +116,17 @@ impl Lsm {
});
}

let mut memtable = self.memtable.lock().unwrap();
let mut memtable = self.memtable.write().unwrap();

if memtable.contains_key(&key.to_string()) {
memtable.remove(&key.to_string());

drop(memtable);
} else {
self.dense_index.lock().unwrap().remove(&key.to_string());
self.dense_index.write().unwrap().remove(&key.to_string());
}

self.bloom_filter.lock().unwrap().delete(key);
self.bloom_filter.write().unwrap().delete(key);

Ok(())
}
Expand All @@ -136,13 +139,13 @@ impl Lsm {
});
}

let mut memtable = self.memtable.lock().unwrap();
let mut bloom_filter = self.bloom_filter.lock().unwrap();
let mut memtable = self.memtable.write().unwrap();
let mut bloom_filter = self.bloom_filter.write().unwrap();

// Delete the old value from the bloom filter
bloom_filter.delete(key);

let mut dense_index = self.dense_index.lock().unwrap();
let mut dense_index = self.dense_index.write().unwrap();
dense_index.remove(&key.to_string());
drop(dense_index);

Expand All @@ -162,7 +165,7 @@ impl Lsm {
return Ok(());
}

let mut dense_index = self.dense_index.lock().unwrap();
let mut dense_index = self.dense_index.write().unwrap();

let segments = sstable::Segment::from_tree(&memtable, &self.lsm_config.sstable_path);

Expand All @@ -182,41 +185,41 @@ impl Lsm {

filter::write_filter(
&self.lsm_config.sstable_path,
self.bloom_filter.lock().unwrap().deref(),
self.bloom_filter.read().unwrap().deref(),
);

self.memtable.lock().unwrap().clear();
self.memtable.write().unwrap().clear();
self.memtable_size = 0;

Ok(())
}

pub fn get_memtable(&self) -> BTreeMap<String, bson::Bson> {
self.memtable.lock().unwrap().clone()
self.memtable.read().unwrap().clone()
}

pub fn contains(&self, key: &str) -> bool {
self.bloom_filter.lock().unwrap().contains(key)
self.bloom_filter.read().unwrap().contains(key)
}

pub fn clear(&self) {
self.memtable.lock().unwrap().clear();
self.dense_index.lock().unwrap().clear();
self.memtable.write().unwrap().clear();
self.dense_index.write().unwrap().clear();
}

pub fn update_index(&self) {
let index = self.dense_index.lock().unwrap().clone();
let index = self.dense_index.read().unwrap().clone();
index::write_index(&self.lsm_config.sstable_path, &index);
}

pub fn list_keys(&self) -> Vec<String> {
let mut keys = Vec::new();

for key in self.memtable.lock().unwrap().keys() {
for key in self.memtable.read().unwrap().keys() {
keys.push(key.clone());
}

for key in self.dense_index.lock().unwrap().keys() {
for key in self.dense_index.read().unwrap().keys() {
keys.push(key.clone());
}

Expand All @@ -225,7 +228,7 @@ impl Lsm {

pub fn drop(&mut self) {
self.clear();
self.bloom_filter.lock().unwrap().clear();
self.bloom_filter.write().unwrap().clear();
}

pub fn load_snapshot(path: path::PathBuf, snapshot: Snapshot) {
Expand All @@ -237,8 +240,8 @@ impl Lsm {

impl Drop for Lsm {
fn drop(&mut self) {
let memtable = self.memtable.lock().unwrap();
let mut dense_index = self.dense_index.lock().unwrap();
let memtable = self.memtable.read().unwrap();
let mut dense_index = self.dense_index.write().unwrap();

if memtable.is_empty() {
return;
Expand All @@ -260,7 +263,7 @@ impl Drop for Lsm {

filter::write_filter(
&self.lsm_config.sstable_path,
self.bloom_filter.lock().unwrap().deref(),
self.bloom_filter.read().unwrap().deref(),
);
}
}
6 changes: 3 additions & 3 deletions src/storage/lsm/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ impl Snapshot {
}

let snapshot = Snapshot::new(
lsm.memtable.lock().unwrap().clone(),
lsm.bloom_filter.lock().unwrap().clone(),
lsm.dense_index.lock().unwrap().clone(),
lsm.memtable.read().unwrap().clone(),
lsm.bloom_filter.read().unwrap().clone(),
lsm.dense_index.read().unwrap().clone(),
);

let now = chrono::Local::now();
Expand Down

0 comments on commit 4cea1a9

Please sign in to comment.