diff --git a/Cargo.lock b/Cargo.lock index 6ef19cd..dcd5b06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,7 +160,7 @@ dependencies = [ [[package]] name = "dustdata" -version = "1.3.0" +version = "1.3.1" dependencies = [ "bitvec", "bson", diff --git a/Cargo.toml b/Cargo.toml index c8d3797..1c981e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dustdata" -version = "1.3.0" +version = "1.3.1" edition = "2021" description = "A data concurrency control storage engine to Rustbase" repository = "https://github.com/rustbase/dustdata" diff --git a/README.md b/README.md index dfa31ea..4e54548 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Add the following to your `Cargo.toml`: ```toml [dependencies] -dustdata = "1.3.0" +dustdata = "1.3.1" ``` # Usage @@ -36,7 +36,7 @@ Initialize a DustData interface. ```rust // DustData Configuration let config = dustdata::DustDataConfig { - path: "./test_data".to_string(), + path: std::path::Path::new("./test_data/dustdata").to_path_buf(), lsm_config: dustdata::LsmConfig { flush_threshold: dustdata::Size::Megabytes(128), } diff --git a/src/dustdata.rs b/src/dustdata.rs index f2182b3..1660fd4 100644 --- a/src/dustdata.rs +++ b/src/dustdata.rs @@ -20,7 +20,7 @@ pub struct LsmConfig { /// * `lsm_config` - The LSM configuration #[derive(Clone)] pub struct DustDataConfig { - pub path: String, + pub path: path::PathBuf, pub lsm_config: LsmConfig, } @@ -146,11 +146,9 @@ impl std::fmt::Display for ErrorCode { impl DustData { pub fn new(configuration: DustDataConfig) -> Self { - let path = path::Path::new(&configuration.path); - let lsm = storage::lsm::Lsm::new(lsm::LsmConfig { flush_threshold: size_to_usize(configuration.clone().lsm_config.flush_threshold), - sstable_path: path.to_str().unwrap().to_string(), + sstable_path: configuration.clone().path, }); Self { diff --git a/src/lib.rs b/src/lib.rs index 2b1ac9f..65753a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ -mod bloom; +pub mod bloom; mod dustdata; -mod storage; +pub mod storage; pub use self::{ dustdata::DustData, dustdata::DustDataConfig, dustdata::Error, dustdata::ErrorCode, @@ -23,7 +23,7 @@ mod dustdata_tests { fn get_default_config() -> DustDataConfig { DustDataConfig { - path: "./test_data/dustdata".to_string(), + path: std::path::Path::new("./test_data/dustdata").to_path_buf(), lsm_config: LsmConfig { flush_threshold: Size::Megabytes(128), }, diff --git a/src/storage/lsm/filter/mod.rs b/src/storage/lsm/filter/mod.rs index 7035840..77b513e 100644 --- a/src/storage/lsm/filter/mod.rs +++ b/src/storage/lsm/filter/mod.rs @@ -1,15 +1,18 @@ use crate::bloom::BloomFilter; use lz4::{Decoder, EncoderBuilder}; -use std::io::{Read, Write}; +use std::{ + io::{Read, Write}, + path, +}; -pub fn check_if_filter_exists(path: &str) -> bool { - let _path = std::path::Path::new(path).join("filter"); +pub fn check_if_filter_exists(path: &path::Path) -> bool { + let _path = path.join("filter"); _path.exists() } -pub fn write_filter(path: &str, filter: &BloomFilter) { - let _path = std::path::Path::new(path).join("filter"); +pub fn write_filter(path: &path::Path, filter: &BloomFilter) { + let _path = path.join("filter"); if !check_if_filter_exists(path) { std::fs::create_dir_all(_path.clone()).unwrap(); @@ -36,8 +39,8 @@ pub fn write_filter(path: &str, filter: &BloomFilter) { hashes_file.sync_all().unwrap(); } -pub fn read_filter(path: &str) -> BloomFilter { - let _path = std::path::Path::new(path).join("filter"); +pub fn read_filter(path: &path::Path) -> BloomFilter { + let _path = path.join("filter"); // bitvec diff --git a/src/storage/lsm/index/mod.rs b/src/storage/lsm/index/mod.rs index 9476113..a8128fe 100644 --- a/src/storage/lsm/index/mod.rs +++ b/src/storage/lsm/index/mod.rs @@ -5,14 +5,14 @@ use std::{ path, }; -pub fn check_if_index_exists(path: &str) -> bool { - let _path = path::Path::new(path).join("index"); +pub fn check_if_index_exists(path: &path::Path) -> bool { + let _path = path.join("index"); _path.exists() } -pub fn write_index(path: &str, index: &HashMap) { - let _path = path::Path::new(path).join("index"); +pub fn write_index(path: &path::Path, index: &HashMap) { + let _path = path.join("index"); if index.is_empty() { return; @@ -26,8 +26,8 @@ pub fn write_index(path: &str, index: &HashMap) { file.flush().unwrap(); } -pub fn read_index(path: &str) -> HashMap { - let _path = path::Path::new(path).join("index"); +pub fn read_index(path: &path::Path) -> HashMap { + let _path = path.join("index"); let mut file = fs::File::open(_path).unwrap(); let mut bytes_to_read: Vec = Vec::new(); diff --git a/src/storage/lsm/mod.rs b/src/storage/lsm/mod.rs index d5bab12..d58696a 100644 --- a/src/storage/lsm/mod.rs +++ b/src/storage/lsm/mod.rs @@ -2,33 +2,33 @@ use std::collections::{BTreeMap, HashMap}; use std::mem; use std::ops::Deref; use std::path; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, RwLock}; use crate::bloom::BloomFilter; use crate::dustdata::{Error, ErrorCode, Result}; use self::snapshots::Snapshot; -mod filter; -mod index; +pub mod filter; +pub mod index; pub mod snapshots; -mod sstable; +pub mod sstable; mod writer; #[derive(Clone, Debug)] pub struct LsmConfig { pub flush_threshold: usize, - pub sstable_path: String, + pub sstable_path: path::PathBuf, } #[derive(Clone)] pub struct Lsm { - pub memtable: Arc>>, + pub memtable: Arc>>, pub memtable_size: usize, pub lsm_config: LsmConfig, pub snapshots: snapshots::SnapshotManager, - pub dense_index: Arc>>, - pub bloom_filter: Arc>, + pub dense_index: Arc>>, + pub bloom_filter: Arc>, } impl Lsm { @@ -56,9 +56,9 @@ impl Lsm { ); Lsm { - memtable: Arc::new(Mutex::new(BTreeMap::new())), - bloom_filter: Arc::new(Mutex::new(bloom_filter)), - dense_index: Arc::new(Mutex::new(index)), + memtable: Arc::new(RwLock::new(BTreeMap::new())), + bloom_filter: Arc::new(RwLock::new(bloom_filter)), + dense_index: Arc::new(RwLock::new(index)), lsm_config: config, memtable_size: 0, // The current memtable size (in bytes) snapshots, @@ -75,8 +75,11 @@ impl Lsm { self.memtable_size += mem::size_of_val(&value); - self.memtable.lock().unwrap().insert(key.to_string(), value); - self.bloom_filter.lock().unwrap().insert(key); + self.memtable + .write() + .unwrap() + .insert(key.to_string(), value); + self.bloom_filter.write().unwrap().insert(key); if self.memtable_size >= self.lsm_config.flush_threshold { self.flush().unwrap(); @@ -90,16 +93,16 @@ impl Lsm { return Ok(None); } - let memtable = self.memtable.lock().unwrap(); + let memtable = self.memtable.read().unwrap(); match memtable.get(&key.to_string()) { Some(document) => Ok(Some(document.clone())), None => { - let dense_index = self.dense_index.lock().unwrap(); + let dense_index = self.dense_index.read().unwrap(); let offset = dense_index.get(&key.to_string()).unwrap(); Ok(sstable::Segment::read_with_offset( offset.to_string(), - self.lsm_config.sstable_path.to_string(), + &self.lsm_config.sstable_path, )) } } @@ -113,17 +116,17 @@ impl Lsm { }); } - let mut memtable = self.memtable.lock().unwrap(); + let mut memtable = self.memtable.write().unwrap(); if memtable.contains_key(&key.to_string()) { memtable.remove(&key.to_string()); drop(memtable); } else { - self.dense_index.lock().unwrap().remove(&key.to_string()); + self.dense_index.write().unwrap().remove(&key.to_string()); } - self.bloom_filter.lock().unwrap().delete(key); + self.bloom_filter.write().unwrap().delete(key); Ok(()) } @@ -136,13 +139,13 @@ impl Lsm { }); } - let mut memtable = self.memtable.lock().unwrap(); - let mut bloom_filter = self.bloom_filter.lock().unwrap(); + let mut memtable = self.memtable.write().unwrap(); + let mut bloom_filter = self.bloom_filter.write().unwrap(); // Delete the old value from the bloom filter bloom_filter.delete(key); - let mut dense_index = self.dense_index.lock().unwrap(); + let mut dense_index = self.dense_index.write().unwrap(); dense_index.remove(&key.to_string()); drop(dense_index); @@ -162,10 +165,9 @@ impl Lsm { return Ok(()); } - let mut dense_index = self.dense_index.lock().unwrap(); + let mut dense_index = self.dense_index.write().unwrap(); - let segments = - sstable::Segment::from_tree(&memtable, self.lsm_config.sstable_path.as_str()); + let segments = sstable::Segment::from_tree(&memtable, &self.lsm_config.sstable_path); for token in segments.1 { dense_index.insert(token.0, token.1); @@ -183,41 +185,41 @@ impl Lsm { filter::write_filter( &self.lsm_config.sstable_path, - self.bloom_filter.lock().unwrap().deref(), + self.bloom_filter.read().unwrap().deref(), ); - self.memtable.lock().unwrap().clear(); + self.memtable.write().unwrap().clear(); self.memtable_size = 0; Ok(()) } pub fn get_memtable(&self) -> BTreeMap { - self.memtable.lock().unwrap().clone() + self.memtable.read().unwrap().clone() } pub fn contains(&self, key: &str) -> bool { - self.bloom_filter.lock().unwrap().contains(key) + self.bloom_filter.read().unwrap().contains(key) } pub fn clear(&self) { - self.memtable.lock().unwrap().clear(); - self.dense_index.lock().unwrap().clear(); + self.memtable.write().unwrap().clear(); + self.dense_index.write().unwrap().clear(); } pub fn update_index(&self) { - let index = self.dense_index.lock().unwrap().clone(); + let index = self.dense_index.read().unwrap().clone(); index::write_index(&self.lsm_config.sstable_path, &index); } pub fn list_keys(&self) -> Vec { let mut keys = Vec::new(); - for key in self.memtable.lock().unwrap().keys() { + for key in self.memtable.read().unwrap().keys() { keys.push(key.clone()); } - for key in self.dense_index.lock().unwrap().keys() { + for key in self.dense_index.read().unwrap().keys() { keys.push(key.clone()); } @@ -226,27 +228,26 @@ impl Lsm { pub fn drop(&mut self) { self.clear(); - self.bloom_filter.lock().unwrap().clear(); + self.bloom_filter.write().unwrap().clear(); } pub fn load_snapshot(path: path::PathBuf, snapshot: Snapshot) { - sstable::Segment::from_tree(snapshot.get_memtable(), &path.display().to_string()); - index::write_index(&path.display().to_string(), snapshot.get_dense_index()); - filter::write_filter(&path.display().to_string(), snapshot.get_bloom_filter()); + sstable::Segment::from_tree(snapshot.get_memtable(), &path); + index::write_index(&path, snapshot.get_dense_index()); + filter::write_filter(&path, snapshot.get_bloom_filter()); } } impl Drop for Lsm { fn drop(&mut self) { - let memtable = self.memtable.lock().unwrap(); - let mut dense_index = self.dense_index.lock().unwrap(); + let memtable = self.memtable.read().unwrap(); + let mut dense_index = self.dense_index.write().unwrap(); if memtable.is_empty() { return; } - let segments = - sstable::Segment::from_tree(memtable.deref(), self.lsm_config.sstable_path.as_str()); + let segments = sstable::Segment::from_tree(memtable.deref(), &self.lsm_config.sstable_path); for token in segments.1 { dense_index.insert(token.0, token.1); @@ -262,7 +263,7 @@ impl Drop for Lsm { filter::write_filter( &self.lsm_config.sstable_path, - self.bloom_filter.lock().unwrap().deref(), + self.bloom_filter.read().unwrap().deref(), ); } } diff --git a/src/storage/lsm/snapshots/mod.rs b/src/storage/lsm/snapshots/mod.rs index e7dd09a..874b6ee 100644 --- a/src/storage/lsm/snapshots/mod.rs +++ b/src/storage/lsm/snapshots/mod.rs @@ -87,9 +87,9 @@ impl Snapshot { } let snapshot = Snapshot::new( - lsm.memtable.lock().unwrap().clone(), - lsm.bloom_filter.lock().unwrap().clone(), - lsm.dense_index.lock().unwrap().clone(), + lsm.memtable.read().unwrap().clone(), + lsm.bloom_filter.read().unwrap().clone(), + lsm.dense_index.read().unwrap().clone(), ); let now = chrono::Local::now(); diff --git a/src/storage/lsm/sstable.rs b/src/storage/lsm/sstable.rs index 956b82f..62ddadb 100644 --- a/src/storage/lsm/sstable.rs +++ b/src/storage/lsm/sstable.rs @@ -70,8 +70,8 @@ pub struct Segment { } impl Segment { - pub fn new(path: &str) -> Self { - let _path = path::Path::new(path).join("data"); + pub fn new(path: &path::Path) -> Self { + let _path = path.join("data"); if !_path.exists() { fs::create_dir_all(_path.clone()).unwrap(); @@ -89,12 +89,12 @@ impl Segment { } } - pub fn read_with_offset(offset: String, path: String) -> Option { + pub fn read_with_offset(offset: String, path: &path::Path) -> Option { let splited_offset = offset.split('_').collect::>(); let file_index = splited_offset[0].parse::().unwrap(); let offset = splited_offset[1].parse::().unwrap(); - let path = path::Path::new(&path).join("data"); + let path = path.join("data"); let file_path = path.join(get_file_that_starts_with_index( (*path).to_path_buf(), file_index as usize, @@ -126,7 +126,7 @@ impl Segment { pub fn from_tree( tree: &BTreeMap, - path: &str, + path: &path::Path, ) -> (Segment, Vec<(String, String)>) { let mut segment = Segment::new(path); diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3085d3b..1ce9b7b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1 +1 @@ -pub mod lsm; \ No newline at end of file +pub mod lsm;