Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 86 additions & 35 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@ use crate::{
};
use alloy_primitives::B256;
use parking_lot::Mutex;
use std::{io, path::Path};
use std::{io, path::Path, sync::Arc};

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct Database {
pub(crate) inner: Arc<DatabaseInner>,
}

#[derive(Debug)]
pub(crate) struct DatabaseInner {
pub(crate) storage_engine: StorageEngine,
pub(crate) transaction_manager: Mutex<TransactionManager>,
metrics: DatabaseMetrics,
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub enum Error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's very rare for errors to implement Clone, which is why most errors (in the std lib too) don't implement it. Is there a specific reason why we want them here? The PR description mentions "usability reasons", but I struggle to find places where errors are cloned instead of being simply propagated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was introduced for seamless compatibility with Reth, as they appear to clone errors. I'll double check whether or not this is truly necessary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon further review this mostly stems from Reth cloning Results (example) including the ProviderError

We don't really need to embed the raw TrieDB Error inside of the ProviderError, and can instead just stringify this to remove the need for error cloning.

PageError(PageError),
EngineError(engine::Error),
Expand All @@ -30,6 +35,16 @@ pub enum OpenError {
IO(io::Error),
}

impl Clone for OpenError {
fn clone(&self) -> Self {
match self {
Self::PageError(e) => Self::PageError(e.clone()),
Self::MetadataError(e) => Self::MetadataError(e.clone()),
Self::IO(e) => Self::IO(std::io::Error::new(e.kind(), e.to_string())),
}
}
}

impl Database {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of rewriting all the code to use self.inner. instead of self., I would suggest simply changing impl Database to impl DatabaseInner. The only methods that need to stay inside impl Database are the constructors (create and new). Everything else can be transparently delegated by Deref

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this would require making DatabaseInner public, which we probably want to avoid

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option might be to make Transaction accept a generic DB which implements Deref<Database>, so that we can use &Database or Arc<Database> as needed.

pub fn create(path: impl AsRef<Path>) -> Result<Self, OpenError> {
let db_file_path = path.as_ref();
Expand Down Expand Up @@ -67,17 +82,65 @@ impl Database {

pub fn new(storage_engine: StorageEngine) -> Self {
Self {
storage_engine,
transaction_manager: Mutex::new(TransactionManager::new()),
metrics: DatabaseMetrics::default(),
inner: Arc::new(DatabaseInner {
storage_engine,
transaction_manager: Mutex::new(TransactionManager::new()),
metrics: DatabaseMetrics::default(),
}),
}
}

pub fn begin_rw(&self) -> Result<Transaction<RW>, TransactionError> {
let context = self.inner.storage_engine.write_context();
let min_snapshot_id =
self.inner.transaction_manager.lock().begin_rw(context.snapshot_id)?;
if min_snapshot_id > 0 {
self.inner.storage_engine.unlock(min_snapshot_id - 1);
}
Ok(Transaction::new(context, self.clone()))
}

pub fn begin_ro(&self) -> Result<Transaction<RO>, TransactionError> {
let context = self.inner.storage_engine.read_context();
self.inner.transaction_manager.lock().begin_ro(context.snapshot_id);
Ok(Transaction::new(context, self.clone()))
}

pub fn print_page<W: io::Write>(&self, buf: W, page_id: Option<PageId>) -> Result<(), Error> {
self.inner.print_page(buf, page_id)
}

pub fn root_page_info<W: io::Write>(
&self,
buf: W,
file_path: impl AsRef<Path>,
) -> Result<(), OpenError> {
self.inner.root_page_info(buf, file_path)
}

pub fn print_statistics<W: io::Write>(&self, buf: W) -> Result<(), Error> {
self.inner.print_statistics(buf)
}

pub fn size(&self) -> u32 {
self.inner.size()
}

pub fn state_root(&self) -> B256 {
self.inner.state_root()
}

pub fn close(self) -> io::Result<()> {
Arc::try_unwrap(self.inner).unwrap().close()
}
}

impl DatabaseInner {
fn close(self) -> io::Result<()> {
self.storage_engine.close()
}

pub fn print_page<W: io::Write>(self, buf: W, page_id: Option<PageId>) -> Result<(), Error> {
fn print_page<W: io::Write>(&self, buf: W, page_id: Option<PageId>) -> Result<(), Error> {
let context = self.storage_engine.read_context();
// TODO: Must use `expect()` because `storage::engine::Error` and `database::Error` are not
// compatible. There's probably no reason to use two different error enums here, so maybe
Expand All @@ -86,8 +149,8 @@ impl Database {
Ok(())
}

pub fn root_page_info<W: io::Write>(
self,
fn root_page_info<W: io::Write>(
&self,
mut buf: W,
file_path: impl AsRef<Path>,
) -> Result<(), OpenError> {
Expand All @@ -103,47 +166,32 @@ impl Database {
let root_node_page_id = active_slot.root_node_page_id();
let orphaned_page_list = meta_manager.orphan_pages().iter().collect::<Vec<_>>();

writeln!(buf, "Root Node Page ID: {:?}", root_node_page_id).expect("write failed");
writeln!(buf, "Root Node Page ID: {root_node_page_id:?}").expect("write failed");

//root subtrie pageID
writeln!(buf, "Total Page Count: {:?}", page_count).expect("write failed");
writeln!(buf, "Total Page Count: {page_count:?}").expect("write failed");

//orphaned pages list (grouped by page)
writeln!(buf, "Orphaned Pages: {:?}", orphaned_page_list).expect("write failed");
writeln!(buf, "Orphaned Pages: {orphaned_page_list:?}").expect("write failed");

Ok(())
}

pub fn print_statistics<W: io::Write>(self, buf: W) -> Result<(), Error> {
fn print_statistics<W: io::Write>(&self, buf: W) -> Result<(), Error> {
let context = self.storage_engine.read_context();
self.storage_engine.debug_statistics(&context, buf).expect("write failed");
Ok(())
}

pub fn begin_rw(&self) -> Result<Transaction<'_, RW>, TransactionError> {
let context = self.storage_engine.write_context();
let min_snapshot_id = self.transaction_manager.lock().begin_rw(context.snapshot_id)?;
if min_snapshot_id > 0 {
self.storage_engine.unlock(min_snapshot_id - 1);
}
Ok(Transaction::new(context, self))
}

pub fn begin_ro(&self) -> Result<Transaction<'_, RO>, TransactionError> {
let context = self.storage_engine.read_context();
self.transaction_manager.lock().begin_ro(context.snapshot_id);
Ok(Transaction::new(context, self))
}

pub fn state_root(&self) -> B256 {
fn state_root(&self) -> B256 {
self.storage_engine.read_context().root_node_hash
}

pub fn size(&self) -> u32 {
fn size(&self) -> u32 {
self.storage_engine.size()
}

pub fn update_metrics_ro(&self, context: &TransactionContext) {
pub(crate) fn update_metrics_ro(&self, context: &TransactionContext) {
self.metrics
.ro_transaction_pages_read
.record(context.transaction_metrics.take_pages_read() as f64);
Expand All @@ -154,7 +202,7 @@ impl Database {
self.metrics.cache_storage_read_miss.increment(cache_storage_read_miss as u64);
}

pub fn update_metrics_rw(&self, context: &TransactionContext) {
pub(crate) fn update_metrics_rw(&self, context: &TransactionContext) {
self.metrics
.rw_transaction_pages_read
.record(context.transaction_metrics.take_pages_read() as f64);
Expand Down Expand Up @@ -287,14 +335,15 @@ mod tests {

fn alive_page_ids(db: &Database) -> Vec<PageId> {
let orphan_pages = db
.inner
.storage_engine
.meta_manager
.lock()
.orphan_pages()
.iter()
.map(|orphan| orphan.page_id())
.collect::<Vec<_>>();
let all_pages = (1..db.storage_engine.page_manager.size())
let all_pages = (1..db.inner.storage_engine.page_manager.size())
.map(|page_id| PageId::new(page_id).unwrap());
all_pages.filter(move |page_id| !orphan_pages.contains(page_id)).collect()
}
Expand All @@ -303,7 +352,7 @@ mod tests {
let tmp_dir = TempDir::new("test_db").unwrap();
let file_path = tmp_dir.path().join("test.db");
let db = Database::create(file_path).unwrap();
assert_eq!(db.storage_engine.page_manager.size(), 0);
assert_eq!(db.inner.storage_engine.page_manager.size(), 0);

// Add 1000 accounts
let mut tx = db.begin_rw().expect("rw transaction creation failed");
Expand All @@ -318,7 +367,8 @@ mod tests {
assert!(page_ids.len() > 1, "storage has no pages");
for page_id in &page_ids {
assert_eq!(
db.storage_engine
db.inner
.storage_engine
.page_manager
.get(1, *page_id)
.unwrap_or_else(|err| panic!("page {page_id} not found: {err:?}"))
Expand Down Expand Up @@ -347,6 +397,7 @@ mod tests {
);
for page_id in &new_page_ids {
let page = db
.inner
.storage_engine
.page_manager
.get(1, *page_id)
Expand Down
9 changes: 9 additions & 0 deletions src/meta/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ pub enum OpenMetadataError {
IO(io::Error),
}

impl Clone for OpenMetadataError {
fn clone(&self) -> Self {
match self {
Self::Corrupted => Self::Corrupted,
Self::IO(e) => Self::IO(std::io::Error::new(e.kind(), e.to_string())),
}
}
}

impl fmt::Display for OpenMetadataError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down
2 changes: 1 addition & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum Node {
},
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum NodeError {
ChildrenUnsupported,
MaxPrefixLengthExceeded,
Expand Down
19 changes: 19 additions & 0 deletions src/page/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,22 @@ pub enum PageError {
InvalidPageContents(PageId),
// TODO: add more errors here for other cases.
}

impl Clone for PageError {
fn clone(&self) -> Self {
match self {
Self::PageNotFound(id) => Self::PageNotFound(*id),
Self::PageOccupied(id) => Self::PageOccupied(*id),
Self::PageDirty(id) => Self::PageDirty(*id),
Self::PageLimitReached => Self::PageLimitReached,
Self::InvalidRootPage(id) => Self::InvalidRootPage(*id),
Self::InvalidCellPointer => Self::InvalidCellPointer,
Self::NoFreeCells => Self::NoFreeCells,
Self::PageIsFull => Self::PageIsFull,
Self::PageSplitLimitReached => Self::PageSplitLimitReached,
Self::IO(e) => Self::IO(std::io::Error::new(e.kind(), e.to_string())),
Self::InvalidValue => Self::InvalidValue,
Self::InvalidPageContents(id) => Self::InvalidPageContents(*id),
}
}
}
12 changes: 8 additions & 4 deletions src/page/slotted_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SlottedPage<'_> {
}

// Returns the cell pointer at the given index.
fn get_cell_pointer(&self, index: u8) -> Result<CellPointer, PageError> {
fn get_cell_pointer(&self, index: u8) -> Result<CellPointer<'_>, PageError> {
if index >= self.num_cells() {
return Err(PageError::InvalidCellPointer);
}
Expand All @@ -83,7 +83,7 @@ impl SlottedPage<'_> {
self.page.contents()[0]
}

fn cell_pointers_iter(&self) -> impl Iterator<Item = CellPointer> {
fn cell_pointers_iter(&self) -> impl Iterator<Item = CellPointer<'_>> {
self.page.contents()[1..=CELL_POINTER_SIZE * self.num_cells() as usize]
.chunks(CELL_POINTER_SIZE)
.map(|chunk| chunk.try_into().unwrap())
Expand Down Expand Up @@ -244,7 +244,11 @@ impl<'a> SlottedPageMut<'a> {

// Allocates a cell pointer at the given index with the given length and returns the cell
// pointer.
fn allocate_cell_pointer(&mut self, index: u8, length: u16) -> Result<CellPointer, PageError> {
fn allocate_cell_pointer(
&mut self,
index: u8,
length: u16,
) -> Result<CellPointer<'_>, PageError> {
match self.find_available_slot(index, length)? {
Some(offset) => {
let num_cells = self.num_cells();
Expand Down Expand Up @@ -397,7 +401,7 @@ impl<'a> SlottedPageMut<'a> {
index: u8,
offset: u16,
length: u16,
) -> Result<CellPointer, PageError> {
) -> Result<CellPointer<'_>, PageError> {
let start_index = 1 + CELL_POINTER_SIZE * (index as usize);
let end_index = start_index + CELL_POINTER_SIZE;
let data = &mut self.page.contents_mut()[start_index..end_index];
Expand Down
Loading