From 4684925fa64acdeab00c0b5fc0b9f8d88890889c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Wed, 11 Dec 2024 16:24:22 -0500 Subject: [PATCH] Refactor for full split --- .../locking_tx_datastore/committed_state.rs | 53 ++--- .../locking_tx_datastore/datastore.rs | 28 ++- .../db/datastore/locking_tx_datastore/mod.rs | 2 +- .../datastore/locking_tx_datastore/mut_tx.rs | 37 ++- .../locking_tx_datastore/state_view.rs | 129 +++++++---- .../db/datastore/locking_tx_datastore/tx.rs | 31 ++- crates/core/src/db/datastore/traits.rs | 18 +- crates/core/src/db/relational_db.rs | 18 +- crates/core/src/vm.rs | 212 +++++++++++------- crates/vm/src/eval.rs | 6 + 10 files changed, 347 insertions(+), 187 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 0930dd2ed39..20707414153 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -1,9 +1,11 @@ use super::{ datastore::Result, sequence::{Sequence, SequencesState}, - state_view::{Iter, IterByColRange, ScanIterByColRange, StateView}, + state_view::{IterTxByColRange, StateView}, tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, + IterByColEq, }; +use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterTxByColRange}; use crate::{ db::{ datastore::{ @@ -69,6 +71,12 @@ impl MemoryUsage for CommittedState { } impl StateView for CommittedState { + type Iter<'a> = IterTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { self.tables.get(&table_id).map(|table| table.get_schema()) } @@ -77,9 +85,9 @@ impl StateView for CommittedState { self.get_table(table_id).map(|table| table.row_count) } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::tx(table_id, self)); + return Ok(IterTx::new(table_id, self)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -91,16 +99,25 @@ impl StateView for CommittedState { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { // TODO: Why does this unconditionally return a `Scan` iter, // instead of trying to return a `CommittedIndex` iter? // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen - Ok(IterByColRange::Scan(ScanIterByColRange::new( + Ok(IterTxByColRange::Scan(ScanIterTxByColRange::new( self.iter(table_id)?, cols, range, ))) } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } /// Swallow `Err(TableError::Duplicate(_))`, which signals a set-semantic collision, @@ -629,32 +646,6 @@ impl CommittedState { } } -pub enum CommittedIndexIter<'a> { - Tx(CommittedIndexIterTx<'a>), - MutTx(CommittedIndexIterMutTx<'a>), -} - -impl<'a> CommittedIndexIter<'a> { - pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self { - Self::Tx(CommittedIndexIterTx::new(committed_rows)) - } - - pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { - Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows)) - } -} - -impl<'a> Iterator for CommittedIndexIter<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - match self { - Self::Tx(iter) => iter.next(), - Self::MutTx(iter) => iter.next(), - } - } -} - pub struct CommittedIndexIterTx<'a> { committed_rows: IndexScanIter<'a>, num_committed_rows_fetched: u64, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 8cbe633288a..af5ee9a2ac5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -2,10 +2,11 @@ use super::{ committed_state::CommittedState, mut_tx::MutTxId, sequence::SequencesState, - state_view::{Iter, IterByColRange, StateView}, + state_view::{IterTxByColRange, StateView}, tx::TxId, tx_state::TxState, }; +use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColRange, IterTx}; use crate::execution_context::Workload; use crate::{ db::{ @@ -323,15 +324,26 @@ impl Tx for Locking { impl TxDatastore for Locking { type Iter<'a> - = Iter<'a> + = IterTx<'a> where Self: 'a; - type IterByColEq<'a, 'r> - = IterByColRange<'a, &'r AlgebraicValue> + type IterMutTx<'a>= IterMutTx<'a> where Self: 'a; type IterByColRange<'a, R: RangeBounds> - = IterByColRange<'a, R> + = IterTxByColRange<'a, R> + where + Self: 'a; + type IterMutByColRange<'a, R: RangeBounds> + = IterMutTxByColRange<'a, R> + where + Self: 'a; + type IterByColEq<'a, 'r> + = IterTxByColRange<'a, &'r AlgebraicValue> + where + Self: 'a; + type IterMutByColEq<'a, 'r> + = IterMutTxByColRange<'a, &'r AlgebraicValue> where Self: 'a; @@ -492,7 +504,7 @@ impl MutTxDatastore for Locking { tx.constraint_id_from_name(constraint_name) } - fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result> { + fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result> { tx.iter(table_id) } @@ -502,7 +514,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, range: R, - ) -> Result> { + ) -> Result> { tx.iter_by_col_range(table_id, cols.into(), range) } @@ -512,7 +524,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { + ) -> Result> { tx.iter_by_col_eq(table_id, cols.into(), value) } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index 33c636f9428..d7e590bdcc5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -6,7 +6,7 @@ mod mut_tx; pub use mut_tx::MutTxId; mod sequence; pub mod state_view; -pub use state_view::{Iter, IterByColEq, IterByColRange}; +pub use state_view::{IterByColEq, IterTxByColRange}; pub(crate) mod tx; mod tx_state; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index e37674e6c0c..b163a0a1742 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1,12 +1,14 @@ use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::{record_metrics, Result}, sequence::{Sequence, SequencesState}, - state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView}, + state_view::{IndexSeekIterMutTxId, ScanIterMutTxByColRange, StateView}, tx::TxId, tx_state::{DeleteTable, IndexIdMap, TxState}, SharedMutexGuard, SharedWriteGuard, }; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterMutTx; +use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColEq, IterMutTxByColRange}; use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; use crate::db::datastore::{ system_tables::{ @@ -1419,6 +1421,12 @@ impl MutTxId { } impl StateView for MutTxId { + type Iter<'a> = IterMutTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterMutTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterMutTxByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { // TODO(bikeshedding, docs): should this also check if the schema is in the system tables, // but the table hasn't been constructed yet? @@ -1442,9 +1450,13 @@ impl StateView for MutTxId { } } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock)); + return Ok(IterMutTx::new( + table_id, + &self.tx_state, + &self.committed_state_write_lock, + )); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -1454,7 +1466,7 @@ impl StateView for MutTxId { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { // We have to index_seek in both the committed state and the current tx state. // First, we will check modifications in the current tx. It may be that the table // has not been modified yet in the current tx, in which case we will only search @@ -1468,7 +1480,7 @@ impl StateView for MutTxId { // rolling it back will leave the index in place. if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) { // The current transaction has modified this table, and the table is indexed. - Ok(IterByColRange::Index(IndexSeekIterMutTxId { + Ok(IterMutTxByColRange::Index(IndexSeekIterMutTxId { table_id, tx_state: &self.tx_state, inserted_rows, @@ -1479,7 +1491,7 @@ impl StateView for MutTxId { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx( + Some(committed_rows) => Ok(IterMutTxByColRange::CommittedIndex(CommittedIndexIterMutTx::new( table_id, &self.tx_state, committed_rows, @@ -1513,7 +1525,7 @@ impl StateView for MutTxId { } } - Ok(IterByColRange::Scan(ScanIterByColRange::new( + Ok(IterMutTxByColRange::Scan(ScanIterMutTxByColRange::new( self.iter(table_id)?, cols, range, @@ -1522,4 +1534,13 @@ impl StateView for MutTxId { } } } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index c73d1db7abb..894d4c7e460 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -1,8 +1,9 @@ use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::Result, tx_state::{DeleteTable, TxState}, }; +use crate::db::datastore::locking_tx_datastore::committed_state::{CommittedIndexIterMutTx, CommittedIndexIterTx}; use crate::{ db::datastore::system_tables::{ StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, @@ -24,6 +25,16 @@ use std::sync::Arc; // StateView trait, is designed to define the behavior of viewing internal datastore states. // Currently, it applies to: CommittedState, MutTxId, and TxId. pub trait StateView { + type Iter<'a>: Iterator> + where + Self: 'a; + type IterByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; + type IterByColEq<'a, 'r>: Iterator> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc>; fn table_id_from_name(&self, table_name: &str) -> Result> { @@ -35,7 +46,7 @@ pub trait StateView { /// Returns the number of rows in the table identified by `table_id`. fn table_row_count(&self, table_id: TableId) -> Option; - fn iter(&self, table_id: TableId) -> Result>; + fn iter(&self, table_id: TableId) -> Result>; fn table_name(&self, table_id: TableId) -> Option<&str> { self.get_schema(table_id).map(|s| &*s.table_name) @@ -49,16 +60,14 @@ pub trait StateView { table_id: TableId, cols: ColList, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq<'a, 'r>( &'a self, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { - self.iter_by_col_range(table_id, cols.into(), value) - } + ) -> Result>; /// Reads the schema information for the specified `table_id` directly from the database. fn schema_for_table_raw(&self, table_id: TableId) -> Result { @@ -298,31 +307,6 @@ impl<'a> Iterator for IterTx<'a> { } } -pub enum Iter<'a> { - Tx(IterTx<'a>), - MutTx(IterMutTx<'a>), -} - -impl<'a> Iter<'a> { - pub(super) fn tx(table_id: TableId, committed_state: &'a CommittedState) -> Self { - Iter::Tx(IterTx::new(table_id, committed_state)) - } - pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { - Iter::MutTx(IterMutTx::new(table_id, tx_state, committed_state)) - } -} - -impl<'a> Iterator for Iter<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - match self { - Iter::Tx(iter) => iter.next(), - Iter::MutTx(iter) => iter.next(), - } - } -} - pub struct IndexSeekIterMutTxId<'a> { pub(super) table_id: TableId, pub(super) tx_state: &'a TxState, @@ -374,13 +358,15 @@ impl<'a> Iterator for IndexSeekIterMutTxId<'a> { } } -/// An [IterByColRange] for an individual column value. -pub type IterByColEq<'a, 'r> = IterByColRange<'a, &'r AlgebraicValue>; +/// An [IterTxByColRange] for an individual column value. +pub type IterByColEq<'a, 'r> = IterTxByColRange<'a, &'r AlgebraicValue>; +/// An [IterMutTxByColRange] for an individual column value. +pub type IterMutTxByColEq<'a, 'r> = IterMutTxByColRange<'a, &'r AlgebraicValue>; /// An iterator for a range of values in a column. -pub enum IterByColRange<'a, R: RangeBounds> { +pub enum IterTxByColRange<'a, R: RangeBounds> { /// When the column in question does not have an index. - Scan(ScanIterByColRange<'a, R>), + Scan(ScanIterTxByColRange<'a, R>), /// When the column has an index, and the table /// has been modified this transaction. @@ -388,34 +374,87 @@ pub enum IterByColRange<'a, R: RangeBounds> { /// When the column has an index, and the table /// has not been modified in this transaction. - CommittedIndex(CommittedIndexIter<'a>), + CommittedIndex(CommittedIndexIterTx<'a>), } -impl<'a, R: RangeBounds> Iterator for IterByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for IterTxByColRange<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { match self { - IterByColRange::Scan(range) => range.next(), - IterByColRange::Index(range) => range.next(), - IterByColRange::CommittedIndex(seek) => seek.next(), + IterTxByColRange::Scan(range) => range.next(), + IterTxByColRange::Index(range) => range.next(), + IterTxByColRange::CommittedIndex(seek) => seek.next(), } } } -pub struct ScanIterByColRange<'a, R: RangeBounds> { - scan_iter: Iter<'a>, +/// An iterator for a range of values in a column. +pub enum IterMutTxByColRange<'a, R: RangeBounds> { + /// When the column in question does not have an index. + Scan(ScanIterMutTxByColRange<'a, R>), + + /// When the column has an index, and the table + /// has been modified this transaction. + Index(IndexSeekIterMutTxId<'a>), + + /// When the column has an index, and the table + /// has not been modified in this transaction. + CommittedIndex(CommittedIndexIterMutTx<'a>), +} + +impl<'a, R: RangeBounds> Iterator for IterMutTxByColRange<'a, R> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + IterMutTxByColRange::Scan(range) => range.next(), + IterMutTxByColRange::Index(range) => range.next(), + IterMutTxByColRange::CommittedIndex(seek) => seek.next(), + } + } +} + +pub struct ScanIterTxByColRange<'a, R: RangeBounds> { + scan_iter: IterTx<'a>, + cols: ColList, + range: R, +} + +impl<'a, R: RangeBounds> ScanIterTxByColRange<'a, R> { + pub(super) fn new(scan_iter: IterTx<'a>, cols: ColList, range: R) -> Self { + Self { scan_iter, cols, range } + } +} + +impl<'a, R: RangeBounds> Iterator for ScanIterTxByColRange<'a, R> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + for row_ref in &mut self.scan_iter { + let value = row_ref.project(&self.cols).unwrap(); + if self.range.contains(&value) { + return Some(row_ref); + } + } + + None + } +} + +pub struct ScanIterMutTxByColRange<'a, R: RangeBounds> { + scan_iter: IterMutTx<'a>, cols: ColList, range: R, } -impl<'a, R: RangeBounds> ScanIterByColRange<'a, R> { - pub(super) fn new(scan_iter: Iter<'a>, cols: ColList, range: R) -> Self { +impl<'a, R: RangeBounds> ScanIterMutTxByColRange<'a, R> { + pub(super) fn new(scan_iter: IterMutTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } } -impl<'a, R: RangeBounds> Iterator for ScanIterByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for ScanIterMutTxByColRange<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index b785d7c36a6..93bd6d96f2f 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -1,10 +1,12 @@ use super::datastore::record_metrics; use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::Result, - state_view::{Iter, IterByColRange, StateView}, - SharedReadGuard, + state_view::{IterTxByColRange, StateView}, + IterByColEq, SharedReadGuard, }; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterTx; +use crate::db::datastore::locking_tx_datastore::state_view::IterTx; use crate::execution_context::ExecutionContext; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::AlgebraicValue; @@ -24,6 +26,12 @@ pub struct TxId { } impl StateView for TxId { + type Iter<'a> = IterTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { self.committed_state_shared_lock.get_schema(table_id) } @@ -32,7 +40,7 @@ impl StateView for TxId { self.committed_state_shared_lock.table_row_count(table_id) } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { self.committed_state_shared_lock.iter(table_id) } @@ -44,14 +52,25 @@ impl StateView for TxId { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::tx(committed_rows))), + Some(committed_rows) => Ok(IterTxByColRange::CommittedIndex(CommittedIndexIterTx::new( + committed_rows, + ))), None => self .committed_state_shared_lock .iter_by_col_range(table_id, cols, range), } } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } impl TxId { diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 0d7a0631c4b..529fb65b7ec 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -350,14 +350,24 @@ pub trait TxDatastore: DataRow + Tx { where Self: 'a; - type IterByColRange<'a, R: RangeBounds>: Iterator> + type IterMutTx<'a>: Iterator> where Self: 'a; + type IterByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; + type IterMutByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; type IterByColEq<'a, 'r>: Iterator> where Self: 'a; + type IterMutByColEq<'a, 'r>: Iterator> + where + Self: 'a; + fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>; fn iter_by_col_range_tx<'a, R: RangeBounds>( @@ -438,21 +448,21 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn constraint_id_from_name(&self, tx: &Self::MutTx, constraint_name: &str) -> super::Result>; // Data - fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>; + fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>; fn iter_by_col_range_mut_tx<'a, R: RangeBounds>( &'a self, tx: &'a Self::MutTx, table_id: TableId, cols: impl Into, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq_mut_tx<'a, 'r>( &'a self, tx: &'a Self::MutTx, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result>; + ) -> Result>; fn get_mut_tx<'a>( &self, tx: &'a Self::MutTx, diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9fa1ef92bae..356813e6d5d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,7 @@ use super::datastore::locking_tx_datastore::committed_state::CommittedState; -use super::datastore::locking_tx_datastore::state_view::StateView; +use super::datastore::locking_tx_datastore::state_view::{ + IterMutTx, IterMutTxByColEq, IterMutTxByColRange, IterTx, StateView, +}; use super::datastore::system_tables::ST_MODULE_ID; use super::datastore::traits::{ IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -7,7 +9,7 @@ use super::datastore::traits::{ use super::datastore::{ locking_tx_datastore::{ datastore::Locking, - state_view::{Iter, IterByColEq, IterByColRange}, + state_view::{IterByColEq, IterTxByColRange}, }, traits::TxData, }; @@ -1044,11 +1046,11 @@ impl RelationalDB { /// Returns an iterator, /// yielding every row in the table identified by `table_id`. - pub fn iter_mut<'a>(&'a self, tx: &'a MutTx, table_id: TableId) -> Result, DBError> { + pub fn iter_mut<'a>(&'a self, tx: &'a MutTx, table_id: TableId) -> Result, DBError> { self.inner.iter_mut_tx(tx, table_id) } - pub fn iter<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result, DBError> { + pub fn iter<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result, DBError> { self.inner.iter_tx(tx, table_id) } @@ -1063,7 +1065,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_eq_mut_tx(tx, table_id.into(), cols, value) } @@ -1088,7 +1090,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_mut_tx(tx, table_id.into(), cols, range) } @@ -1103,7 +1105,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range) } @@ -2146,7 +2148,7 @@ mod tests { let cols = col_list![0, 1]; let value = product![0u64, 1u64].into(); - let IterByColEq::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { + let IterMutTxByColEq::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { panic!("expected index iterator"); }; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 5195a5493c8..0821106e310 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -1,7 +1,8 @@ //! The [DbProgram] that execute arbitrary queries & code against the database. +use crate::db::datastore::locking_tx_datastore::state_view::IterMutTxByColRange; use crate::db::datastore::locking_tx_datastore::tx::TxId; -use crate::db::datastore::locking_tx_datastore::IterByColRange; +use crate::db::datastore::locking_tx_datastore::IterTxByColRange; use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; @@ -17,7 +18,7 @@ use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_table::static_assert_size; use spacetimedb_table::table::RowRef; use spacetimedb_vm::errors::ErrorVm; -use spacetimedb_vm::eval::{build_project, build_select, join_inner, IterRows}; +use spacetimedb_vm::eval::{box_iter, build_project, build_select, join_inner, IterRows}; use spacetimedb_vm::expr::*; use spacetimedb_vm::iterators::RelIter; use spacetimedb_vm::program::{ProgramVm, Sources}; @@ -184,26 +185,9 @@ pub fn build_query<'a>( let index_table = index_side.table_id().unwrap(); if *return_index_rows { - Box::new(IndexSemiJoinLeft { - db, - tx, - probe_side, - probe_col: *probe_col, - index_select, - index_table, - index_col: *index_col, - index_iter: None, - }) as Box> + index_semi_join_left(db, tx, probe_side, *probe_col, index_select, index_table, *index_col) } else { - Box::new(IndexSemiJoinRight { - db, - tx, - probe_side, - probe_col: *probe_col, - index_select, - index_table, - index_col: *index_col, - }) + index_semi_join_right(db, tx, probe_side, *probe_col, index_select, index_table, *index_col) } } Query::Select(cmp) => build_select(result_or_base(sources, &mut result), cmp), @@ -246,8 +230,8 @@ fn get_table<'a>( .into_iter(), ), SourceExpr::DbTable(db_table) => build_iter_from_db(match tx { - TxMode::MutTx(tx) => stdb.iter_mut(tx, db_table.table_id), - TxMode::Tx(tx) => stdb.iter(tx, db_table.table_id), + TxMode::MutTx(tx) => stdb.iter_mut(tx, db_table.table_id).map(box_iter), + TxMode::Tx(tx) => stdb.iter(tx, db_table.table_id).map(box_iter), }), } } @@ -260,8 +244,10 @@ fn iter_by_col_range<'a>( range: impl RangeBounds + 'a, ) -> Box> { build_iter_from_db(match tx { - TxMode::MutTx(tx) => db.iter_by_col_range_mut(tx, table.table_id, columns, range), - TxMode::Tx(tx) => db.iter_by_col_range(tx, table.table_id, columns, range), + TxMode::MutTx(tx) => db + .iter_by_col_range_mut(tx, table.table_id, columns, range) + .map(box_iter), + TxMode::Tx(tx) => db.iter_by_col_range(tx, table.table_id, columns, range).map(box_iter), }) } @@ -276,36 +262,38 @@ fn build_iter<'a>(iter: impl 'a + Iterator>) -> Box> { +pub struct IndexSemiJoinLeft<'c, Rhs, IndexIter, F> { /// An iterator for the probe side. /// The values returned will be used to probe the index. - pub probe_side: Rhs, + probe_side: Rhs, /// The column whose value will be used to probe the index. - pub probe_col: ColId, + probe_col: ColId, /// An optional predicate to evaluate over the matching rows of the index. - pub index_select: &'c Option, - /// The table id on which the index is defined. - pub index_table: TableId, - /// The column id for which the index is defined. - pub index_col: ColId, + index_select: &'c Option, /// An iterator for the index side. /// A new iterator will be instantiated for each row on the probe side. - pub index_iter: Option>, - /// A reference to the database. - pub db: &'a RelationalDB, - /// A reference to the current transaction. - pub tx: &'a TxMode<'a>, + index_iter: Option, + /// The function that returns an iterator for the index side. + index_function: F, } -static_assert_size!(IndexSemiJoinLeft>>, 264); - -impl<'a, Rhs: RelOps<'a>> IndexSemiJoinLeft<'a, '_, Rhs> { +impl<'a, 'c, Rhs, IndexIter, F> IndexSemiJoinLeft<'c, Rhs, IndexIter, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, + Rhs: RelOps<'a>, +{ fn filter(&self, index_row: &RelValue<'_>) -> bool { self.index_select.as_ref().map_or(true, |op| op.eval_bool(index_row)) } } -impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { +impl<'a, 'c, Rhs, IndexIter, F> RelOps<'a> for IndexSemiJoinLeft<'c, Rhs, IndexIter, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, + Rhs: RelOps<'a>, +{ fn next(&mut self) -> Option> { // Return a value from the current index iterator, if not exhausted. while let Some(index_row) = self.index_iter.as_mut().and_then(|iter| iter.next()).map(RelValue::Row) { @@ -315,16 +303,10 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { } // Otherwise probe the index with a row from the probe side. - let table_id = self.index_table; - let index_col = self.index_col; let probe_col = self.probe_col.idx(); while let Some(mut row) = self.probe_side.next() { if let Some(value) = row.read_or_take_column(probe_col) { - let index_iter = match self.tx { - TxMode::MutTx(tx) => self.db.iter_by_col_range_mut(tx, table_id, index_col, value), - TxMode::Tx(tx) => self.db.iter_by_col_range(tx, table_id, index_col, value), - }; - let mut index_iter = index_iter.expect(TABLE_ID_EXPECTED_VALID); + let mut index_iter = (self.index_function)(value).expect(TABLE_ID_EXPECTED_VALID); while let Some(index_row) = index_iter.next().map(RelValue::Row) { if self.filter(&index_row) { self.index_iter = Some(index_iter); @@ -337,47 +319,85 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { } } +/// Return an iterator index join operator that returns matching rows from the index side. +pub fn index_semi_join_left<'a>( + db: &'a RelationalDB, + tx: &'a TxMode<'a>, + probe_side: Box>, + probe_col: ColId, + index_select: &'a Option, + index_table: TableId, + index_col: ColId, +) -> Box> { + match tx { + TxMode::MutTx(tx) => Box::new(IndexSemiJoinLeft { + probe_side, + probe_col, + index_select, + index_iter: None, + index_function: move |value| db.iter_by_col_range_mut(tx, index_table, index_col, value), + }), + TxMode::Tx(tx) => Box::new(IndexSemiJoinLeft { + probe_side, + probe_col, + index_select, + index_iter: None, + index_function: move |value| db.iter_by_col_range(tx, index_table, index_col, value), + }), + } +} + +static_assert_size!( + IndexSemiJoinLeft< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + IterTxByColRange<'static, AlgebraicValue>, + >, + 256 +); +static_assert_size!( + IndexSemiJoinLeft< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + IterMutTxByColRange<'static, AlgebraicValue>, + >, + 256 +); + /// An index join operator that returns matching rows from the probe side. -pub struct IndexSemiJoinRight<'a, 'c, Rhs: RelOps<'a>> { +pub struct IndexSemiJoinRight<'c, Rhs: RelOps<'c>, F> { /// An iterator for the probe side. - /// The values returned will be used to probe the index. - pub probe_side: Rhs, + /// The values returned will be useSd to probe the index. + probe_side: Rhs, /// The column whose value will be used to probe the index. - pub probe_col: ColId, + probe_col: ColId, /// An optional predicate to evaluate over the matching rows of the index. - pub index_select: &'c Option, - /// The table id on which the index is defined. - pub index_table: TableId, - /// The column id for which the index is defined. - pub index_col: ColId, - /// A reference to the database. - pub db: &'a RelationalDB, - /// A reference to the current transaction. - pub tx: &'a TxMode<'a>, + index_select: &'c Option, + /// A function that returns an iterator for the index side. + index_function: F, } -static_assert_size!(IndexSemiJoinRight>>, 48); - -impl<'a, Rhs: RelOps<'a>> IndexSemiJoinRight<'a, '_, Rhs> { +impl<'a, Rhs: RelOps<'a>, F, IndexIter> IndexSemiJoinRight<'a, Rhs, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, +{ fn filter(&self, index_row: &RelValue<'_>) -> bool { self.index_select.as_ref().map_or(true, |op| op.eval_bool(index_row)) } } -impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinRight<'a, '_, Rhs> { +impl<'a, Rhs: RelOps<'a>, F, IndexIter> RelOps<'a> for IndexSemiJoinRight<'a, Rhs, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, +{ fn next(&mut self) -> Option> { // Otherwise probe the index with a row from the probe side. - let table_id = self.index_table; - let index_col = self.index_col; let probe_col = self.probe_col.idx(); - while let Some(row) = self.probe_side.next() { - if let Some(value) = row.read_column(probe_col) { - let value = &*value; - let index_iter = match self.tx { - TxMode::MutTx(tx) => self.db.iter_by_col_range_mut(tx, table_id, index_col, value), - TxMode::Tx(tx) => self.db.iter_by_col_range(tx, table_id, index_col, value), - }; - let mut index_iter = index_iter.expect(TABLE_ID_EXPECTED_VALID); + while let Some(mut row) = self.probe_side.next() { + if let Some(value) = row.read_or_take_column(probe_col) { + let mut index_iter = (self.index_function)(value).expect(TABLE_ID_EXPECTED_VALID); while let Some(index_row) = index_iter.next().map(RelValue::Row) { if self.filter(&index_row) { return Some(row); @@ -389,6 +409,46 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinRight<'a, '_, Rhs> { } } +/// Return an iterator index join operator that returns matching rows from the probe side. +pub fn index_semi_join_right<'a>( + db: &'a RelationalDB, + tx: &'a TxMode<'a>, + probe_side: Box>, + probe_col: ColId, + index_select: &'a Option, + index_table: TableId, + index_col: ColId, +) -> Box> { + match tx { + TxMode::MutTx(tx) => Box::new(IndexSemiJoinRight { + probe_side, + probe_col, + index_select, + index_function: move |value| db.iter_by_col_range_mut(tx, index_table, index_col, value), + }), + TxMode::Tx(tx) => Box::new(IndexSemiJoinRight { + probe_side, + probe_col, + index_select, + index_function: move |value| db.iter_by_col_range(tx, index_table, index_col, value), + }), + } +} +static_assert_size!( + IndexSemiJoinRight< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + >, + 40 +); +static_assert_size!( + IndexSemiJoinRight< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + >, + 40 +); + /// A [ProgramVm] implementation that carry a [RelationalDB] for it /// query execution pub struct DbProgram<'db, 'tx> { diff --git a/crates/vm/src/eval.rs b/crates/vm/src/eval.rs index 8ccaa012412..1dc845c8f31 100644 --- a/crates/vm/src/eval.rs +++ b/crates/vm/src/eval.rs @@ -4,9 +4,15 @@ use crate::program::{ProgramVm, Sources}; use crate::rel_ops::RelOps; use crate::relation::RelValue; use spacetimedb_sats::ProductValue; +use spacetimedb_table::table::RowRef; pub type IterRows<'a> = dyn RelOps<'a> + 'a; +/// Utility to simplify the creation of a boxed iterator. +pub fn box_iter<'a, T: Iterator> + 'a>(iter: T) -> Box> + 'a> { + Box::new(iter) +} + pub fn build_select<'a>(base: impl RelOps<'a> + 'a, cmp: &'a ColumnOp) -> Box> { Box::new(base.select(move |row| cmp.eval_bool(row))) }