Commit bcea0e5

TxId, KeySize, ValueSize, FileId

ckampfe committed May 25, 2024
1 parent e492d1a commit bcea0e5

Showing 6 changed files with 141 additions and 50 deletions.
README.md: 6 changes (4 additions & 2 deletions)
@@ -46,6 +46,8 @@ I have known about Bitcask for a while, and I wanted to learn it by building a w
 - [ ] improve error contexts reported to callers (e.g. with `snafu` or improving use of `thiserror`)
 - [ ] error handling and reporting in the event of a corrupt record
 - [ ] investigate allowing the access of old values
-- [ ] optimize layout of EntryPointer size, file_id to u32, value_position to u32?, tx_id to `time::Time`
-- [ ] investigate restricting key size to u16
+- [x] file_id to FileId(u32)
+- [x] key_size to KeySize(u16)
+- [x] value_size to ValueSize(u32)
+- [ ] tx_id to `time::Time`?
 - [x] use crc32 instead of blake3
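
Note: the newly checked-off wrappers are defined in src/record.rs, whose diff did not load on this page. Judging from the imports (`crate::record::{KeySize, TxId, ValueSize}`) and the `.0` field accesses in the hunks below, they are presumably plain tuple structs along these lines; a sketch under those assumptions, not the actual diff:

// Assumed shape of the wrappers in src/record.rs. The inner fields must
// be at least pub(crate), since other modules read them as `.0`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct TxId(pub(crate) u128);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct KeySize(pub(crate) u16);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct ValueSize(pub(crate) u32);

// src/base.rs calls `keydir.latest_tx_id().unwrap_or(0.into())`, so a
// conversion like this one presumably exists as well.
impl From<u128> for TxId {
    fn from(value: u128) -> Self {
        Self(value)
    }
}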
src/base.rs: 35 changes (19 additions & 16 deletions)
@@ -1,7 +1,7 @@
-use crate::keydir::{EntryPointer, EntryWithLiveness, Keydir, Liveness};
+use crate::keydir::{EntryPointer, EntryWithLiveness, FileId, Keydir, Liveness};
 use crate::loadable::Loadable;
 use crate::merge_pointer::MergePointer;
-use crate::record::Record;
+use crate::record::{Record, TxId};
 use crate::Options;
 use crate::{error, FlushBehavior};
 use serde::de::DeserializeOwned;
@@ -21,9 +21,9 @@ where
     options: Options,
     keydir: Keydir<K>,
     active_file: tokio::io::BufWriter<tokio::fs::File>,
-    active_file_id: u64,
+    active_file_id: FileId,
     offset: u64,
-    tx_id: u128,
+    tx_id: TxId,
 }

 // public impls
@@ -36,7 +36,9 @@ where

         db_file_ids.sort();

-        let latest_file_id = db_file_ids.last().unwrap_or(&0);
+        let file_id_zero: FileId = 0.into();
+
+        let latest_file_id = db_file_ids.last().unwrap_or(&file_id_zero);

         let active_file_id = latest_file_id + 1;

@@ -57,7 +59,7 @@ where

         let keydir = Keydir::new(all_entries);

-        let latest_tx_id = keydir.latest_tx_id().unwrap_or(0);
+        let latest_tx_id = keydir.latest_tx_id().unwrap_or(0.into());

         let mut active_file_path = db_directory.to_owned();
         active_file_path.push(active_file_id.to_string());
@@ -94,7 +96,7 @@ where
         f.seek(std::io::SeekFrom::Start(entry.value_position))
             .await?;

-        let mut buf = vec![0u8; entry.value_size as usize];
+        let mut buf = vec![0u8; entry.value_size.0 as usize];

         f.read_exact(&mut buf).await?;

@@ -256,8 +258,9 @@ where

             assert!(bytes_read == merge_pointer.record_size);

-            let value_position =
-                offset + crate::record::Record::HEADER_SIZE as u64 + merge_pointer.key_size as u64;
+            let value_position = offset
+                + crate::record::Record::HEADER_SIZE as u64
+                + merge_pointer.key_size.0 as u64;

             offset += merge_pointer.record_size;

@@ -320,7 +323,7 @@ where
         self.active_file.write_all(&record).await?;

         let value_position =
-            self.offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size() as u64;
+            self.offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size().0 as u64;

         let entry = EntryPointer {
             file_id: self.active_file_id,
@@ -332,8 +335,8 @@ where
         self.keydir.insert(k, entry);

         let entry_size = crate::record::Record::HEADER_SIZE
-            + record.key_size() as usize
-            + record.value_size() as usize;
+            + record.key_size().0 as usize
+            + record.value_size().0 as usize;

         self.offset += entry_size as u64;

@@ -375,8 +378,8 @@ where
         self.keydir.remove(&k);

         let entry_size = crate::record::Record::HEADER_SIZE
-            + record.key_size() as usize
-            + record.value_size() as usize;
+            + record.key_size().0 as usize
+            + record.value_size().0 as usize;

         self.offset += entry_size as u64;

@@ -406,7 +409,7 @@ where
         }
     }

-    async fn all_db_file_ids(db_directory: &Path) -> crate::Result<Vec<u64>> {
+    async fn all_db_file_ids(db_directory: &Path) -> crate::Result<Vec<FileId>> {
         let mut file_ids = vec![];

         let mut dir_reader = tokio::fs::read_dir(db_directory).await?;
@@ -426,7 +429,7 @@ where
         Ok(file_ids)
     }

-    async fn inactive_db_file_ids(&self) -> crate::Result<Vec<u64>> {
+    async fn inactive_db_file_ids(&self) -> crate::Result<Vec<FileId>> {
         let mut db_file_ids = Self::all_db_file_ids(&self.db_directory).await?;

         db_file_ids.retain(|file_id| *file_id != self.active_file_id);
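
The repeated `value_position` arithmetic in this file follows from the on-disk record layout: a fixed-size header, then the key bytes, then the value bytes. A minimal sketch of the computation (the free function is illustrative only; `HEADER_SIZE` is the constant from src/record.rs):

// A record is laid out as [ header | key | value ], so its value starts
// exactly header_size + key_size bytes past the record's start offset.
fn value_position(record_offset: u64, header_size: u64, key_size: u64) -> u64 {
    record_offset + header_size + key_size
}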
src/keydir.rs: 56 changes (50 additions & 6 deletions)
@@ -1,7 +1,11 @@
 use crate::loadable::Loadable;
+use crate::record::{TxId, ValueSize};
 use serde::de::DeserializeOwned;
 use std::collections::HashMap;
 use std::hash::Hash;
+use std::num::ParseIntError;
+use std::ops::{Add, AddAssign, Deref};
+use std::str::FromStr;
 use tokio::io::AsyncRead;

 #[derive(Debug)]
@@ -40,7 +44,7 @@ where
         self.keydir.keys()
     }

-    pub(crate) fn latest_tx_id(&self) -> Option<u128> {
+    pub(crate) fn latest_tx_id(&self) -> Option<TxId> {
         self.keydir
             .values()
             .max_by(|a, b| a.tx_id.cmp(&b.tx_id))
@@ -50,10 +54,10 @@

 #[derive(Debug, PartialEq)]
 pub(crate) struct EntryPointer {
-    pub(crate) file_id: u64,
+    pub(crate) file_id: FileId,
     pub(crate) value_position: u64,
-    pub(crate) value_size: u32,
-    pub(crate) tx_id: u128,
+    pub(crate) value_size: ValueSize,
+    pub(crate) tx_id: TxId,
 }

 #[derive(Debug, PartialEq)]
@@ -81,7 +85,7 @@ where
     async fn read<R: AsyncRead + Unpin>(
         reader: &mut tokio::io::BufReader<R>,
         offset: &mut u64,
-        file_id: u64,
+        file_id: FileId,
     ) -> crate::Result<Option<(K, Self)>>
     where
         Self: Sized,
@@ -107,7 +111,7 @@ where
         let liveness = record.liveness();

         let value_position =
-            *offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size() as u64;
+            *offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size().0 as u64;

         // and update the offset to reflect that we have read a record
         *offset += record.len() as u64;
@@ -126,3 +130,43 @@ where
         )))
     }
 }
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub(crate) struct FileId(u32);
+
+impl FromStr for FileId {
+    type Err = ParseIntError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let v = s.parse::<u32>()?;
+        Ok(FileId(v))
+    }
+}
+
+impl From<u32> for FileId {
+    fn from(value: u32) -> Self {
+        Self(value)
+    }
+}
+
+impl AddAssign<u32> for FileId {
+    fn add_assign(&mut self, rhs: u32) {
+        self.0 += rhs
+    }
+}
+
+impl Add<u32> for &FileId {
+    type Output = FileId;
+
+    fn add(self, rhs: u32) -> Self::Output {
+        FileId(self.0 + rhs)
+    }
+}
+
+impl Deref for FileId {
+    type Target = u32;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
src/loadable.rs: 7 changes (4 additions & 3 deletions)
@@ -1,11 +1,12 @@
+use crate::keydir::FileId;
 use std::hash::Hash;
 use std::{collections::HashMap, path::Path};
 use tokio::io::AsyncRead;

 pub(crate) trait Loadable<K: Eq + Hash>: PartialOrd + Sized {
     async fn load_latest_entries(
         db_directory: &Path,
-        db_file_ids: &[u64],
+        db_file_ids: &[FileId],
     ) -> crate::Result<HashMap<K, Self>> {
         let mut all_files_entries: Vec<HashMap<K, Self>> = vec![];

@@ -33,7 +34,7 @@ pub(crate) trait Loadable<K: Eq + Hash>: PartialOrd + Sized {

     async fn load_entries_from_file(
         db_directory: &Path,
-        file_id: u64,
+        file_id: FileId,
     ) -> crate::Result<HashMap<K, Self>> {
         let mut path = db_directory.to_owned();

@@ -59,7 +60,7 @@ pub(crate) trait Loadable<K: Eq + Hash>: PartialOrd + Sized {
     async fn read<R: AsyncRead + Unpin>(
         reader: &mut tokio::io::BufReader<R>,
         offset: &mut u64,
-        file_id: u64,
+        file_id: FileId,
     ) -> crate::Result<Option<(K, Self)>>
     where
         Self: Sized;
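
The `PartialOrd` bound on `Loadable` exists so the loader can pick a winner when the same key appears in several data files. The merging code in `load_latest_entries` is collapsed above, but it presumably folds the per-file maps into one, keeping the greater (more recently written) entry; a sketch under that assumption:

use std::collections::HashMap;
use std::hash::Hash;

// Hypothetical merge step: combine per-file entry maps, keeping the
// greater entry for each key (greater meaning newer, per the
// PartialOrd impls in this codebase).
fn merge_latest<K: Eq + Hash, V: PartialOrd>(
    all_files_entries: Vec<HashMap<K, V>>,
) -> HashMap<K, V> {
    let mut merged = HashMap::new();

    for file_entries in all_files_entries {
        for (key, entry) in file_entries {
            match merged.get(&key) {
                // keep the existing entry only if it is already greater
                Some(existing) if *existing > entry => {}
                _ => {
                    merged.insert(key, entry);
                }
            }
        }
    }

    merged
}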
src/merge_pointer.rs: 13 changes (7 additions & 6 deletions)
@@ -1,5 +1,6 @@
-use crate::keydir::Liveness;
+use crate::keydir::{FileId, Liveness};
 use crate::loadable::Loadable;
+use crate::record::{KeySize, TxId, ValueSize};
 use serde::de::DeserializeOwned;
 use std::hash::Hash;
 use tokio::io::AsyncRead;
@@ -10,12 +11,12 @@ use tokio::io::AsyncRead;
 pub(crate) struct MergePointer {
     /// whether the data is an insert or a delete
     pub(crate) liveness: Liveness,
-    pub(crate) file_id: u64,
-    pub(crate) tx_id: u128,
+    pub(crate) file_id: FileId,
+    pub(crate) tx_id: TxId,
     pub(crate) record_offset: u64,
     pub(crate) record_size: u64,
-    pub(crate) key_size: u32,
-    pub(crate) value_size: u32,
+    pub(crate) key_size: KeySize,
+    pub(crate) value_size: ValueSize,
 }

 impl PartialOrd for MergePointer {
@@ -28,7 +29,7 @@ impl<K: Eq + Hash + DeserializeOwned> Loadable<K> for MergePointer {
     async fn read<R: AsyncRead + Unpin>(
         reader: &mut tokio::io::BufReader<R>,
         offset: &mut u64,
-        file_id: u64,
+        file_id: FileId,
     ) -> crate::Result<Option<(K, Self)>> {
         let record = match crate::record::Record::read_from(reader).await {
             Ok(record) => record,
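
The body of the `PartialOrd` impl for `MergePointer` falls outside the visible hunks. Since a merge must keep the most recently written record for each key, it presumably orders pointers by transaction id. An assumed reconstruction, not the actual code (it would also need a matching `PartialEq` impl):

impl PartialOrd for MergePointer {
    // Assumed: later transactions compare greater, so a merge keeps the
    // MergePointer with the largest tx_id for any given key.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.tx_id.partial_cmp(&other.tx_id)
    }
}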
(The diff for the sixth changed file, presumably src/record.rs, did not load on this page.)
