Skip to content

Commit

Permalink
Merge pull request #2 from weezy20/compaction
Browse files Browse the repository at this point in the history
Compaction
  • Loading branch information
weezy20 authored Oct 20, 2023
2 parents 7fc55af + f7266b4 commit b4fda64
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 20 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
RUST_LOG=debug
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ test = false

[dependencies]
clap = { version = "4.1.6", features = ["derive"] }
dotenv = "0.15.0"
env_logger = "0.10.0"
lazy_static = "1.4.0"
log = "0.4.20"
Expand Down
9 changes: 7 additions & 2 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use kvs::cli;
use log::{error, info};
use std::env;
fn main() -> kvs::Result<()> {
// Read kv_00001.log file into BufReader
::dotenv::dotenv().ok();
// Read kv_00001.log file into BufReader
// let file = std::fs::File::open("kv_00001.log")?;
// let buf = std::io::BufReader::new(&file);
// let buf2 = std::io::BufRead::lines(buf)
Expand All @@ -22,6 +23,10 @@ fn main() -> kvs::Result<()> {
// create a local kvs instance
let mut kvs = kvs::KvStore::open(env::current_dir()?)?;

if cli.compact {
kvs.compaction()?;
}

if let Some(action) = cli.action {
match action {
Action::Set(SetCmd { key, value }) => {
Expand All @@ -43,7 +48,7 @@ fn main() -> kvs::Result<()> {
}
Action::Remove(RmCmd { key }) => {
info!("Removing \"{key}\"");
let _ = kvs.remove(key);
let _ = kvs.remove(key);
exit_program(0);
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub struct KvsCLI {
/// Turn debugging information on
#[arg(short, long, action = clap::ArgAction::Count)]
pub debug: u8,

/// Run compaction
#[arg(short, long)]
pub compact: bool,
}

#[derive(clap::Args, Serialize, Deserialize, Debug)]
Expand Down
117 changes: 99 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use ron::ser::PrettyConfig;
use serde::Deserialize;
use std::{
cell::RefCell,
collections::HashMap,
collections::{BTreeMap, HashMap},
fs::{File, OpenOptions},
io::{BufRead, BufReader, BufWriter, Read, Seek, Write},
path::{Path, PathBuf},
Expand Down Expand Up @@ -124,23 +124,7 @@ impl KvStore {
Err(err) => error!("Cannot load in memory index: {:?}", err),
}
} else {
let mut buf = String::new();
// Reads the entire file at once
// Alternatively you can use BufReader on the file and read it line by line to perform deserialization
// and replay the `Action`
let _bytes_read = (*disk).read_to_string(&mut buf).map_err(|e| {
error!("Cannot load log file into memory");
e
})?;
let mut de = ron::Deserializer::from_str(&buf).expect("RON: deserializer init error");
let log: Vec<Action> = std::iter::from_fn({
move || {
de.end()
.is_err()
.then_some(Action::deserialize::<_>(&mut de))
}
})
.collect::<std::result::Result<Vec<_>, _>>()?;
let log = read_action_from_log(&mut disk)?;
// -- Replay the commands in the log --
let mut offset: Offset = 0;
for (idx, action) in log.iter().enumerate() {
Expand Down Expand Up @@ -170,12 +154,108 @@ impl KvStore {
drop(disk);
Ok(store)
}

/// Run compaction on the disk log
pub fn compaction(&mut self) -> Result<()> {
let mut file = self
.disk
.as_ref()
.ok_or(DbError::Uninitialized)?
.borrow_mut()
.try_clone()?;
file.rewind()?;

let log: Vec<Action> = read_action_from_log(&mut file)?;
// Hold unique keys last set value, None in case it was removed
let mut unique_keys: BTreeMap<String, Option<String>> = BTreeMap::new();
let mut compacted_log: Vec<Action> = Vec::with_capacity(log.capacity());
// debug!("Log to compact : {log:?}");
log.iter().rev().for_each(|action| {
// debug!("compact OFFSET: {offset}");
match action {
Action::Set(SetCmd { key, value }) => {
if !unique_keys.contains_key(key) {
unique_keys.insert(key.to_string(), Some(value.to_string()));
}
}
Action::Get(_) => (),
Action::Remove(RmCmd { key }) => {
if !unique_keys.contains_key(key) {
unique_keys.insert(key.to_string(), None);
}
}
}
});
debug!("Unique Keys : {:?}", unique_keys);
// Rebuild log
compacted_log.extend(unique_keys.into_iter().rev().map(|(key, value)| {
if let Some(value) = value {
Action::Set(SetCmd { key, value })
} else {
Action::Remove(RmCmd { key })
}
}));
// Recompute offsets
self.map.clear();
let mut offset: Offset = 1;
compacted_log.iter().for_each(|action| {
match action {
Action::Set(SetCmd { key, .. }) => {
self.map.insert(key.clone(), offset);
}
Action::Get(_) => {}
Action::Remove(RmCmd { key }) => {
self.map.remove(key);
}
};
offset += 1;
});
self.offset = offset;
debug!("Post compaction, current offset {}", self.offset);
// Write compacted_log to self.disk
let mut file = self
.disk
.as_ref()
.ok_or(DbError::Uninitialized)?
.borrow_mut();
file.rewind()?;
// Clear file contents
file.set_len(0)?;
// Write serialized to file but one entry at a time instead of as a Vec
for action in compacted_log {
let serialized = ron::ser::to_string_pretty(&action, RON_CONFIG.to_owned())? + "\n";
file.write_all(serialized.as_bytes())?;
}
Ok(())
}
}
/// Deserialize on disk log
fn read_action_from_log(disk: &mut File) -> Result<Vec<Action>> {
let mut buf = String::new();
let _bytes_read = (*disk).read_to_string(&mut buf).map_err(|e| {
error!("Cannot load log file into memory");
e
})?;
let mut de = ron::Deserializer::from_str(&buf).expect("RON: deserializer init error");
let log: Vec<Action> = std::iter::from_fn({
move || {
de.end()
.is_err()
.then_some(Action::deserialize::<_>(&mut de))
}
})
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(log)
}

impl KvStore {
/// Set : When setting a key to a value, kvs writes the set command to disk in a sequential log,
/// then stores the log pointer (file offset) of that command in the in-memory index from key to pointer.
pub fn set(&mut self, key: String, value: String) -> Result<()> {
if self.map.len() > 500 {
// trigger compaction
(*self).compaction()?;
}
let mut file = self
.disk
.as_ref()
Expand All @@ -193,6 +273,7 @@ impl KvStore {
// write serialized to self.disk
// TODO : Maybe think about optimizing this? file sys-call on every set cmd?
file.write_all(serialized.as_bytes())?;

Ok(())
}
/// Get : When retrieving a value for a key with the get command, it searches the index,
Expand Down

0 comments on commit b4fda64

Please sign in to comment.