From d946c25ad9e4de71fe245d827c2058459162aee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Fri, 8 Nov 2024 12:35:28 -0500 Subject: [PATCH] Implement DataStore trait for Tx/MutTx --- Cargo.lock | 1 + crates/core/Cargo.toml | 1 + .../locking_tx_datastore/committed_state.rs | 4 +- .../datastore/locking_tx_datastore/mut_tx.rs | 4 +- .../db/datastore/locking_tx_datastore/tx.rs | 2 +- .../locking_tx_datastore/tx_state.rs | 2 +- crates/core/src/db/mod.rs | 1 + crates/core/src/db/query.rs | 297 ++++++++++++++++++ crates/core/src/db/relational_db.rs | 2 +- crates/execution/src/iter.rs | 6 + crates/table/src/table.rs | 9 +- 11 files changed, 321 insertions(+), 8 deletions(-) create mode 100644 crates/core/src/db/query.rs diff --git a/Cargo.lock b/Cargo.lock index 689fa2e2f73..39f288f639f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5108,6 +5108,7 @@ dependencies = [ "spacetimedb-commitlog", "spacetimedb-data-structures", "spacetimedb-durability", + "spacetimedb-execution", "spacetimedb-expr", "spacetimedb-lib", "spacetimedb-metrics", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 5d2d364ad2d..110a9b10426 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -20,6 +20,7 @@ spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] } spacetimedb-client-api-messages.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-durability.workspace = true +spacetimedb-execution.workspace = true spacetimedb-metrics.workspace = true spacetimedb-primitives.workspace = true spacetimedb-sats = { workspace = true, features = ["serde"] } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 8aa66b2387b..80cbdde98d8 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -53,7 +53,7 @@ pub struct CommittedState { pub(crate) tables: IntMap, pub(crate) blob_store: HashMapBlobStore, /// Provides fast lookup for index id -> an index. - pub(super) index_id_map: IndexIdMap, + pub(crate) index_id_map: IndexIdMap, } impl MemoryUsage for CommittedState { @@ -411,7 +411,7 @@ impl CommittedState { Ok(()) } - pub(super) fn index_seek<'a>( + pub(crate) fn index_seek<'a>( &'a self, table_id: TableId, cols: &ColList, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index b13b650db79..c1c07856b5f 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -58,8 +58,8 @@ type DecodeResult = core::result::Result; /// handling can lead to deadlocks. Therefore, it is strongly recommended to use /// `Locking::begin_mut_tx()` for instantiation to ensure safe acquisition of locks. pub struct MutTxId { - pub(super) tx_state: TxState, - pub(super) committed_state_write_lock: SharedWriteGuard, + pub(crate) tx_state: TxState, + pub(crate) committed_state_write_lock: SharedWriteGuard, pub(super) sequence_state_lock: SharedMutexGuard, pub(super) lock_wait_time: Duration, pub(crate) timer: Instant, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index ad5805db82b..baed4fe1f34 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -16,7 +16,7 @@ use std::{ }; pub struct TxId { - pub(super) committed_state_shared_lock: SharedReadGuard, + pub(crate) committed_state_shared_lock: SharedReadGuard, pub(super) lock_wait_time: Duration, pub(super) timer: Instant, pub(crate) ctx: ExecutionContext, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index 5b04d76eed0..0daa606870f 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -43,7 +43,7 @@ pub(super) type RemovedIndexIdSet = IntSet; /// - any row in `delete_tables` must be in the associated `CommittedState` /// - any row cannot be in both `insert_tables` and `delete_tables` #[derive(Default)] -pub(super) struct TxState { +pub(crate) struct TxState { //NOTE: Need to preserve order to correctly restore the db after reopen /// For any `TableId` that has had a row inserted into it in this TX /// (which may have since been deleted), diff --git a/crates/core/src/db/mod.rs b/crates/core/src/db/mod.rs index 843cce12ffc..dff460c831b 100644 --- a/crates/core/src/db/mod.rs +++ b/crates/core/src/db/mod.rs @@ -1,5 +1,6 @@ pub mod datastore; pub mod db_metrics; +pub mod query; pub mod relational_db; pub mod update; diff --git a/crates/core/src/db/query.rs b/crates/core/src/db/query.rs new file mode 100644 index 00000000000..1671ae6ccfb --- /dev/null +++ b/crates/core/src/db/query.rs @@ -0,0 +1,297 @@ +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedState; +use crate::db::datastore::locking_tx_datastore::tx::TxId; +use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::datastore::traits::Tx; +use spacetimedb_data_structures::map::IntMap; +use spacetimedb_execution::iter::{Datastore, DeltaScanIter}; +use spacetimedb_primitives::{ColList, IndexId, TableId}; +use spacetimedb_sats::{AlgebraicValue, ProductValue}; +use spacetimedb_table::blob_store::BlobStore; +use spacetimedb_table::btree_index::BTreeIndex; +use spacetimedb_table::table::{IndexScanIter, RowRef, Table, TableScanIter}; +use std::ops::RangeBounds; + +pub trait DatastoreEx { + fn get_committed_state(&self) -> &CommittedState; + fn get_tables(&self) -> &IntMap; +} + +impl Datastore for TxId { + fn delta_scan_iter(&self, _table_id: TableId) -> DeltaScanIter { + DeltaScanIter::empty_iter() + } + + fn table_scan_iter(&self, table_id: TableId) -> TableScanIter { + let table = self.committed_state_shared_lock.tables.get(&table_id).unwrap(); + table.scan_rows(self.get_blob_store()) + } + + fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds) -> IndexScanIter { + let table = self.get_table_for_index(&index_id); + let index = self.get_index(&index_id); + + let btree_index_iter = index.seek(range); + IndexScanIter::new(table, self.get_blob_store(), btree_index_iter) + } + + fn get_table_for_index(&self, index_id: &IndexId) -> &Table { + let (table_id, _) = self.committed_state_shared_lock.index_id_map.get(index_id).unwrap(); + self.committed_state_shared_lock.tables.get(table_id).unwrap() + } + + fn get_index(&self, index_id: &IndexId) -> &BTreeIndex { + let table = self.get_table_for_index(index_id); + table.indexes.values().find(|idx| idx.index_id == *index_id).unwrap() + } + + fn get_blob_store(&self) -> &dyn BlobStore { + &self.committed_state_shared_lock.blob_store + } +} + +impl DatastoreEx for TxId { + fn get_committed_state(&self) -> &CommittedState { + &self.committed_state_shared_lock + } + + fn get_tables(&self) -> &IntMap { + &self.committed_state_shared_lock.tables + } +} + +impl Datastore for MutTxId { + fn delta_scan_iter(&self, _table_id: TableId) -> DeltaScanIter { + DeltaScanIter::empty_iter() + } + + fn table_scan_iter(&self, table_id: TableId) -> TableScanIter { + let table = self.committed_state_write_lock.tables.get(&table_id).unwrap(); + table.scan_rows(self.get_blob_store()) + } + + fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds) -> IndexScanIter { + let table = self.get_table_for_index(&index_id); + let index = self.get_index(&index_id); + + let btree_index_iter = index.seek(range); + IndexScanIter::new(table, self.get_blob_store(), btree_index_iter) + } + + fn get_table_for_index(&self, index_id: &IndexId) -> &Table { + let (table_id, _) = self.committed_state_write_lock.index_id_map.get(index_id).unwrap(); + self.committed_state_write_lock.tables.get(table_id).unwrap() + } + + fn get_index(&self, index_id: &IndexId) -> &BTreeIndex { + let table = self.get_table_for_index(index_id); + table.indexes.values().find(|idx| idx.index_id == *index_id).unwrap() + } + + fn get_blob_store(&self) -> &dyn BlobStore { + &self.committed_state_write_lock.blob_store + } +} + +impl DatastoreEx for MutTxId { + fn get_committed_state(&self) -> &CommittedState { + &self.committed_state_write_lock + } + + fn get_tables(&self) -> &IntMap { + &self.committed_state_write_lock.tables + } +} + +pub struct Query { + pub tx: T, +} + +impl Query +where + T: Datastore + DatastoreEx, +{ + pub fn new(tx: T) -> Self { + Self { tx } + } + + pub fn into_tx(self) -> T { + self.tx + } + + pub fn iter_by_col_eq(&self, table_id: TableId, cols: impl Into, value: &AlgebraicValue) -> IndexScanIter { + self.tx + .get_committed_state() + .index_seek(table_id, &cols.into(), value) + .unwrap() + } + + pub fn iter_by_col_range( + &self, + table_id: TableId, + cols: impl Into, + range: &impl RangeBounds, + ) -> IndexScanIter { + self.tx + .get_committed_state() + .index_seek(table_id, &cols.into(), range) + .unwrap() + } +} + +impl DatastoreEx for Query +where + T: Datastore + DatastoreEx, +{ + fn get_committed_state(&self) -> &CommittedState { + self.tx.get_committed_state() + } + + fn get_tables(&self) -> &IntMap { + self.tx.get_tables() + } +} + +pub fn collect_rows<'a>(iter: impl Iterator>) -> Vec { + iter.map(|row| row.to_product_value()).collect() +} + +impl Datastore for Query +where + T: Datastore + DatastoreEx, +{ + fn delta_scan_iter(&self, table_id: TableId) -> DeltaScanIter { + self.tx.delta_scan_iter(table_id) + } + + fn table_scan_iter(&self, table_id: TableId) -> TableScanIter { + self.tx.table_scan_iter(table_id) + } + + fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds) -> IndexScanIter { + self.tx.index_scan_iter(index_id, range) + } + + fn get_table_for_index(&self, index_id: &IndexId) -> &Table { + self.tx.get_table_for_index(index_id) + } + + fn get_index(&self, index_id: &IndexId) -> &BTreeIndex { + self.tx.get_index(index_id) + } + + fn get_blob_store(&self) -> &dyn BlobStore { + self.tx.get_blob_store() + } +} + +impl From for Query { + fn from(value: T) -> Self { + Self::new(value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::datastore::traits::IsolationLevel; + use crate::db::relational_db::tests_utils::TestDB; + use crate::error::DBError; + use crate::execution_context::Workload; + use spacetimedb_lib::error::ResultTest; + use spacetimedb_sats::{product, AlgebraicType}; + + fn create_data(total_rows: u64) -> ResultTest<(TestDB, TableId)> { + let db = TestDB::in_memory()?; + + let rows: Vec<_> = (1..=total_rows) + .map(|i| product!(i, format!("health{i}").into_boxed_str())) + .collect(); + let schema = &[("inventory_id", AlgebraicType::U64), ("name", AlgebraicType::String)]; + let indexes = &[(0.into(), "inventory_id")]; + let table_id = db.create_table_for_test("test", schema, indexes)?; + + db.with_auto_commit(Workload::ForTests, |tx| { + for row in rows { + db.insert(tx, table_id, row)?; + } + Ok::<(), DBError>(()) + })?; + + Ok((db, table_id)) + } + + #[test] + fn table_scan() -> ResultTest<()> { + let (db, table_id) = create_data(2)?; + let tx = db.begin_tx(Workload::ForTests); + + let query = Query::new(tx); + + let iter = query.table_scan_iter(table_id); + + assert_eq!( + collect_rows(iter), + vec![product![1u64, "health1"], product![2u64, "health2"]] + ); + + Ok(()) + } + + #[test] + fn table_scan_mut() -> ResultTest<()> { + let (db, table_id) = create_data(2)?; + + let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); + + let query = Query::new(tx); + + let iter = query.table_scan_iter(table_id); + + assert_eq!( + collect_rows(iter), + vec![product![1u64, "health1"], product![2u64, "health2"]] + ); + + Ok(()) + } + + #[test] + fn index_scan() -> ResultTest<()> { + let (db, table_id) = create_data(2)?; + let tx = db.begin_tx(Workload::ForTests); + + let query = Query::new(tx); + let index = query + .get_committed_state() + .tables + .get(&table_id) + .unwrap() + .indexes + .values() + .next() + .unwrap(); + + let iter = query.index_scan_iter(index.index_id, &(AlgebraicValue::U64(1)..=AlgebraicValue::U64(2))); + + assert_eq!( + collect_rows(iter), + vec![product![1u64, "health1"], product![2u64, "health2"]] + ); + + Ok(()) + } + + #[test] + fn eq() -> ResultTest<()> { + let (db, table_id) = create_data(10)?; + let tx = db.begin_tx(Workload::ForTests); + + let query = Query::new(tx); + + let iter = query.iter_by_col_eq(table_id, 0, &AlgebraicValue::U64(1)); + + assert_eq!(collect_rows(iter), vec![product![1u64, "health1"]]); + + Ok(()) + } +} diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 36473f8c066..7990dd72da4 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -744,7 +744,7 @@ impl RelationalDB { /// to use `?`, you can write: /// /// ```ignore - /// db.with_auto_commit(|tx| { + /// db.with_auto_commit(Workload::Internal, |tx| { /// let _ = db.schema_for_table(tx, 42)?; /// // ... /// Ok(()) diff --git a/crates/execution/src/iter.rs b/crates/execution/src/iter.rs index 8db3e2c9c7c..fd8fe764489 100644 --- a/crates/execution/src/iter.rs +++ b/crates/execution/src/iter.rs @@ -141,6 +141,12 @@ pub struct DeltaScanIter<'a> { iter: std::slice::Iter<'a, ProductValue>, } +impl<'a> DeltaScanIter<'a> { + pub fn empty_iter() -> Self { + Self { iter: [].iter() } + } +} + impl<'a> Iterator for DeltaScanIter<'a> { type Item = &'a ProductValue; diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 44943f82db1..edac361d94b 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1030,7 +1030,14 @@ impl<'a> Iterator for IndexScanIter<'a> { } } -impl IndexScanIter<'_> { +impl<'a> IndexScanIter<'a> { + pub fn new(table: &'a Table, blob_store: &'a dyn BlobStore, btree_index_iter: BTreeIndexRangeIter<'a>) -> Self { + Self { + table, + blob_store, + btree_index_iter, + } + } /// Returns the current number of pointers the iterator has returned thus far. pub fn num_pointers_yielded(&self) -> u64 { self.btree_index_iter.num_pointers_yielded()