Skip to content

Commit

Permalink
Merge branch 'release/v1.3.1' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
peeeuzin committed Jan 24, 2023
2 parents 1454f01 + 9bbc5c1 commit 12241a0
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 76 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dustdata"
version = "1.3.0"
version = "1.3.1"
edition = "2021"
description = "A data concurrency control storage engine to Rustbase"
repository = "https://github.com/rustbase/dustdata"
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Add the following to your `Cargo.toml`:

```toml
[dependencies]
dustdata = "1.3.0"
dustdata = "1.3.1"
```

# Usage
Expand All @@ -36,7 +36,7 @@ Initialize a DustData interface.
```rust
// DustData Configuration
let config = dustdata::DustDataConfig {
path: "./test_data".to_string(),
path: std::path::Path::new("./test_data/dustdata").to_path_buf(),
lsm_config: dustdata::LsmConfig {
flush_threshold: dustdata::Size::Megabytes(128),
}
Expand Down
6 changes: 2 additions & 4 deletions src/dustdata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct LsmConfig {
/// * `lsm_config` - The LSM configuration
#[derive(Clone)]
pub struct DustDataConfig {
pub path: String,
pub path: path::PathBuf,
pub lsm_config: LsmConfig,
}

Expand Down Expand Up @@ -146,11 +146,9 @@ impl std::fmt::Display for ErrorCode {

impl DustData {
pub fn new(configuration: DustDataConfig) -> Self {
let path = path::Path::new(&configuration.path);

let lsm = storage::lsm::Lsm::new(lsm::LsmConfig {
flush_threshold: size_to_usize(configuration.clone().lsm_config.flush_threshold),
sstable_path: path.to_str().unwrap().to_string(),
sstable_path: configuration.clone().path,
});

Self {
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod bloom;
pub mod bloom;
mod dustdata;
mod storage;
pub mod storage;

pub use self::{
dustdata::DustData, dustdata::DustDataConfig, dustdata::Error, dustdata::ErrorCode,
Expand All @@ -23,7 +23,7 @@ mod dustdata_tests {

fn get_default_config() -> DustDataConfig {
DustDataConfig {
path: "./test_data/dustdata".to_string(),
path: std::path::Path::new("./test_data/dustdata").to_path_buf(),
lsm_config: LsmConfig {
flush_threshold: Size::Megabytes(128),
},
Expand Down
17 changes: 10 additions & 7 deletions src/storage/lsm/filter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use crate::bloom::BloomFilter;
use lz4::{Decoder, EncoderBuilder};
use std::io::{Read, Write};
use std::{
io::{Read, Write},
path,
};

pub fn check_if_filter_exists(path: &str) -> bool {
let _path = std::path::Path::new(path).join("filter");
pub fn check_if_filter_exists(path: &path::Path) -> bool {
let _path = path.join("filter");

_path.exists()
}

pub fn write_filter(path: &str, filter: &BloomFilter) {
let _path = std::path::Path::new(path).join("filter");
pub fn write_filter(path: &path::Path, filter: &BloomFilter) {
let _path = path.join("filter");

if !check_if_filter_exists(path) {
std::fs::create_dir_all(_path.clone()).unwrap();
Expand All @@ -36,8 +39,8 @@ pub fn write_filter(path: &str, filter: &BloomFilter) {
hashes_file.sync_all().unwrap();
}

pub fn read_filter(path: &str) -> BloomFilter {
let _path = std::path::Path::new(path).join("filter");
pub fn read_filter(path: &path::Path) -> BloomFilter {
let _path = path.join("filter");

// bitvec

Expand Down
12 changes: 6 additions & 6 deletions src/storage/lsm/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ use std::{
path,
};

pub fn check_if_index_exists(path: &str) -> bool {
let _path = path::Path::new(path).join("index");
pub fn check_if_index_exists(path: &path::Path) -> bool {
let _path = path.join("index");

_path.exists()
}

pub fn write_index(path: &str, index: &HashMap<String, String>) {
let _path = path::Path::new(path).join("index");
pub fn write_index(path: &path::Path, index: &HashMap<String, String>) {
let _path = path.join("index");

if index.is_empty() {
return;
Expand All @@ -26,8 +26,8 @@ pub fn write_index(path: &str, index: &HashMap<String, String>) {
file.flush().unwrap();
}

pub fn read_index(path: &str) -> HashMap<String, String> {
let _path = path::Path::new(path).join("index");
pub fn read_index(path: &path::Path) -> HashMap<String, String> {
let _path = path.join("index");

let mut file = fs::File::open(_path).unwrap();
let mut bytes_to_read: Vec<u8> = Vec::new();
Expand Down
87 changes: 44 additions & 43 deletions src/storage/lsm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@ use std::collections::{BTreeMap, HashMap};
use std::mem;
use std::ops::Deref;
use std::path;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};

use crate::bloom::BloomFilter;
use crate::dustdata::{Error, ErrorCode, Result};

use self::snapshots::Snapshot;

mod filter;
mod index;
pub mod filter;
pub mod index;
pub mod snapshots;
mod sstable;
pub mod sstable;
mod writer;

#[derive(Clone, Debug)]
pub struct LsmConfig {
pub flush_threshold: usize,
pub sstable_path: String,
pub sstable_path: path::PathBuf,
}

#[derive(Clone)]
pub struct Lsm {
pub memtable: Arc<Mutex<BTreeMap<String, bson::Bson>>>,
pub memtable: Arc<RwLock<BTreeMap<String, bson::Bson>>>,
pub memtable_size: usize,
pub lsm_config: LsmConfig,
pub snapshots: snapshots::SnapshotManager,
pub dense_index: Arc<Mutex<HashMap<String, String>>>,
pub bloom_filter: Arc<Mutex<BloomFilter>>,
pub dense_index: Arc<RwLock<HashMap<String, String>>>,
pub bloom_filter: Arc<RwLock<BloomFilter>>,
}

impl Lsm {
Expand Down Expand Up @@ -56,9 +56,9 @@ impl Lsm {
);

Lsm {
memtable: Arc::new(Mutex::new(BTreeMap::new())),
bloom_filter: Arc::new(Mutex::new(bloom_filter)),
dense_index: Arc::new(Mutex::new(index)),
memtable: Arc::new(RwLock::new(BTreeMap::new())),
bloom_filter: Arc::new(RwLock::new(bloom_filter)),
dense_index: Arc::new(RwLock::new(index)),
lsm_config: config,
memtable_size: 0, // The current memtable size (in bytes)
snapshots,
Expand All @@ -75,8 +75,11 @@ impl Lsm {

self.memtable_size += mem::size_of_val(&value);

self.memtable.lock().unwrap().insert(key.to_string(), value);
self.bloom_filter.lock().unwrap().insert(key);
self.memtable
.write()
.unwrap()
.insert(key.to_string(), value);
self.bloom_filter.write().unwrap().insert(key);

if self.memtable_size >= self.lsm_config.flush_threshold {
self.flush().unwrap();
Expand All @@ -90,16 +93,16 @@ impl Lsm {
return Ok(None);
}

let memtable = self.memtable.lock().unwrap();
let memtable = self.memtable.read().unwrap();

match memtable.get(&key.to_string()) {
Some(document) => Ok(Some(document.clone())),
None => {
let dense_index = self.dense_index.lock().unwrap();
let dense_index = self.dense_index.read().unwrap();
let offset = dense_index.get(&key.to_string()).unwrap();
Ok(sstable::Segment::read_with_offset(
offset.to_string(),
self.lsm_config.sstable_path.to_string(),
&self.lsm_config.sstable_path,
))
}
}
Expand All @@ -113,17 +116,17 @@ impl Lsm {
});
}

let mut memtable = self.memtable.lock().unwrap();
let mut memtable = self.memtable.write().unwrap();

if memtable.contains_key(&key.to_string()) {
memtable.remove(&key.to_string());

drop(memtable);
} else {
self.dense_index.lock().unwrap().remove(&key.to_string());
self.dense_index.write().unwrap().remove(&key.to_string());
}

self.bloom_filter.lock().unwrap().delete(key);
self.bloom_filter.write().unwrap().delete(key);

Ok(())
}
Expand All @@ -136,13 +139,13 @@ impl Lsm {
});
}

let mut memtable = self.memtable.lock().unwrap();
let mut bloom_filter = self.bloom_filter.lock().unwrap();
let mut memtable = self.memtable.write().unwrap();
let mut bloom_filter = self.bloom_filter.write().unwrap();

// Delete the old value from the bloom filter
bloom_filter.delete(key);

let mut dense_index = self.dense_index.lock().unwrap();
let mut dense_index = self.dense_index.write().unwrap();
dense_index.remove(&key.to_string());
drop(dense_index);

Expand All @@ -162,10 +165,9 @@ impl Lsm {
return Ok(());
}

let mut dense_index = self.dense_index.lock().unwrap();
let mut dense_index = self.dense_index.write().unwrap();

let segments =
sstable::Segment::from_tree(&memtable, self.lsm_config.sstable_path.as_str());
let segments = sstable::Segment::from_tree(&memtable, &self.lsm_config.sstable_path);

for token in segments.1 {
dense_index.insert(token.0, token.1);
Expand All @@ -183,41 +185,41 @@ impl Lsm {

filter::write_filter(
&self.lsm_config.sstable_path,
self.bloom_filter.lock().unwrap().deref(),
self.bloom_filter.read().unwrap().deref(),
);

self.memtable.lock().unwrap().clear();
self.memtable.write().unwrap().clear();
self.memtable_size = 0;

Ok(())
}

pub fn get_memtable(&self) -> BTreeMap<String, bson::Bson> {
self.memtable.lock().unwrap().clone()
self.memtable.read().unwrap().clone()
}

pub fn contains(&self, key: &str) -> bool {
self.bloom_filter.lock().unwrap().contains(key)
self.bloom_filter.read().unwrap().contains(key)
}

pub fn clear(&self) {
self.memtable.lock().unwrap().clear();
self.dense_index.lock().unwrap().clear();
self.memtable.write().unwrap().clear();
self.dense_index.write().unwrap().clear();
}

pub fn update_index(&self) {
let index = self.dense_index.lock().unwrap().clone();
let index = self.dense_index.read().unwrap().clone();
index::write_index(&self.lsm_config.sstable_path, &index);
}

pub fn list_keys(&self) -> Vec<String> {
let mut keys = Vec::new();

for key in self.memtable.lock().unwrap().keys() {
for key in self.memtable.read().unwrap().keys() {
keys.push(key.clone());
}

for key in self.dense_index.lock().unwrap().keys() {
for key in self.dense_index.read().unwrap().keys() {
keys.push(key.clone());
}

Expand All @@ -226,27 +228,26 @@ impl Lsm {

pub fn drop(&mut self) {
self.clear();
self.bloom_filter.lock().unwrap().clear();
self.bloom_filter.write().unwrap().clear();
}

pub fn load_snapshot(path: path::PathBuf, snapshot: Snapshot) {
sstable::Segment::from_tree(snapshot.get_memtable(), &path.display().to_string());
index::write_index(&path.display().to_string(), snapshot.get_dense_index());
filter::write_filter(&path.display().to_string(), snapshot.get_bloom_filter());
sstable::Segment::from_tree(snapshot.get_memtable(), &path);
index::write_index(&path, snapshot.get_dense_index());
filter::write_filter(&path, snapshot.get_bloom_filter());
}
}

impl Drop for Lsm {
fn drop(&mut self) {
let memtable = self.memtable.lock().unwrap();
let mut dense_index = self.dense_index.lock().unwrap();
let memtable = self.memtable.read().unwrap();
let mut dense_index = self.dense_index.write().unwrap();

if memtable.is_empty() {
return;
}

let segments =
sstable::Segment::from_tree(memtable.deref(), self.lsm_config.sstable_path.as_str());
let segments = sstable::Segment::from_tree(memtable.deref(), &self.lsm_config.sstable_path);

for token in segments.1 {
dense_index.insert(token.0, token.1);
Expand All @@ -262,7 +263,7 @@ impl Drop for Lsm {

filter::write_filter(
&self.lsm_config.sstable_path,
self.bloom_filter.lock().unwrap().deref(),
self.bloom_filter.read().unwrap().deref(),
);
}
}
6 changes: 3 additions & 3 deletions src/storage/lsm/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ impl Snapshot {
}

let snapshot = Snapshot::new(
lsm.memtable.lock().unwrap().clone(),
lsm.bloom_filter.lock().unwrap().clone(),
lsm.dense_index.lock().unwrap().clone(),
lsm.memtable.read().unwrap().clone(),
lsm.bloom_filter.read().unwrap().clone(),
lsm.dense_index.read().unwrap().clone(),
);

let now = chrono::Local::now();
Expand Down
Loading

0 comments on commit 12241a0

Please sign in to comment.