Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DataStore trait for Tx/MutTx #1967

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] }
spacetimedb-client-api-messages.workspace = true
spacetimedb-commitlog.workspace = true
spacetimedb-durability.workspace = true
spacetimedb-execution.workspace = true
spacetimedb-metrics.workspace = true
spacetimedb-primitives.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct CommittedState {
pub(crate) tables: IntMap<TableId, Table>,
pub(crate) blob_store: HashMapBlobStore,
/// Provides fast lookup for index id -> an index.
pub(super) index_id_map: IndexIdMap,
pub(crate) index_id_map: IndexIdMap,
}

impl MemoryUsage for CommittedState {
Expand Down Expand Up @@ -411,7 +411,7 @@ impl CommittedState {
Ok(())
}

pub(super) fn index_seek<'a>(
pub(crate) fn index_seek<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type DecodeResult<T> = core::result::Result<T, DecodeError>;
/// handling can lead to deadlocks. Therefore, it is strongly recommended to use
/// `Locking::begin_mut_tx()` for instantiation to ensure safe acquisition of locks.
pub struct MutTxId {
pub(super) tx_state: TxState,
pub(super) committed_state_write_lock: SharedWriteGuard<CommittedState>,
pub(crate) tx_state: TxState,
pub(crate) committed_state_write_lock: SharedWriteGuard<CommittedState>,
Comment on lines +61 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather the visibilities not change in this PR and to instead expose what is needed through methods with a maximally narrow interface.

pub(super) sequence_state_lock: SharedMutexGuard<SequencesState>,
pub(super) lock_wait_time: Duration,
pub(crate) timer: Instant,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
};

pub struct TxId {
pub(super) committed_state_shared_lock: SharedReadGuard<CommittedState>,
pub(crate) committed_state_shared_lock: SharedReadGuard<CommittedState>,
pub(super) lock_wait_time: Duration,
pub(super) timer: Instant,
pub(crate) ctx: ExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(super) type RemovedIndexIdSet = IntSet<IndexId>;
/// - any row in `delete_tables` must be in the associated `CommittedState`
/// - any row cannot be in both `insert_tables` and `delete_tables`
#[derive(Default)]
pub(super) struct TxState {
pub(crate) struct TxState {
//NOTE: Need to preserve order to correctly restore the db after reopen
/// For any `TableId` that has had a row inserted into it in this TX
/// (which may have since been deleted),
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod datastore;
pub mod db_metrics;
pub mod query;
pub mod relational_db;
pub mod update;

Expand Down
297 changes: 297 additions & 0 deletions crates/core/src/db/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedState;
use crate::db::datastore::locking_tx_datastore::tx::TxId;
use crate::db::datastore::locking_tx_datastore::MutTxId;
use crate::db::datastore::traits::Tx;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_execution::iter::{Datastore, DeltaScanIter};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_table::blob_store::BlobStore;
use spacetimedb_table::btree_index::BTreeIndex;
use spacetimedb_table::table::{IndexScanIter, RowRef, Table, TableScanIter};
use std::ops::RangeBounds;

pub trait DatastoreEx {
fn get_committed_state(&self) -> &CommittedState;
fn get_tables(&self) -> &IntMap<TableId, Table>;
}

impl Datastore for TxId {
fn delta_scan_iter(&self, _table_id: TableId) -> DeltaScanIter {
DeltaScanIter::empty_iter()
}

fn table_scan_iter(&self, table_id: TableId) -> TableScanIter {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All this methods requiere unwrap and assume the acquiring of the iter will not fail. I think we should make them return a Result so we can report Index/Table not found

let table = self.committed_state_shared_lock.tables.get(&table_id).unwrap();
table.scan_rows(self.get_blob_store())
}

fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanIter {
let table = self.get_table_for_index(&index_id);
let index = self.get_index(&index_id);

let btree_index_iter = index.seek(range);
IndexScanIter::new(table, self.get_blob_store(), btree_index_iter)
}

fn get_table_for_index(&self, index_id: &IndexId) -> &Table {
let (table_id, _) = self.committed_state_shared_lock.index_id_map.get(index_id).unwrap();
self.committed_state_shared_lock.tables.get(table_id).unwrap()
}

fn get_index(&self, index_id: &IndexId) -> &BTreeIndex {
let table = self.get_table_for_index(index_id);
table.indexes.values().find(|idx| idx.index_id == *index_id).unwrap()
}
Comment on lines +25 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates structure from MutTx::get_table_and_index_type. Please move this logic to CommittedState / TxId and refactor MutTx to use the new methods there.

For example:

self.index_seek_by_id(index_id, range)

in defined in TxId, delegating to a method with the same name in CommittedState.


fn get_blob_store(&self) -> &dyn BlobStore {
&self.committed_state_shared_lock.blob_store
}
}

impl DatastoreEx for TxId {
fn get_committed_state(&self) -> &CommittedState {
&self.committed_state_shared_lock
}

fn get_tables(&self) -> &IntMap<TableId, Table> {
&self.committed_state_shared_lock.tables
}
}

impl Datastore for MutTxId {
fn delta_scan_iter(&self, _table_id: TableId) -> DeltaScanIter {
DeltaScanIter::empty_iter()
}

fn table_scan_iter(&self, table_id: TableId) -> TableScanIter {
let table = self.committed_state_write_lock.tables.get(&table_id).unwrap();
table.scan_rows(self.get_blob_store())
}

fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanIter {
let table = self.get_table_for_index(&index_id);
let index = self.get_index(&index_id);

let btree_index_iter = index.seek(range);
IndexScanIter::new(table, self.get_blob_store(), btree_index_iter)
}

fn get_table_for_index(&self, index_id: &IndexId) -> &Table {
let (table_id, _) = self.committed_state_write_lock.index_id_map.get(index_id).unwrap();
self.committed_state_write_lock.tables.get(table_id).unwrap()
}
Comment on lines +72 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic does not consider the TxState which could have added an index. You should be able to refactor MutTxId::btree_scan and extract a method (TableId, &ColList, &impl RangeBounds<AV>) from it which handles this logic correctly.


fn get_index(&self, index_id: &IndexId) -> &BTreeIndex {
let table = self.get_table_for_index(index_id);
table.indexes.values().find(|idx| idx.index_id == *index_id).unwrap()
}

fn get_blob_store(&self) -> &dyn BlobStore {
&self.committed_state_write_lock.blob_store
}
}

impl DatastoreEx for MutTxId {
fn get_committed_state(&self) -> &CommittedState {
&self.committed_state_write_lock
}

fn get_tables(&self) -> &IntMap<TableId, Table> {
&self.committed_state_write_lock.tables
}
}

pub struct Query<T> {
pub tx: T,
}

impl<T> Query<T>
where
T: Datastore + DatastoreEx,
{
pub fn new(tx: T) -> Self {
Self { tx }
}

pub fn into_tx(self) -> T {
self.tx
}

pub fn iter_by_col_eq(&self, table_id: TableId, cols: impl Into<ColList>, value: &AlgebraicValue) -> IndexScanIter {
self.tx
.get_committed_state()
.index_seek(table_id, &cols.into(), value)
.unwrap()
}
Comment on lines +121 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be defined in terms of the method below.


pub fn iter_by_col_range(
&self,
table_id: TableId,
cols: impl Into<ColList>,
range: &impl RangeBounds<AlgebraicValue>,
) -> IndexScanIter {
self.tx
.get_committed_state()
.index_seek(table_id, &cols.into(), range)
.unwrap()
}
}

impl<T> DatastoreEx for Query<T>
where
T: Datastore + DatastoreEx,
{
fn get_committed_state(&self) -> &CommittedState {
self.tx.get_committed_state()
}

fn get_tables(&self) -> &IntMap<TableId, Table> {
self.tx.get_tables()
}
}

pub fn collect_rows<'a>(iter: impl Iterator<Item = RowRef<'a>>) -> Vec<ProductValue> {
iter.map(|row| row.to_product_value()).collect()
}

impl<T> Datastore for Query<T>
where
T: Datastore + DatastoreEx,
{
fn delta_scan_iter(&self, table_id: TableId) -> DeltaScanIter {
self.tx.delta_scan_iter(table_id)
}

fn table_scan_iter(&self, table_id: TableId) -> TableScanIter {
self.tx.table_scan_iter(table_id)
}

fn index_scan_iter(&self, index_id: IndexId, range: &impl RangeBounds<AlgebraicValue>) -> IndexScanIter {
self.tx.index_scan_iter(index_id, range)
}

fn get_table_for_index(&self, index_id: &IndexId) -> &Table {
self.tx.get_table_for_index(index_id)
}

fn get_index(&self, index_id: &IndexId) -> &BTreeIndex {
self.tx.get_index(index_id)
}

fn get_blob_store(&self) -> &dyn BlobStore {
self.tx.get_blob_store()
}
}

impl<T: Tx + Datastore + DatastoreEx> From<T> for Query<T> {
fn from(value: T) -> Self {
Self::new(value)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::db::datastore::traits::IsolationLevel;
use crate::db::relational_db::tests_utils::TestDB;
use crate::error::DBError;
use crate::execution_context::Workload;
use spacetimedb_lib::error::ResultTest;
use spacetimedb_sats::{product, AlgebraicType};

fn create_data(total_rows: u64) -> ResultTest<(TestDB, TableId)> {
let db = TestDB::in_memory()?;

let rows: Vec<_> = (1..=total_rows)
.map(|i| product!(i, format!("health{i}").into_boxed_str()))
.collect();
let schema = &[("inventory_id", AlgebraicType::U64), ("name", AlgebraicType::String)];
let indexes = &[(0.into(), "inventory_id")];
let table_id = db.create_table_for_test("test", schema, indexes)?;

db.with_auto_commit(Workload::ForTests, |tx| {
for row in rows {
db.insert(tx, table_id, row)?;
}
Ok::<(), DBError>(())
})?;

Ok((db, table_id))
}

#[test]
fn table_scan() -> ResultTest<()> {
let (db, table_id) = create_data(2)?;
let tx = db.begin_tx(Workload::ForTests);

let query = Query::new(tx);

let iter = query.table_scan_iter(table_id);

assert_eq!(
collect_rows(iter),
vec![product![1u64, "health1"], product![2u64, "health2"]]
);

Ok(())
}

#[test]
fn table_scan_mut() -> ResultTest<()> {
let (db, table_id) = create_data(2)?;

let tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);

let query = Query::new(tx);

let iter = query.table_scan_iter(table_id);

assert_eq!(
collect_rows(iter),
vec![product![1u64, "health1"], product![2u64, "health2"]]
);

Ok(())
}

#[test]
fn index_scan() -> ResultTest<()> {
let (db, table_id) = create_data(2)?;
let tx = db.begin_tx(Workload::ForTests);

let query = Query::new(tx);
let index = query
.get_committed_state()
.tables
.get(&table_id)
.unwrap()
.indexes
.values()
.next()
.unwrap();

let iter = query.index_scan_iter(index.index_id, &(AlgebraicValue::U64(1)..=AlgebraicValue::U64(2)));

assert_eq!(
collect_rows(iter),
vec![product![1u64, "health1"], product![2u64, "health2"]]
);

Ok(())
}

#[test]
fn eq() -> ResultTest<()> {
let (db, table_id) = create_data(10)?;
let tx = db.begin_tx(Workload::ForTests);

let query = Query::new(tx);

let iter = query.iter_by_col_eq(table_id, 0, &AlgebraicValue::U64(1));

assert_eq!(collect_rows(iter), vec![product![1u64, "health1"]]);

Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ impl RelationalDB {
/// to use `?`, you can write:
///
/// ```ignore
/// db.with_auto_commit(|tx| {
/// db.with_auto_commit(Workload::Internal, |tx| {
/// let _ = db.schema_for_table(tx, 42)?;
/// // ...
/// Ok(())
Expand Down
6 changes: 6 additions & 0 deletions crates/execution/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ pub struct DeltaScanIter<'a> {
iter: std::slice::Iter<'a, ProductValue>,
}

impl<'a> DeltaScanIter<'a> {
pub fn empty_iter() -> Self {
Self { iter: [].iter() }
}
}

impl<'a> Iterator for DeltaScanIter<'a> {
type Item = &'a ProductValue;

Expand Down
Loading
Loading