From bcea0e5b54456a352e655c67748401ba5a17572b Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Sat, 25 May 2024 09:17:50 -0500 Subject: [PATCH] TxId, KeySize, ValueSize, FileId --- README.md | 6 ++-- src/base.rs | 35 +++++++++++---------- src/keydir.rs | 56 +++++++++++++++++++++++++++++---- src/loadable.rs | 7 +++-- src/merge_pointer.rs | 13 ++++---- src/record.rs | 74 ++++++++++++++++++++++++++++++++++---------- 6 files changed, 141 insertions(+), 50 deletions(-) diff --git a/README.md b/README.md index a9a6f69..c0e1b84 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,8 @@ I have known about Bitcask for a while, and I wanted to learn it by building a w - [ ] improve error contexts reported to callers (e.g. with `snafu` or improving use of `thiserror`) - [ ] error handling and reporting in the event of a corrupt record - [ ] investigate allowing the access of old values -- [ ] optimize layout of EntryPointer size, file_id to u32, value_position to u32?, tx_id to `time::Time` -- [ ] investigate restricting key size to u16 +- [x] file_id to FileId(u32) +- [x] key_size to KeySize(u16) +- [x] value_size to ValueSize(u32) +- [ ] tx_id to `time::Time`? - [x] use crc32 instead of blake3 diff --git a/src/base.rs b/src/base.rs index cf55aa3..e19c613 100644 --- a/src/base.rs +++ b/src/base.rs @@ -1,7 +1,7 @@ -use crate::keydir::{EntryPointer, EntryWithLiveness, Keydir, Liveness}; +use crate::keydir::{EntryPointer, EntryWithLiveness, FileId, Keydir, Liveness}; use crate::loadable::Loadable; use crate::merge_pointer::MergePointer; -use crate::record::Record; +use crate::record::{Record, TxId}; use crate::Options; use crate::{error, FlushBehavior}; use serde::de::DeserializeOwned; @@ -21,9 +21,9 @@ where options: Options, keydir: Keydir, active_file: tokio::io::BufWriter, - active_file_id: u64, + active_file_id: FileId, offset: u64, - tx_id: u128, + tx_id: TxId, } // public impls @@ -36,7 +36,9 @@ where db_file_ids.sort(); - let latest_file_id = db_file_ids.last().unwrap_or(&0); + let file_id_zero: FileId = 0.into(); + + let latest_file_id = db_file_ids.last().unwrap_or(&file_id_zero); let active_file_id = latest_file_id + 1; @@ -57,7 +59,7 @@ where let keydir = Keydir::new(all_entries); - let latest_tx_id = keydir.latest_tx_id().unwrap_or(0); + let latest_tx_id = keydir.latest_tx_id().unwrap_or(0.into()); let mut active_file_path = db_directory.to_owned(); active_file_path.push(active_file_id.to_string()); @@ -94,7 +96,7 @@ where f.seek(std::io::SeekFrom::Start(entry.value_position)) .await?; - let mut buf = vec![0u8; entry.value_size as usize]; + let mut buf = vec![0u8; entry.value_size.0 as usize]; f.read_exact(&mut buf).await?; @@ -256,8 +258,9 @@ where assert!(bytes_read == merge_pointer.record_size); - let value_position = - offset + crate::record::Record::HEADER_SIZE as u64 + merge_pointer.key_size as u64; + let value_position = offset + + crate::record::Record::HEADER_SIZE as u64 + + merge_pointer.key_size.0 as u64; offset += merge_pointer.record_size; @@ -320,7 +323,7 @@ where self.active_file.write_all(&record).await?; let value_position = - self.offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size() as u64; + self.offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size().0 as u64; let entry = EntryPointer { file_id: self.active_file_id, @@ -332,8 +335,8 @@ where self.keydir.insert(k, entry); let entry_size = crate::record::Record::HEADER_SIZE - + record.key_size() as usize - + record.value_size() as usize; + + record.key_size().0 as usize + + record.value_size().0 as usize; self.offset += entry_size as u64; @@ -375,8 +378,8 @@ where self.keydir.remove(&k); let entry_size = crate::record::Record::HEADER_SIZE - + record.key_size() as usize - + record.value_size() as usize; + + record.key_size().0 as usize + + record.value_size().0 as usize; self.offset += entry_size as u64; @@ -406,7 +409,7 @@ where } } - async fn all_db_file_ids(db_directory: &Path) -> crate::Result> { + async fn all_db_file_ids(db_directory: &Path) -> crate::Result> { let mut file_ids = vec![]; let mut dir_reader = tokio::fs::read_dir(db_directory).await?; @@ -426,7 +429,7 @@ where Ok(file_ids) } - async fn inactive_db_file_ids(&self) -> crate::Result> { + async fn inactive_db_file_ids(&self) -> crate::Result> { let mut db_file_ids = Self::all_db_file_ids(&self.db_directory).await?; db_file_ids.retain(|file_id| *file_id != self.active_file_id); diff --git a/src/keydir.rs b/src/keydir.rs index 915ccc6..14f4a39 100644 --- a/src/keydir.rs +++ b/src/keydir.rs @@ -1,7 +1,11 @@ use crate::loadable::Loadable; +use crate::record::{TxId, ValueSize}; use serde::de::DeserializeOwned; use std::collections::HashMap; use std::hash::Hash; +use std::num::ParseIntError; +use std::ops::{Add, AddAssign, Deref}; +use std::str::FromStr; use tokio::io::AsyncRead; #[derive(Debug)] @@ -40,7 +44,7 @@ where self.keydir.keys() } - pub(crate) fn latest_tx_id(&self) -> Option { + pub(crate) fn latest_tx_id(&self) -> Option { self.keydir .values() .max_by(|a, b| a.tx_id.cmp(&b.tx_id)) @@ -50,10 +54,10 @@ where #[derive(Debug, PartialEq)] pub(crate) struct EntryPointer { - pub(crate) file_id: u64, + pub(crate) file_id: FileId, pub(crate) value_position: u64, - pub(crate) value_size: u32, - pub(crate) tx_id: u128, + pub(crate) value_size: ValueSize, + pub(crate) tx_id: TxId, } #[derive(Debug, PartialEq)] @@ -81,7 +85,7 @@ where async fn read( reader: &mut tokio::io::BufReader, offset: &mut u64, - file_id: u64, + file_id: FileId, ) -> crate::Result> where Self: Sized, @@ -107,7 +111,7 @@ where let liveness = record.liveness(); let value_position = - *offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size() as u64; + *offset + crate::record::Record::HEADER_SIZE as u64 + record.key_size().0 as u64; // and update the offset to reflect that we have read a record *offset += record.len() as u64; @@ -126,3 +130,43 @@ where ))) } } + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct FileId(u32); + +impl FromStr for FileId { + type Err = ParseIntError; + + fn from_str(s: &str) -> Result { + let v = s.parse::()?; + Ok(FileId(v)) + } +} + +impl From for FileId { + fn from(value: u32) -> Self { + Self(value) + } +} + +impl AddAssign for FileId { + fn add_assign(&mut self, rhs: u32) { + self.0 += rhs + } +} + +impl Add for &FileId { + type Output = FileId; + + fn add(self, rhs: u32) -> Self::Output { + FileId(self.0 + rhs) + } +} + +impl Deref for FileId { + type Target = u32; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} diff --git a/src/loadable.rs b/src/loadable.rs index 5647e5e..95c060a 100644 --- a/src/loadable.rs +++ b/src/loadable.rs @@ -1,3 +1,4 @@ +use crate::keydir::FileId; use std::hash::Hash; use std::{collections::HashMap, path::Path}; use tokio::io::AsyncRead; @@ -5,7 +6,7 @@ use tokio::io::AsyncRead; pub(crate) trait Loadable: PartialOrd + Sized { async fn load_latest_entries( db_directory: &Path, - db_file_ids: &[u64], + db_file_ids: &[FileId], ) -> crate::Result> { let mut all_files_entries: Vec> = vec![]; @@ -33,7 +34,7 @@ pub(crate) trait Loadable: PartialOrd + Sized { async fn load_entries_from_file( db_directory: &Path, - file_id: u64, + file_id: FileId, ) -> crate::Result> { let mut path = db_directory.to_owned(); @@ -59,7 +60,7 @@ pub(crate) trait Loadable: PartialOrd + Sized { async fn read( reader: &mut tokio::io::BufReader, offset: &mut u64, - file_id: u64, + file_id: FileId, ) -> crate::Result> where Self: Sized; diff --git a/src/merge_pointer.rs b/src/merge_pointer.rs index 56fe8bb..64441cf 100644 --- a/src/merge_pointer.rs +++ b/src/merge_pointer.rs @@ -1,5 +1,6 @@ -use crate::keydir::Liveness; +use crate::keydir::{FileId, Liveness}; use crate::loadable::Loadable; +use crate::record::{KeySize, TxId, ValueSize}; use serde::de::DeserializeOwned; use std::hash::Hash; use tokio::io::AsyncRead; @@ -10,12 +11,12 @@ use tokio::io::AsyncRead; pub(crate) struct MergePointer { /// whether the data is an insert or a delete pub(crate) liveness: Liveness, - pub(crate) file_id: u64, - pub(crate) tx_id: u128, + pub(crate) file_id: FileId, + pub(crate) tx_id: TxId, pub(crate) record_offset: u64, pub(crate) record_size: u64, - pub(crate) key_size: u32, - pub(crate) value_size: u32, + pub(crate) key_size: KeySize, + pub(crate) value_size: ValueSize, } impl PartialOrd for MergePointer { @@ -28,7 +29,7 @@ impl Loadable for MergePointer { async fn read( reader: &mut tokio::io::BufReader, offset: &mut u64, - file_id: u64, + file_id: FileId, ) -> crate::Result> { let record = match crate::record::Record::read_from(reader).await { Ok(record) => record, diff --git a/src/record.rs b/src/record.rs index 15467c6..aa17cdd 100644 --- a/src/record.rs +++ b/src/record.rs @@ -1,5 +1,6 @@ use crate::{error, keydir::Liveness}; use serde::{de::DeserializeOwned, Serialize}; +use std::ops::{Add, AddAssign}; use std::{ops::Deref, sync::OnceLock}; use tokio::io::{AsyncRead, AsyncReadExt}; @@ -37,7 +38,7 @@ impl Record { pub(crate) fn new( k: &K, v: &V, - tx_id: u128, + tx_id: TxId, ) -> crate::Result { let encoded_tx_id = tx_id.to_be_bytes(); @@ -55,8 +56,8 @@ impl Record { let value_size = encoded_value.len(); let body_size = key_size + value_size; - let encoded_key_size = (key_size as u32).to_be_bytes(); - let encoded_value_size = (value_size as u32).to_be_bytes(); + let encoded_key_size = KeySize(key_size as u16).0.to_be_bytes(); + let encoded_value_size = ValueSize(value_size as u32).0.to_be_bytes(); let mut buf = Vec::with_capacity(Self::HEADER_SIZE + body_size); // header @@ -87,8 +88,8 @@ impl Record { reader.read_exact(&mut record.buf).await?; - let key_size_usize: usize = record.key_size().try_into().unwrap(); - let value_size_usize: usize = record.value_size().try_into().unwrap(); + let key_size_usize: usize = record.key_size().0.into(); + let value_size_usize: usize = record.value_size().0.try_into().unwrap(); let body_size: usize = key_size_usize + value_size_usize; record.buf.resize(record.buf.len() + body_size, 0); @@ -127,13 +128,13 @@ impl Record { pub(crate) fn key_bytes(&self) -> &[u8] { let start = 0; - let end = self.key_size() as usize; + let end = self.key_size().0 as usize; &self.body()[start..end] } pub(crate) fn value_bytes(&self) -> &[u8] { - let start = self.key_size() as usize; - let end = start + self.value_size() as usize; + let start = self.key_size().0 as usize; + let end = start + self.value_size().0 as usize; &self.body()[start..end] } @@ -141,25 +142,29 @@ impl Record { self.buf.len() } - pub(crate) fn tx_id(&self) -> u128 { - u128::from_be_bytes(self.tx_id_bytes().try_into().unwrap()) + pub(crate) fn tx_id(&self) -> TxId { + u128::from_be_bytes(self.tx_id_bytes().try_into().unwrap()).into() } - pub(crate) fn key_size(&self) -> u32 { - u32::from_be_bytes(self.key_size_bytes().try_into().unwrap()) + pub(crate) fn key_size(&self) -> KeySize { + KeySize(u16::from_be_bytes( + self.key_size_bytes().try_into().unwrap(), + )) } - pub(crate) fn value_size(&self) -> u32 { - u32::from_be_bytes(self.value_size_bytes().try_into().unwrap()) + pub(crate) fn value_size(&self) -> ValueSize { + ValueSize(u32::from_be_bytes( + self.value_size_bytes().try_into().unwrap(), + )) } } // private impls impl Record { const HASH_SIZE: usize = std::mem::size_of::(); - const TX_ID_SIZE: usize = std::mem::size_of::(); - const KEY_SIZE_SIZE: usize = std::mem::size_of::(); - const VALUE_SIZE_SIZE: usize = std::mem::size_of::(); + const TX_ID_SIZE: usize = std::mem::size_of::(); + const KEY_SIZE_SIZE: usize = std::mem::size_of::(); + const VALUE_SIZE_SIZE: usize = std::mem::size_of::(); fn header(&self) -> &[u8] { &self.buf[..Self::HEADER_SIZE] @@ -203,3 +208,38 @@ impl Record { &self.header()[start..end] } } + +#[derive(PartialEq)] +pub(crate) struct KeySize(pub(crate) u16); + +#[derive(Debug, PartialEq)] +pub(crate) struct ValueSize(pub(crate) u32); + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct TxId(u128); + +impl TxId { + pub(crate) fn to_be_bytes(self) -> [u8; 16] { + self.0.to_be_bytes() + } +} + +impl From for TxId { + fn from(value: u128) -> Self { + Self(value) + } +} + +impl Add for TxId { + type Output = Self; + + fn add(self, rhs: u128) -> Self::Output { + Self(self.0 + rhs) + } +} + +impl AddAssign for TxId { + fn add_assign(&mut self, rhs: u128) { + self.0 += rhs + } +}