From e8c2b70e930ced8ecc35cd8290cf62d2c59a1fad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Mon, 9 Dec 2024 13:04:26 -0500 Subject: [PATCH 1/5] Split Iter & IterByColRange types into Tx and MutTxId versions --- .../locking_tx_datastore/committed_state.rs | 86 +++++++++--- .../datastore/locking_tx_datastore/mut_tx.rs | 11 +- .../locking_tx_datastore/state_view.rs | 132 ++++++++++++------ .../db/datastore/locking_tx_datastore/tx.rs | 7 +- 4 files changed, 158 insertions(+), 78 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 71f133ffb8e..0930dd2ed39 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -79,7 +79,7 @@ impl StateView for CommittedState { fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::new(table_id, None, self)); + return Ok(Iter::tx(table_id, self)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -94,6 +94,7 @@ impl StateView for CommittedState { ) -> Result> { // TODO: Why does this unconditionally return a `Scan` iter, // instead of trying to return a `CommittedIndex` iter? + // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen Ok(IterByColRange::Scan(ScanIterByColRange::new( self.iter(table_id)?, cols, @@ -628,42 +629,87 @@ impl CommittedState { } } -pub struct CommittedIndexIter<'a> { +pub enum CommittedIndexIter<'a> { + Tx(CommittedIndexIterTx<'a>), + MutTx(CommittedIndexIterMutTx<'a>), +} + +impl<'a> CommittedIndexIter<'a> { + pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self { + Self::Tx(CommittedIndexIterTx::new(committed_rows)) + } + + pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { + Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows)) + } +} + +impl<'a> Iterator for CommittedIndexIter<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + Self::Tx(iter) => iter.next(), + Self::MutTx(iter) => iter.next(), + } + } +} + +pub struct CommittedIndexIterTx<'a> { + committed_rows: IndexScanIter<'a>, + num_committed_rows_fetched: u64, +} + +impl<'a> CommittedIndexIterTx<'a> { + pub(super) fn new(committed_rows: IndexScanIter<'a>) -> Self { + Self { + committed_rows, + num_committed_rows_fetched: 0, + } + } +} + +impl<'a> Iterator for CommittedIndexIterTx<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + if let Some(row_ref) = self.committed_rows.next() { + // TODO(metrics): This doesn't actually fetch a row. + // Move this counter to `RowRef::read_row`. + self.num_committed_rows_fetched += 1; + return Some(row_ref); + } + + None + } +} + +pub struct CommittedIndexIterMutTx<'a> { table_id: TableId, - tx_state: Option<&'a TxState>, - #[allow(dead_code)] - committed_state: &'a CommittedState, + tx_state: &'a TxState, committed_rows: IndexScanIter<'a>, num_committed_rows_fetched: u64, } -impl<'a> CommittedIndexIter<'a> { - pub(super) fn new( - table_id: TableId, - tx_state: Option<&'a TxState>, - committed_state: &'a CommittedState, - committed_rows: IndexScanIter<'a>, - ) -> Self { +impl<'a> CommittedIndexIterMutTx<'a> { + pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { Self { table_id, tx_state, - committed_state, committed_rows, num_committed_rows_fetched: 0, } } } -impl<'a> Iterator for CommittedIndexIter<'a> { +impl<'a> Iterator for CommittedIndexIterMutTx<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - if let Some(row_ref) = self.committed_rows.find(|row_ref| { - !self - .tx_state - .map(|tx_state| tx_state.is_deleted(self.table_id, row_ref.pointer())) - .unwrap_or(false) - }) { + if let Some(row_ref) = self + .committed_rows + .find(|row_ref| !self.tx_state.is_deleted(self.table_id, row_ref.pointer())) + { // TODO(metrics): This doesn't actually fetch a row. // Move this counter to `RowRef::read_row`. self.num_committed_rows_fetched += 1; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 83636f4eebe..e37674e6c0c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1444,11 +1444,7 @@ impl StateView for MutTxId { fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::new( - table_id, - Some(&self.tx_state), - &self.committed_state_write_lock, - )); + return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -1483,10 +1479,9 @@ impl StateView for MutTxId { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new( + Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx( table_id, - Some(&self.tx_state), - &self.committed_state_write_lock, + &self.tx_state, committed_rows, ))), None => { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 14166e137ca..c73d1db7abb 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -149,35 +149,43 @@ pub trait StateView { } } -pub struct Iter<'a> { - table_id: TableId, - tx_state_del: Option<&'a DeleteTable>, +pub struct IterMutTx<'a> { tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>, - committed_state: &'a CommittedState, stage: ScanStage<'a>, } -impl<'a> Iter<'a> { - pub(super) fn new(table_id: TableId, tx_state: Option<&'a TxState>, committed_state: &'a CommittedState) -> Self { - let tx_state_ins = tx_state.and_then(|tx| { - let ins = tx.insert_tables.get(&table_id)?; - let bs = &tx.blob_store; - Some((ins, bs)) - }); - let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id)); - Self { - table_id, - tx_state_ins, - tx_state_del, - committed_state, - stage: ScanStage::Start, - } +impl<'a> IterMutTx<'a> { + pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { + let tx_state_ins = tx_state + .insert_tables + .get(&table_id) + .map(|table| (table, &tx_state.blob_store)); + + let tx_state_del = tx_state.delete_tables.get(&table_id); + + let stage = if let Some(table) = committed_state.tables.get(&table_id) { + // The committed state has changes for this table. + let iter = table.scan_rows(&committed_state.blob_store); + if let Some(del_tables) = tx_state_del.and_then(|del| if del.is_empty() { None } else { Some(del) }) { + // There are deletes in the tx state + // so we must exclude those (1b). + ScanStage::CommittedWithTxDeletes { iter, del_tables } + } else { + // There are no deletes in the tx state + // so we don't need to care about those (1a). + ScanStage::CommittedNoTxDeletes { iter } + } + } else { + ScanStage::Continue + }; + + Self { tx_state_ins, stage } } } enum ScanStage<'a> { - /// We haven't decided yet where to yield from. - Start, + /// Continue to the next stage. + Continue, /// Yielding rows from the current tx. CurrentTx { iter: TableScanIter<'a> }, /// Yielding rows from the committed state @@ -186,18 +194,20 @@ enum ScanStage<'a> { /// Yielding rows from the committed state /// but there are deleted rows in the tx state, /// so we must check against those. - CommittedWithTxDeletes { iter: TableScanIter<'a> }, + CommittedWithTxDeletes { + iter: TableScanIter<'a>, + del_tables: &'a DeleteTable, + }, } -impl<'a> Iterator for Iter<'a> { +impl<'a> Iterator for IterMutTx<'a> { type Item = RowRef<'a>; + #[inline] fn next(&mut self) -> Option { - let table_id = self.table_id; - // The finite state machine goes: // - // Start + // Continue // | // |--> CurrentTx -------------------------------\ // | ^ | @@ -209,22 +219,7 @@ impl<'a> Iterator for Iter<'a> { loop { match &mut self.stage { - ScanStage::Start => { - if let Some(table) = self.committed_state.tables.get(&table_id) { - // The committed state has changes for this table. - let iter = table.scan_rows(&self.committed_state.blob_store); - self.stage = if self.tx_state_del.is_some() { - // There are no deletes in the tx state - // so we don't need to care about those (1a). - ScanStage::CommittedWithTxDeletes { iter } - } else { - // There are deletes in the tx state - // so we must exclude those (1b). - ScanStage::CommittedNoTxDeletes { iter } - }; - continue; - } - } + ScanStage::Continue => {} ScanStage::CommittedNoTxDeletes { iter } => { // (1a) Go through the committed state for this table // but do not consider deleted rows. @@ -232,7 +227,7 @@ impl<'a> Iterator for Iter<'a> { return next; } } - ScanStage::CommittedWithTxDeletes { iter } => { + ScanStage::CommittedWithTxDeletes { iter, del_tables } => { // (1b) Check the committed row's state in the current tx. // If it's been deleted, skip it. // If it's still present, yield it. @@ -259,7 +254,6 @@ impl<'a> Iterator for Iter<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() }; if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) { return next; } @@ -279,6 +273,56 @@ impl<'a> Iterator for Iter<'a> { } } +pub struct IterTx<'a> { + iter: TableScanIter<'a>, +} + +impl<'a> IterTx<'a> { + pub(super) fn new(table_id: TableId, committed_state: &'a CommittedState) -> Self { + // The table_id was validated to exist in the committed state. + let table = committed_state + .tables + .get(&table_id) + .expect("table_id must exist in committed state"); + let iter = table.scan_rows(&committed_state.blob_store); + Self { iter } + } +} + +impl<'a> Iterator for IterTx<'a> { + type Item = RowRef<'a>; + + #[inline] + fn next(&mut self) -> Option { + self.iter.next() + } +} + +pub enum Iter<'a> { + Tx(IterTx<'a>), + MutTx(IterMutTx<'a>), +} + +impl<'a> Iter<'a> { + pub(super) fn tx(table_id: TableId, committed_state: &'a CommittedState) -> Self { + Iter::Tx(IterTx::new(table_id, committed_state)) + } + pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { + Iter::MutTx(IterMutTx::new(table_id, tx_state, committed_state)) + } +} + +impl<'a> Iterator for Iter<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + Iter::Tx(iter) => iter.next(), + Iter::MutTx(iter) => iter.next(), + } + } +} + pub struct IndexSeekIterMutTxId<'a> { pub(super) table_id: TableId, pub(super) tx_state: &'a TxState, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index b9be047a55c..b785d7c36a6 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -46,12 +46,7 @@ impl StateView for TxId { range: R, ) -> Result> { match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new( - table_id, - None, - &self.committed_state_shared_lock, - committed_rows, - ))), + Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::tx(committed_rows))), None => self .committed_state_shared_lock .iter_by_col_range(table_id, cols, range), From 7b5445ce18d2942a73996c746e6affe374cebcc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Wed, 11 Dec 2024 16:24:22 -0500 Subject: [PATCH 2/5] Refactor for full split --- .../locking_tx_datastore/committed_state.rs | 53 ++--- .../locking_tx_datastore/datastore.rs | 28 ++- .../db/datastore/locking_tx_datastore/mod.rs | 2 +- .../datastore/locking_tx_datastore/mut_tx.rs | 37 ++- .../locking_tx_datastore/state_view.rs | 129 +++++++---- .../db/datastore/locking_tx_datastore/tx.rs | 31 ++- crates/core/src/db/datastore/traits.rs | 18 +- crates/core/src/db/relational_db.rs | 18 +- crates/core/src/vm.rs | 212 +++++++++++------- crates/vm/src/eval.rs | 6 + 10 files changed, 347 insertions(+), 187 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 0930dd2ed39..20707414153 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -1,9 +1,11 @@ use super::{ datastore::Result, sequence::{Sequence, SequencesState}, - state_view::{Iter, IterByColRange, ScanIterByColRange, StateView}, + state_view::{IterTxByColRange, StateView}, tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, + IterByColEq, }; +use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterTxByColRange}; use crate::{ db::{ datastore::{ @@ -69,6 +71,12 @@ impl MemoryUsage for CommittedState { } impl StateView for CommittedState { + type Iter<'a> = IterTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { self.tables.get(&table_id).map(|table| table.get_schema()) } @@ -77,9 +85,9 @@ impl StateView for CommittedState { self.get_table(table_id).map(|table| table.row_count) } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::tx(table_id, self)); + return Ok(IterTx::new(table_id, self)); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -91,16 +99,25 @@ impl StateView for CommittedState { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { // TODO: Why does this unconditionally return a `Scan` iter, // instead of trying to return a `CommittedIndex` iter? // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen - Ok(IterByColRange::Scan(ScanIterByColRange::new( + Ok(IterTxByColRange::Scan(ScanIterTxByColRange::new( self.iter(table_id)?, cols, range, ))) } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } /// Swallow `Err(TableError::Duplicate(_))`, which signals a set-semantic collision, @@ -629,32 +646,6 @@ impl CommittedState { } } -pub enum CommittedIndexIter<'a> { - Tx(CommittedIndexIterTx<'a>), - MutTx(CommittedIndexIterMutTx<'a>), -} - -impl<'a> CommittedIndexIter<'a> { - pub(super) fn tx(committed_rows: IndexScanIter<'a>) -> Self { - Self::Tx(CommittedIndexIterTx::new(committed_rows)) - } - - pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { - Self::MutTx(CommittedIndexIterMutTx::new(table_id, tx_state, committed_rows)) - } -} - -impl<'a> Iterator for CommittedIndexIter<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - match self { - Self::Tx(iter) => iter.next(), - Self::MutTx(iter) => iter.next(), - } - } -} - pub struct CommittedIndexIterTx<'a> { committed_rows: IndexScanIter<'a>, num_committed_rows_fetched: u64, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 8cbe633288a..af5ee9a2ac5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -2,10 +2,11 @@ use super::{ committed_state::CommittedState, mut_tx::MutTxId, sequence::SequencesState, - state_view::{Iter, IterByColRange, StateView}, + state_view::{IterTxByColRange, StateView}, tx::TxId, tx_state::TxState, }; +use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColRange, IterTx}; use crate::execution_context::Workload; use crate::{ db::{ @@ -323,15 +324,26 @@ impl Tx for Locking { impl TxDatastore for Locking { type Iter<'a> - = Iter<'a> + = IterTx<'a> where Self: 'a; - type IterByColEq<'a, 'r> - = IterByColRange<'a, &'r AlgebraicValue> + type IterMutTx<'a>= IterMutTx<'a> where Self: 'a; type IterByColRange<'a, R: RangeBounds> - = IterByColRange<'a, R> + = IterTxByColRange<'a, R> + where + Self: 'a; + type IterMutByColRange<'a, R: RangeBounds> + = IterMutTxByColRange<'a, R> + where + Self: 'a; + type IterByColEq<'a, 'r> + = IterTxByColRange<'a, &'r AlgebraicValue> + where + Self: 'a; + type IterMutByColEq<'a, 'r> + = IterMutTxByColRange<'a, &'r AlgebraicValue> where Self: 'a; @@ -492,7 +504,7 @@ impl MutTxDatastore for Locking { tx.constraint_id_from_name(constraint_name) } - fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result> { + fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result> { tx.iter(table_id) } @@ -502,7 +514,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, range: R, - ) -> Result> { + ) -> Result> { tx.iter_by_col_range(table_id, cols.into(), range) } @@ -512,7 +524,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { + ) -> Result> { tx.iter_by_col_eq(table_id, cols.into(), value) } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index 33c636f9428..d7e590bdcc5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -6,7 +6,7 @@ mod mut_tx; pub use mut_tx::MutTxId; mod sequence; pub mod state_view; -pub use state_view::{Iter, IterByColEq, IterByColRange}; +pub use state_view::{IterByColEq, IterTxByColRange}; pub(crate) mod tx; mod tx_state; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index e37674e6c0c..b163a0a1742 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1,12 +1,14 @@ use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::{record_metrics, Result}, sequence::{Sequence, SequencesState}, - state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView}, + state_view::{IndexSeekIterMutTxId, ScanIterMutTxByColRange, StateView}, tx::TxId, tx_state::{DeleteTable, IndexIdMap, TxState}, SharedMutexGuard, SharedWriteGuard, }; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterMutTx; +use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColEq, IterMutTxByColRange}; use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; use crate::db::datastore::{ system_tables::{ @@ -1419,6 +1421,12 @@ impl MutTxId { } impl StateView for MutTxId { + type Iter<'a> = IterMutTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterMutTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterMutTxByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { // TODO(bikeshedding, docs): should this also check if the schema is in the system tables, // but the table hasn't been constructed yet? @@ -1442,9 +1450,13 @@ impl StateView for MutTxId { } } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { if self.table_name(table_id).is_some() { - return Ok(Iter::mut_tx(table_id, &self.tx_state, &self.committed_state_write_lock)); + return Ok(IterMutTx::new( + table_id, + &self.tx_state, + &self.committed_state_write_lock, + )); } Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into()) } @@ -1454,7 +1466,7 @@ impl StateView for MutTxId { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { // We have to index_seek in both the committed state and the current tx state. // First, we will check modifications in the current tx. It may be that the table // has not been modified yet in the current tx, in which case we will only search @@ -1468,7 +1480,7 @@ impl StateView for MutTxId { // rolling it back will leave the index in place. if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) { // The current transaction has modified this table, and the table is indexed. - Ok(IterByColRange::Index(IndexSeekIterMutTxId { + Ok(IterMutTxByColRange::Index(IndexSeekIterMutTxId { table_id, tx_state: &self.tx_state, inserted_rows, @@ -1479,7 +1491,7 @@ impl StateView for MutTxId { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::mut_tx( + Some(committed_rows) => Ok(IterMutTxByColRange::CommittedIndex(CommittedIndexIterMutTx::new( table_id, &self.tx_state, committed_rows, @@ -1513,7 +1525,7 @@ impl StateView for MutTxId { } } - Ok(IterByColRange::Scan(ScanIterByColRange::new( + Ok(IterMutTxByColRange::Scan(ScanIterMutTxByColRange::new( self.iter(table_id)?, cols, range, @@ -1522,4 +1534,13 @@ impl StateView for MutTxId { } } } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index c73d1db7abb..894d4c7e460 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -1,8 +1,9 @@ use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::Result, tx_state::{DeleteTable, TxState}, }; +use crate::db::datastore::locking_tx_datastore::committed_state::{CommittedIndexIterMutTx, CommittedIndexIterTx}; use crate::{ db::datastore::system_tables::{ StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, @@ -24,6 +25,16 @@ use std::sync::Arc; // StateView trait, is designed to define the behavior of viewing internal datastore states. // Currently, it applies to: CommittedState, MutTxId, and TxId. pub trait StateView { + type Iter<'a>: Iterator> + where + Self: 'a; + type IterByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; + type IterByColEq<'a, 'r>: Iterator> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc>; fn table_id_from_name(&self, table_name: &str) -> Result> { @@ -35,7 +46,7 @@ pub trait StateView { /// Returns the number of rows in the table identified by `table_id`. fn table_row_count(&self, table_id: TableId) -> Option; - fn iter(&self, table_id: TableId) -> Result>; + fn iter(&self, table_id: TableId) -> Result>; fn table_name(&self, table_id: TableId) -> Option<&str> { self.get_schema(table_id).map(|s| &*s.table_name) @@ -49,16 +60,14 @@ pub trait StateView { table_id: TableId, cols: ColList, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq<'a, 'r>( &'a self, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { - self.iter_by_col_range(table_id, cols.into(), value) - } + ) -> Result>; /// Reads the schema information for the specified `table_id` directly from the database. fn schema_for_table_raw(&self, table_id: TableId) -> Result { @@ -298,31 +307,6 @@ impl<'a> Iterator for IterTx<'a> { } } -pub enum Iter<'a> { - Tx(IterTx<'a>), - MutTx(IterMutTx<'a>), -} - -impl<'a> Iter<'a> { - pub(super) fn tx(table_id: TableId, committed_state: &'a CommittedState) -> Self { - Iter::Tx(IterTx::new(table_id, committed_state)) - } - pub(super) fn mut_tx(table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState) -> Self { - Iter::MutTx(IterMutTx::new(table_id, tx_state, committed_state)) - } -} - -impl<'a> Iterator for Iter<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - match self { - Iter::Tx(iter) => iter.next(), - Iter::MutTx(iter) => iter.next(), - } - } -} - pub struct IndexSeekIterMutTxId<'a> { pub(super) table_id: TableId, pub(super) tx_state: &'a TxState, @@ -374,13 +358,15 @@ impl<'a> Iterator for IndexSeekIterMutTxId<'a> { } } -/// An [IterByColRange] for an individual column value. -pub type IterByColEq<'a, 'r> = IterByColRange<'a, &'r AlgebraicValue>; +/// An [IterTxByColRange] for an individual column value. +pub type IterByColEq<'a, 'r> = IterTxByColRange<'a, &'r AlgebraicValue>; +/// An [IterMutTxByColRange] for an individual column value. +pub type IterMutTxByColEq<'a, 'r> = IterMutTxByColRange<'a, &'r AlgebraicValue>; /// An iterator for a range of values in a column. -pub enum IterByColRange<'a, R: RangeBounds> { +pub enum IterTxByColRange<'a, R: RangeBounds> { /// When the column in question does not have an index. - Scan(ScanIterByColRange<'a, R>), + Scan(ScanIterTxByColRange<'a, R>), /// When the column has an index, and the table /// has been modified this transaction. @@ -388,34 +374,87 @@ pub enum IterByColRange<'a, R: RangeBounds> { /// When the column has an index, and the table /// has not been modified in this transaction. - CommittedIndex(CommittedIndexIter<'a>), + CommittedIndex(CommittedIndexIterTx<'a>), } -impl<'a, R: RangeBounds> Iterator for IterByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for IterTxByColRange<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { match self { - IterByColRange::Scan(range) => range.next(), - IterByColRange::Index(range) => range.next(), - IterByColRange::CommittedIndex(seek) => seek.next(), + IterTxByColRange::Scan(range) => range.next(), + IterTxByColRange::Index(range) => range.next(), + IterTxByColRange::CommittedIndex(seek) => seek.next(), } } } -pub struct ScanIterByColRange<'a, R: RangeBounds> { - scan_iter: Iter<'a>, +/// An iterator for a range of values in a column. +pub enum IterMutTxByColRange<'a, R: RangeBounds> { + /// When the column in question does not have an index. + Scan(ScanIterMutTxByColRange<'a, R>), + + /// When the column has an index, and the table + /// has been modified this transaction. + Index(IndexSeekIterMutTxId<'a>), + + /// When the column has an index, and the table + /// has not been modified in this transaction. + CommittedIndex(CommittedIndexIterMutTx<'a>), +} + +impl<'a, R: RangeBounds> Iterator for IterMutTxByColRange<'a, R> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + match self { + IterMutTxByColRange::Scan(range) => range.next(), + IterMutTxByColRange::Index(range) => range.next(), + IterMutTxByColRange::CommittedIndex(seek) => seek.next(), + } + } +} + +pub struct ScanIterTxByColRange<'a, R: RangeBounds> { + scan_iter: IterTx<'a>, + cols: ColList, + range: R, +} + +impl<'a, R: RangeBounds> ScanIterTxByColRange<'a, R> { + pub(super) fn new(scan_iter: IterTx<'a>, cols: ColList, range: R) -> Self { + Self { scan_iter, cols, range } + } +} + +impl<'a, R: RangeBounds> Iterator for ScanIterTxByColRange<'a, R> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + for row_ref in &mut self.scan_iter { + let value = row_ref.project(&self.cols).unwrap(); + if self.range.contains(&value) { + return Some(row_ref); + } + } + + None + } +} + +pub struct ScanIterMutTxByColRange<'a, R: RangeBounds> { + scan_iter: IterMutTx<'a>, cols: ColList, range: R, } -impl<'a, R: RangeBounds> ScanIterByColRange<'a, R> { - pub(super) fn new(scan_iter: Iter<'a>, cols: ColList, range: R) -> Self { +impl<'a, R: RangeBounds> ScanIterMutTxByColRange<'a, R> { + pub(super) fn new(scan_iter: IterMutTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } } -impl<'a, R: RangeBounds> Iterator for ScanIterByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for ScanIterMutTxByColRange<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index b785d7c36a6..93bd6d96f2f 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -1,10 +1,12 @@ use super::datastore::record_metrics; use super::{ - committed_state::{CommittedIndexIter, CommittedState}, + committed_state::CommittedState, datastore::Result, - state_view::{Iter, IterByColRange, StateView}, - SharedReadGuard, + state_view::{IterTxByColRange, StateView}, + IterByColEq, SharedReadGuard, }; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterTx; +use crate::db::datastore::locking_tx_datastore::state_view::IterTx; use crate::execution_context::ExecutionContext; use spacetimedb_primitives::{ColList, TableId}; use spacetimedb_sats::AlgebraicValue; @@ -24,6 +26,12 @@ pub struct TxId { } impl StateView for TxId { + type Iter<'a> = IterTx<'a>; + type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; + type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + where + Self: 'a; + fn get_schema(&self, table_id: TableId) -> Option<&Arc> { self.committed_state_shared_lock.get_schema(table_id) } @@ -32,7 +40,7 @@ impl StateView for TxId { self.committed_state_shared_lock.table_row_count(table_id) } - fn iter(&self, table_id: TableId) -> Result> { + fn iter(&self, table_id: TableId) -> Result> { self.committed_state_shared_lock.iter(table_id) } @@ -44,14 +52,25 @@ impl StateView for TxId { table_id: TableId, cols: ColList, range: R, - ) -> Result> { + ) -> Result> { match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::tx(committed_rows))), + Some(committed_rows) => Ok(IterTxByColRange::CommittedIndex(CommittedIndexIterTx::new( + committed_rows, + ))), None => self .committed_state_shared_lock .iter_by_col_range(table_id, cols, range), } } + + fn iter_by_col_eq<'a, 'r>( + &'a self, + table_id: TableId, + cols: impl Into, + value: &'r AlgebraicValue, + ) -> Result> { + self.iter_by_col_range(table_id, cols.into(), value) + } } impl TxId { diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 0d7a0631c4b..529fb65b7ec 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -350,14 +350,24 @@ pub trait TxDatastore: DataRow + Tx { where Self: 'a; - type IterByColRange<'a, R: RangeBounds>: Iterator> + type IterMutTx<'a>: Iterator> where Self: 'a; + type IterByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; + type IterMutByColRange<'a, R: RangeBounds>: Iterator> + where + Self: 'a; type IterByColEq<'a, 'r>: Iterator> where Self: 'a; + type IterMutByColEq<'a, 'r>: Iterator> + where + Self: 'a; + fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>; fn iter_by_col_range_tx<'a, R: RangeBounds>( @@ -438,21 +448,21 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn constraint_id_from_name(&self, tx: &Self::MutTx, constraint_name: &str) -> super::Result>; // Data - fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>; + fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result>; fn iter_by_col_range_mut_tx<'a, R: RangeBounds>( &'a self, tx: &'a Self::MutTx, table_id: TableId, cols: impl Into, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq_mut_tx<'a, 'r>( &'a self, tx: &'a Self::MutTx, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result>; + ) -> Result>; fn get_mut_tx<'a>( &self, tx: &'a Self::MutTx, diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9fa1ef92bae..356813e6d5d 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,5 +1,7 @@ use super::datastore::locking_tx_datastore::committed_state::CommittedState; -use super::datastore::locking_tx_datastore::state_view::StateView; +use super::datastore::locking_tx_datastore::state_view::{ + IterMutTx, IterMutTxByColEq, IterMutTxByColRange, IterTx, StateView, +}; use super::datastore::system_tables::ST_MODULE_ID; use super::datastore::traits::{ IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -7,7 +9,7 @@ use super::datastore::traits::{ use super::datastore::{ locking_tx_datastore::{ datastore::Locking, - state_view::{Iter, IterByColEq, IterByColRange}, + state_view::{IterByColEq, IterTxByColRange}, }, traits::TxData, }; @@ -1044,11 +1046,11 @@ impl RelationalDB { /// Returns an iterator, /// yielding every row in the table identified by `table_id`. - pub fn iter_mut<'a>(&'a self, tx: &'a MutTx, table_id: TableId) -> Result, DBError> { + pub fn iter_mut<'a>(&'a self, tx: &'a MutTx, table_id: TableId) -> Result, DBError> { self.inner.iter_mut_tx(tx, table_id) } - pub fn iter<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result, DBError> { + pub fn iter<'a>(&'a self, tx: &'a Tx, table_id: TableId) -> Result, DBError> { self.inner.iter_tx(tx, table_id) } @@ -1063,7 +1065,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_eq_mut_tx(tx, table_id.into(), cols, value) } @@ -1088,7 +1090,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_mut_tx(tx, table_id.into(), cols, range) } @@ -1103,7 +1105,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range) } @@ -2146,7 +2148,7 @@ mod tests { let cols = col_list![0, 1]; let value = product![0u64, 1u64].into(); - let IterByColEq::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { + let IterMutTxByColEq::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { panic!("expected index iterator"); }; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 5195a5493c8..0821106e310 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -1,7 +1,8 @@ //! The [DbProgram] that execute arbitrary queries & code against the database. +use crate::db::datastore::locking_tx_datastore::state_view::IterMutTxByColRange; use crate::db::datastore::locking_tx_datastore::tx::TxId; -use crate::db::datastore::locking_tx_datastore::IterByColRange; +use crate::db::datastore::locking_tx_datastore::IterTxByColRange; use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; @@ -17,7 +18,7 @@ use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_table::static_assert_size; use spacetimedb_table::table::RowRef; use spacetimedb_vm::errors::ErrorVm; -use spacetimedb_vm::eval::{build_project, build_select, join_inner, IterRows}; +use spacetimedb_vm::eval::{box_iter, build_project, build_select, join_inner, IterRows}; use spacetimedb_vm::expr::*; use spacetimedb_vm::iterators::RelIter; use spacetimedb_vm::program::{ProgramVm, Sources}; @@ -184,26 +185,9 @@ pub fn build_query<'a>( let index_table = index_side.table_id().unwrap(); if *return_index_rows { - Box::new(IndexSemiJoinLeft { - db, - tx, - probe_side, - probe_col: *probe_col, - index_select, - index_table, - index_col: *index_col, - index_iter: None, - }) as Box> + index_semi_join_left(db, tx, probe_side, *probe_col, index_select, index_table, *index_col) } else { - Box::new(IndexSemiJoinRight { - db, - tx, - probe_side, - probe_col: *probe_col, - index_select, - index_table, - index_col: *index_col, - }) + index_semi_join_right(db, tx, probe_side, *probe_col, index_select, index_table, *index_col) } } Query::Select(cmp) => build_select(result_or_base(sources, &mut result), cmp), @@ -246,8 +230,8 @@ fn get_table<'a>( .into_iter(), ), SourceExpr::DbTable(db_table) => build_iter_from_db(match tx { - TxMode::MutTx(tx) => stdb.iter_mut(tx, db_table.table_id), - TxMode::Tx(tx) => stdb.iter(tx, db_table.table_id), + TxMode::MutTx(tx) => stdb.iter_mut(tx, db_table.table_id).map(box_iter), + TxMode::Tx(tx) => stdb.iter(tx, db_table.table_id).map(box_iter), }), } } @@ -260,8 +244,10 @@ fn iter_by_col_range<'a>( range: impl RangeBounds + 'a, ) -> Box> { build_iter_from_db(match tx { - TxMode::MutTx(tx) => db.iter_by_col_range_mut(tx, table.table_id, columns, range), - TxMode::Tx(tx) => db.iter_by_col_range(tx, table.table_id, columns, range), + TxMode::MutTx(tx) => db + .iter_by_col_range_mut(tx, table.table_id, columns, range) + .map(box_iter), + TxMode::Tx(tx) => db.iter_by_col_range(tx, table.table_id, columns, range).map(box_iter), }) } @@ -276,36 +262,38 @@ fn build_iter<'a>(iter: impl 'a + Iterator>) -> Box> { +pub struct IndexSemiJoinLeft<'c, Rhs, IndexIter, F> { /// An iterator for the probe side. /// The values returned will be used to probe the index. - pub probe_side: Rhs, + probe_side: Rhs, /// The column whose value will be used to probe the index. - pub probe_col: ColId, + probe_col: ColId, /// An optional predicate to evaluate over the matching rows of the index. - pub index_select: &'c Option, - /// The table id on which the index is defined. - pub index_table: TableId, - /// The column id for which the index is defined. - pub index_col: ColId, + index_select: &'c Option, /// An iterator for the index side. /// A new iterator will be instantiated for each row on the probe side. - pub index_iter: Option>, - /// A reference to the database. - pub db: &'a RelationalDB, - /// A reference to the current transaction. - pub tx: &'a TxMode<'a>, + index_iter: Option, + /// The function that returns an iterator for the index side. + index_function: F, } -static_assert_size!(IndexSemiJoinLeft>>, 264); - -impl<'a, Rhs: RelOps<'a>> IndexSemiJoinLeft<'a, '_, Rhs> { +impl<'a, 'c, Rhs, IndexIter, F> IndexSemiJoinLeft<'c, Rhs, IndexIter, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, + Rhs: RelOps<'a>, +{ fn filter(&self, index_row: &RelValue<'_>) -> bool { self.index_select.as_ref().map_or(true, |op| op.eval_bool(index_row)) } } -impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { +impl<'a, 'c, Rhs, IndexIter, F> RelOps<'a> for IndexSemiJoinLeft<'c, Rhs, IndexIter, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, + Rhs: RelOps<'a>, +{ fn next(&mut self) -> Option> { // Return a value from the current index iterator, if not exhausted. while let Some(index_row) = self.index_iter.as_mut().and_then(|iter| iter.next()).map(RelValue::Row) { @@ -315,16 +303,10 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { } // Otherwise probe the index with a row from the probe side. - let table_id = self.index_table; - let index_col = self.index_col; let probe_col = self.probe_col.idx(); while let Some(mut row) = self.probe_side.next() { if let Some(value) = row.read_or_take_column(probe_col) { - let index_iter = match self.tx { - TxMode::MutTx(tx) => self.db.iter_by_col_range_mut(tx, table_id, index_col, value), - TxMode::Tx(tx) => self.db.iter_by_col_range(tx, table_id, index_col, value), - }; - let mut index_iter = index_iter.expect(TABLE_ID_EXPECTED_VALID); + let mut index_iter = (self.index_function)(value).expect(TABLE_ID_EXPECTED_VALID); while let Some(index_row) = index_iter.next().map(RelValue::Row) { if self.filter(&index_row) { self.index_iter = Some(index_iter); @@ -337,47 +319,85 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinLeft<'a, '_, Rhs> { } } +/// Return an iterator index join operator that returns matching rows from the index side. +pub fn index_semi_join_left<'a>( + db: &'a RelationalDB, + tx: &'a TxMode<'a>, + probe_side: Box>, + probe_col: ColId, + index_select: &'a Option, + index_table: TableId, + index_col: ColId, +) -> Box> { + match tx { + TxMode::MutTx(tx) => Box::new(IndexSemiJoinLeft { + probe_side, + probe_col, + index_select, + index_iter: None, + index_function: move |value| db.iter_by_col_range_mut(tx, index_table, index_col, value), + }), + TxMode::Tx(tx) => Box::new(IndexSemiJoinLeft { + probe_side, + probe_col, + index_select, + index_iter: None, + index_function: move |value| db.iter_by_col_range(tx, index_table, index_col, value), + }), + } +} + +static_assert_size!( + IndexSemiJoinLeft< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + IterTxByColRange<'static, AlgebraicValue>, + >, + 256 +); +static_assert_size!( + IndexSemiJoinLeft< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + IterMutTxByColRange<'static, AlgebraicValue>, + >, + 256 +); + /// An index join operator that returns matching rows from the probe side. -pub struct IndexSemiJoinRight<'a, 'c, Rhs: RelOps<'a>> { +pub struct IndexSemiJoinRight<'c, Rhs: RelOps<'c>, F> { /// An iterator for the probe side. - /// The values returned will be used to probe the index. - pub probe_side: Rhs, + /// The values returned will be useSd to probe the index. + probe_side: Rhs, /// The column whose value will be used to probe the index. - pub probe_col: ColId, + probe_col: ColId, /// An optional predicate to evaluate over the matching rows of the index. - pub index_select: &'c Option, - /// The table id on which the index is defined. - pub index_table: TableId, - /// The column id for which the index is defined. - pub index_col: ColId, - /// A reference to the database. - pub db: &'a RelationalDB, - /// A reference to the current transaction. - pub tx: &'a TxMode<'a>, + index_select: &'c Option, + /// A function that returns an iterator for the index side. + index_function: F, } -static_assert_size!(IndexSemiJoinRight>>, 48); - -impl<'a, Rhs: RelOps<'a>> IndexSemiJoinRight<'a, '_, Rhs> { +impl<'a, Rhs: RelOps<'a>, F, IndexIter> IndexSemiJoinRight<'a, Rhs, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, +{ fn filter(&self, index_row: &RelValue<'_>) -> bool { self.index_select.as_ref().map_or(true, |op| op.eval_bool(index_row)) } } -impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinRight<'a, '_, Rhs> { +impl<'a, Rhs: RelOps<'a>, F, IndexIter> RelOps<'a> for IndexSemiJoinRight<'a, Rhs, F> +where + F: Fn(AlgebraicValue) -> Result, + IndexIter: Iterator>, +{ fn next(&mut self) -> Option> { // Otherwise probe the index with a row from the probe side. - let table_id = self.index_table; - let index_col = self.index_col; let probe_col = self.probe_col.idx(); - while let Some(row) = self.probe_side.next() { - if let Some(value) = row.read_column(probe_col) { - let value = &*value; - let index_iter = match self.tx { - TxMode::MutTx(tx) => self.db.iter_by_col_range_mut(tx, table_id, index_col, value), - TxMode::Tx(tx) => self.db.iter_by_col_range(tx, table_id, index_col, value), - }; - let mut index_iter = index_iter.expect(TABLE_ID_EXPECTED_VALID); + while let Some(mut row) = self.probe_side.next() { + if let Some(value) = row.read_or_take_column(probe_col) { + let mut index_iter = (self.index_function)(value).expect(TABLE_ID_EXPECTED_VALID); while let Some(index_row) = index_iter.next().map(RelValue::Row) { if self.filter(&index_row) { return Some(row); @@ -389,6 +409,46 @@ impl<'a, Rhs: RelOps<'a>> RelOps<'a> for IndexSemiJoinRight<'a, '_, Rhs> { } } +/// Return an iterator index join operator that returns matching rows from the probe side. +pub fn index_semi_join_right<'a>( + db: &'a RelationalDB, + tx: &'a TxMode<'a>, + probe_side: Box>, + probe_col: ColId, + index_select: &'a Option, + index_table: TableId, + index_col: ColId, +) -> Box> { + match tx { + TxMode::MutTx(tx) => Box::new(IndexSemiJoinRight { + probe_side, + probe_col, + index_select, + index_function: move |value| db.iter_by_col_range_mut(tx, index_table, index_col, value), + }), + TxMode::Tx(tx) => Box::new(IndexSemiJoinRight { + probe_side, + probe_col, + index_select, + index_function: move |value| db.iter_by_col_range(tx, index_table, index_col, value), + }), + } +} +static_assert_size!( + IndexSemiJoinRight< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + >, + 40 +); +static_assert_size!( + IndexSemiJoinRight< + Box>, + fn(AlgebraicValue) -> Result, DBError>, + >, + 40 +); + /// A [ProgramVm] implementation that carry a [RelationalDB] for it /// query execution pub struct DbProgram<'db, 'tx> { diff --git a/crates/vm/src/eval.rs b/crates/vm/src/eval.rs index 8ccaa012412..1dc845c8f31 100644 --- a/crates/vm/src/eval.rs +++ b/crates/vm/src/eval.rs @@ -4,9 +4,15 @@ use crate::program::{ProgramVm, Sources}; use crate::rel_ops::RelOps; use crate::relation::RelValue; use spacetimedb_sats::ProductValue; +use spacetimedb_table::table::RowRef; pub type IterRows<'a> = dyn RelOps<'a> + 'a; +/// Utility to simplify the creation of a boxed iterator. +pub fn box_iter<'a, T: Iterator> + 'a>(iter: T) -> Box> + 'a> { + Box::new(iter) +} + pub fn build_select<'a>(base: impl RelOps<'a> + 'a, cmp: &'a ColumnOp) -> Box> { Box::new(base.select(move |row| cmp.eval_bool(row))) } From 8e0fab6d2b8992ac7fdd06a5b0a3facb8797342f Mon Sep 17 00:00:00 2001 From: Mario Montoya Date: Thu, 12 Dec 2024 09:57:31 -0500 Subject: [PATCH 3/5] Normalize names of iters and split more --- .../locking_tx_datastore/committed_state.rs | 72 +++----------- .../locking_tx_datastore/datastore.rs | 32 +++--- .../db/datastore/locking_tx_datastore/mod.rs | 2 +- .../datastore/locking_tx_datastore/mut_tx.rs | 50 ++++++---- .../locking_tx_datastore/state_view.rs | 97 +++++++++++-------- .../db/datastore/locking_tx_datastore/tx.rs | 13 +-- crates/core/src/db/datastore/traits.rs | 20 ++-- crates/core/src/db/relational_db.rs | 14 +-- crates/core/src/vm.rs | 22 ++--- 9 files changed, 156 insertions(+), 166 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 20707414153..a74ae630905 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -1,11 +1,11 @@ use super::{ datastore::Result, sequence::{Sequence, SequencesState}, - state_view::{IterTxByColRange, StateView}, + state_view::{IterByColRangeTx, StateView}, tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, - IterByColEq, + IterByColEqTx, }; -use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterTxByColRange}; +use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx}; use crate::{ db::{ datastore::{ @@ -41,7 +41,7 @@ use spacetimedb_table::{ table::{IndexScanIter, InsertError, RowRef, Table}, MemoryUsage, }; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; /// Contains the live, in-memory snapshot of a database. This structure @@ -72,8 +72,8 @@ impl MemoryUsage for CommittedState { impl StateView for CommittedState { type Iter<'a> = IterTx<'a>; - type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; - type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + type IterByColRange<'a, R: RangeBounds> = IterByColRangeTx<'a, R>; + type IterByColEq<'a, 'r> = IterByColEqTx<'a, 'r> where Self: 'a; @@ -103,7 +103,7 @@ impl StateView for CommittedState { // TODO: Why does this unconditionally return a `Scan` iter, // instead of trying to return a `CommittedIndex` iter? // Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen - Ok(IterTxByColRange::Scan(ScanIterTxByColRange::new( + Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new( self.iter(table_id)?, cols, range, @@ -646,67 +646,25 @@ impl CommittedState { } } -pub struct CommittedIndexIterTx<'a> { +pub struct CommittedIndexIterWithDeletedMutTx<'a> { committed_rows: IndexScanIter<'a>, - num_committed_rows_fetched: u64, + del_table: &'a DeleteTable, } -impl<'a> CommittedIndexIterTx<'a> { - pub(super) fn new(committed_rows: IndexScanIter<'a>) -> Self { +impl<'a> CommittedIndexIterWithDeletedMutTx<'a> { + pub(super) fn new(committed_rows: IndexScanIter<'a>, del_table: &'a BTreeSet) -> Self { Self { committed_rows, - num_committed_rows_fetched: 0, + del_table, } } } -impl<'a> Iterator for CommittedIndexIterTx<'a> { +impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { - if let Some(row_ref) = self.committed_rows.next() { - // TODO(metrics): This doesn't actually fetch a row. - // Move this counter to `RowRef::read_row`. - self.num_committed_rows_fetched += 1; - return Some(row_ref); - } - - None - } -} - -pub struct CommittedIndexIterMutTx<'a> { - table_id: TableId, - tx_state: &'a TxState, - committed_rows: IndexScanIter<'a>, - num_committed_rows_fetched: u64, -} - -impl<'a> CommittedIndexIterMutTx<'a> { - pub(super) fn new(table_id: TableId, tx_state: &'a TxState, committed_rows: IndexScanIter<'a>) -> Self { - Self { - table_id, - tx_state, - committed_rows, - num_committed_rows_fetched: 0, - } - } -} - -impl<'a> Iterator for CommittedIndexIterMutTx<'a> { - type Item = RowRef<'a>; - - fn next(&mut self) -> Option { - if let Some(row_ref) = self - .committed_rows - .find(|row_ref| !self.tx_state.is_deleted(self.table_id, row_ref.pointer())) - { - // TODO(metrics): This doesn't actually fetch a row. - // Move this counter to `RowRef::read_row`. - self.num_committed_rows_fetched += 1; - return Some(row_ref); - } - - None + self.committed_rows + .find(|row_ref| !self.del_table.contains(&row_ref.pointer())) } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index af5ee9a2ac5..4f02646e2eb 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -2,11 +2,11 @@ use super::{ committed_state::CommittedState, mut_tx::MutTxId, sequence::SequencesState, - state_view::{IterTxByColRange, StateView}, + state_view::{IterByColRangeTx, StateView}, tx::TxId, tx_state::TxState, }; -use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColRange, IterTx}; +use crate::db::datastore::locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx}; use crate::execution_context::Workload; use crate::{ db::{ @@ -323,31 +323,31 @@ impl Tx for Locking { } impl TxDatastore for Locking { - type Iter<'a> + type IterTx<'a> = IterTx<'a> where Self: 'a; type IterMutTx<'a>= IterMutTx<'a> where Self: 'a; - type IterByColRange<'a, R: RangeBounds> - = IterTxByColRange<'a, R> + type IterByColRangeTx<'a, R: RangeBounds> + = IterByColRangeTx<'a, R> where Self: 'a; - type IterMutByColRange<'a, R: RangeBounds> - = IterMutTxByColRange<'a, R> + type IterByColRangeMutTx<'a, R: RangeBounds> + = IterByColRangeMutTx<'a, R> where Self: 'a; - type IterByColEq<'a, 'r> - = IterTxByColRange<'a, &'r AlgebraicValue> + type IterByColEqTx<'a, 'r> + = IterByColRangeTx<'a, &'r AlgebraicValue> where Self: 'a; - type IterMutByColEq<'a, 'r> - = IterMutTxByColRange<'a, &'r AlgebraicValue> + type IterByColEqMutTx<'a, 'r> + = IterByColRangeMutTx<'a, &'r AlgebraicValue> where Self: 'a; - fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result> { + fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result> { tx.iter(table_id) } @@ -357,7 +357,7 @@ impl TxDatastore for Locking { table_id: TableId, cols: impl Into, range: R, - ) -> Result> { + ) -> Result> { tx.iter_by_col_range(table_id, cols.into(), range) } @@ -367,7 +367,7 @@ impl TxDatastore for Locking { table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { + ) -> Result> { tx.iter_by_col_eq(table_id, cols, value) } @@ -514,7 +514,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, range: R, - ) -> Result> { + ) -> Result> { tx.iter_by_col_range(table_id, cols.into(), range) } @@ -524,7 +524,7 @@ impl MutTxDatastore for Locking { table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result> { + ) -> Result> { tx.iter_by_col_eq(table_id, cols.into(), value) } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index d7e590bdcc5..c5c87da46da 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -6,7 +6,7 @@ mod mut_tx; pub use mut_tx::MutTxId; mod sequence; pub mod state_view; -pub use state_view::{IterByColEq, IterTxByColRange}; +pub use state_view::{IterByColEqTx, IterByColRangeTx}; pub(crate) mod tx; mod tx_state; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index b163a0a1742..dfbb800d8d5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -2,13 +2,15 @@ use super::{ committed_state::CommittedState, datastore::{record_metrics, Result}, sequence::{Sequence, SequencesState}, - state_view::{IndexSeekIterMutTxId, ScanIterMutTxByColRange, StateView}, + state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView}, tx::TxId, tx_state::{DeleteTable, IndexIdMap, TxState}, SharedMutexGuard, SharedWriteGuard, }; -use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterMutTx; -use crate::db::datastore::locking_tx_datastore::state_view::{IterMutTx, IterMutTxByColEq, IterMutTxByColRange}; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx; +use crate::db::datastore::locking_tx_datastore::state_view::{ + IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, +}; use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; use crate::db::datastore::{ system_tables::{ @@ -1422,8 +1424,8 @@ impl MutTxId { impl StateView for MutTxId { type Iter<'a> = IterMutTx<'a>; - type IterByColRange<'a, R: RangeBounds> = IterMutTxByColRange<'a, R>; - type IterByColEq<'a, 'r> = IterMutTxByColEq<'a, 'r> + type IterByColRange<'a, R: RangeBounds> = IterByColRangeMutTx<'a, R>; + type IterByColEq<'a, 'r> = IterByColEqMutTx<'a, 'r> where Self: 'a; @@ -1480,22 +1482,34 @@ impl StateView for MutTxId { // rolling it back will leave the index in place. if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) { // The current transaction has modified this table, and the table is indexed. - Ok(IterMutTxByColRange::Index(IndexSeekIterMutTxId { - table_id, - tx_state: &self.tx_state, - inserted_rows, - committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range), - num_committed_rows_fetched: 0, - })) + Ok( + if let Some(del_table) = self.tx_state.delete_tables.get(&table_id).filter(|x| !x.is_empty()) { + IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx { + inserted_rows, + committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range), + del_table, + }) + } else { + IterByColRangeMutTx::Index(IndexSeekIterIdMutTx { + inserted_rows, + committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range), + }) + }, + ) } else { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterMutTxByColRange::CommittedIndex(CommittedIndexIterMutTx::new( - table_id, - &self.tx_state, - committed_rows, - ))), + Some(committed_rows) => Ok( + if let Some(del_table) = self.tx_state.delete_tables.get(&table_id).filter(|x| !x.is_empty()) { + IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new( + committed_rows, + del_table, + )) + } else { + IterByColRangeMutTx::CommittedIndex(committed_rows) + }, + ), None => { #[cfg(feature = "unindexed_iter_by_col_range_warn")] match self.table_row_count(table_id) { @@ -1525,7 +1539,7 @@ impl StateView for MutTxId { } } - Ok(IterMutTxByColRange::Scan(ScanIterMutTxByColRange::new( + Ok(IterByColRangeMutTx::Scan(ScanIterByColRangeMutTx::new( self.iter(table_id)?, cols, range, diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 894d4c7e460..4fd09dc7021 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -3,7 +3,7 @@ use super::{ datastore::Result, tx_state::{DeleteTable, TxState}, }; -use crate::db::datastore::locking_tx_datastore::committed_state::{CommittedIndexIterMutTx, CommittedIndexIterTx}; +use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx; use crate::{ db::datastore::system_tables::{ StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StIndexFields, StIndexRow, StScheduledFields, @@ -170,12 +170,10 @@ impl<'a> IterMutTx<'a> { .get(&table_id) .map(|table| (table, &tx_state.blob_store)); - let tx_state_del = tx_state.delete_tables.get(&table_id); - let stage = if let Some(table) = committed_state.tables.get(&table_id) { // The committed state has changes for this table. let iter = table.scan_rows(&committed_state.blob_store); - if let Some(del_tables) = tx_state_del.and_then(|del| if del.is_empty() { None } else { Some(del) }) { + if let Some(del_tables) = tx_state.delete_tables.get(&table_id).filter(|del| !del.is_empty()) { // There are deletes in the tx state // so we must exclude those (1b). ScanStage::CommittedWithTxDeletes { iter, del_tables } @@ -307,15 +305,29 @@ impl<'a> Iterator for IterTx<'a> { } } -pub struct IndexSeekIterMutTxId<'a> { - pub(super) table_id: TableId, - pub(super) tx_state: &'a TxState, +pub struct IndexSeekIterIdMutTx<'a> { + pub(super) inserted_rows: IndexScanIter<'a>, + pub(super) committed_rows: Option>, +} + +impl<'a> Iterator for IndexSeekIterIdMutTx<'a> { + type Item = RowRef<'a>; + + fn next(&mut self) -> Option { + if let Some(row_ref) = self.inserted_rows.next() { + return Some(row_ref); + } + self.committed_rows.as_mut().and_then(|i| i.next()) + } +} + +pub struct IndexSeekIterIdWithDeletedMutTx<'a> { pub(super) inserted_rows: IndexScanIter<'a>, pub(super) committed_rows: Option>, - pub(super) num_committed_rows_fetched: u64, + pub(super) del_table: &'a DeleteTable, } -impl<'a> Iterator for IndexSeekIterMutTxId<'a> { +impl<'a> Iterator for IndexSeekIterIdWithDeletedMutTx<'a> { type Item = RowRef<'a>; fn next(&mut self) -> Option { @@ -346,88 +358,97 @@ impl<'a> Iterator for IndexSeekIterMutTxId<'a> { // // As a result, in MVCC, this branch will need to check if the `row_ref` // also exists in the `tx_state.insert_tables` and ensure it is yielded only once. - .and_then(|i| i.find(|row_ref| !self.tx_state.is_deleted(self.table_id, row_ref.pointer()))) + .and_then(|i| i.find(|row_ref| !self.del_table.contains(&row_ref.pointer()))) { // TODO(metrics): This doesn't actually fetch a row. // Move this counter to `RowRef::read_row`. - self.num_committed_rows_fetched += 1; + // self.num_committed_rows_fetched += 1; return Some(row_ref); } None } } - -/// An [IterTxByColRange] for an individual column value. -pub type IterByColEq<'a, 'r> = IterTxByColRange<'a, &'r AlgebraicValue>; -/// An [IterMutTxByColRange] for an individual column value. -pub type IterMutTxByColEq<'a, 'r> = IterMutTxByColRange<'a, &'r AlgebraicValue>; +/// An [IterByColRangeTx] for an individual column value. +pub type IterByColEqTx<'a, 'r> = IterByColRangeTx<'a, &'r AlgebraicValue>; +/// An [IterByColRangeMutTx] for an individual column value. +pub type IterByColEqMutTx<'a, 'r> = IterByColRangeMutTx<'a, &'r AlgebraicValue>; /// An iterator for a range of values in a column. -pub enum IterTxByColRange<'a, R: RangeBounds> { +pub enum IterByColRangeTx<'a, R: RangeBounds> { /// When the column in question does not have an index. - Scan(ScanIterTxByColRange<'a, R>), + Scan(ScanIterByColRangeTx<'a, R>), /// When the column has an index, and the table /// has been modified this transaction. - Index(IndexSeekIterMutTxId<'a>), + Index(IndexSeekIterIdMutTx<'a>), /// When the column has an index, and the table /// has not been modified in this transaction. - CommittedIndex(CommittedIndexIterTx<'a>), + CommittedIndex(IndexScanIter<'a>), } -impl<'a, R: RangeBounds> Iterator for IterTxByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for IterByColRangeTx<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { match self { - IterTxByColRange::Scan(range) => range.next(), - IterTxByColRange::Index(range) => range.next(), - IterTxByColRange::CommittedIndex(seek) => seek.next(), + IterByColRangeTx::Scan(range) => range.next(), + IterByColRangeTx::Index(range) => range.next(), + IterByColRangeTx::CommittedIndex(seek) => seek.next(), } } } /// An iterator for a range of values in a column. -pub enum IterMutTxByColRange<'a, R: RangeBounds> { +pub enum IterByColRangeMutTx<'a, R: RangeBounds> { /// When the column in question does not have an index. - Scan(ScanIterMutTxByColRange<'a, R>), + Scan(ScanIterByColRangeMutTx<'a, R>), /// When the column has an index, and the table /// has been modified this transaction. - Index(IndexSeekIterMutTxId<'a>), + Index(IndexSeekIterIdMutTx<'a>), + + /// When the column has an index, and the table + /// has been modified this transaction, and there are deleted rows. + IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx<'a>), + + /// When the column has an index, and the table + /// has not been modified in this transaction. + CommittedIndex(IndexScanIter<'a>), /// When the column has an index, and the table /// has not been modified in this transaction. - CommittedIndex(CommittedIndexIterMutTx<'a>), + CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx<'a>), } -impl<'a, R: RangeBounds> Iterator for IterMutTxByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for IterByColRangeMutTx<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { match self { - IterMutTxByColRange::Scan(range) => range.next(), - IterMutTxByColRange::Index(range) => range.next(), - IterMutTxByColRange::CommittedIndex(seek) => seek.next(), + IterByColRangeMutTx::Scan(range) => range.next(), + IterByColRangeMutTx::Index(range) => range.next(), + IterByColRangeMutTx::IndexWithDeletes(range) => range.next(), + IterByColRangeMutTx::CommittedIndex(seek) => seek.next(), + IterByColRangeMutTx::CommittedIndexWithDeletes(seek) => seek.next(), } } } -pub struct ScanIterTxByColRange<'a, R: RangeBounds> { +pub struct ScanIterByColRangeTx<'a, R: RangeBounds> { scan_iter: IterTx<'a>, cols: ColList, range: R, } -impl<'a, R: RangeBounds> ScanIterTxByColRange<'a, R> { +impl<'a, R: RangeBounds> ScanIterByColRangeTx<'a, R> { pub(super) fn new(scan_iter: IterTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } } -impl<'a, R: RangeBounds> Iterator for ScanIterTxByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for ScanIterByColRangeTx<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { @@ -442,19 +463,19 @@ impl<'a, R: RangeBounds> Iterator for ScanIterTxByColRange<'a, R } } -pub struct ScanIterMutTxByColRange<'a, R: RangeBounds> { +pub struct ScanIterByColRangeMutTx<'a, R: RangeBounds> { scan_iter: IterMutTx<'a>, cols: ColList, range: R, } -impl<'a, R: RangeBounds> ScanIterMutTxByColRange<'a, R> { +impl<'a, R: RangeBounds> ScanIterByColRangeMutTx<'a, R> { pub(super) fn new(scan_iter: IterMutTx<'a>, cols: ColList, range: R) -> Self { Self { scan_iter, cols, range } } } -impl<'a, R: RangeBounds> Iterator for ScanIterMutTxByColRange<'a, R> { +impl<'a, R: RangeBounds> Iterator for ScanIterByColRangeMutTx<'a, R> { type Item = RowRef<'a>; fn next(&mut self) -> Option { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs index 93bd6d96f2f..aef949dd7a9 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx.rs @@ -2,10 +2,9 @@ use super::datastore::record_metrics; use super::{ committed_state::CommittedState, datastore::Result, - state_view::{IterTxByColRange, StateView}, - IterByColEq, SharedReadGuard, + state_view::{IterByColRangeTx, StateView}, + IterByColEqTx, SharedReadGuard, }; -use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterTx; use crate::db::datastore::locking_tx_datastore::state_view::IterTx; use crate::execution_context::ExecutionContext; use spacetimedb_primitives::{ColList, TableId}; @@ -27,8 +26,8 @@ pub struct TxId { impl StateView for TxId { type Iter<'a> = IterTx<'a>; - type IterByColRange<'a, R: RangeBounds> = IterTxByColRange<'a, R>; - type IterByColEq<'a, 'r> = IterByColEq<'a, 'r> + type IterByColRange<'a, R: RangeBounds> = IterByColRangeTx<'a, R>; + type IterByColEq<'a, 'r> = IterByColEqTx<'a, 'r> where Self: 'a; @@ -54,9 +53,7 @@ impl StateView for TxId { range: R, ) -> Result> { match self.committed_state_shared_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok(IterTxByColRange::CommittedIndex(CommittedIndexIterTx::new( - committed_rows, - ))), + Some(committed_rows) => Ok(IterByColRangeTx::CommittedIndex(committed_rows)), None => self .committed_state_shared_lock .iter_by_col_range(table_id, cols, range), diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 529fb65b7ec..1c923251c59 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -346,7 +346,7 @@ impl Program { } pub trait TxDatastore: DataRow + Tx { - type Iter<'a>: Iterator> + type IterTx<'a>: Iterator> where Self: 'a; @@ -354,21 +354,21 @@ pub trait TxDatastore: DataRow + Tx { where Self: 'a; - type IterByColRange<'a, R: RangeBounds>: Iterator> + type IterByColRangeTx<'a, R: RangeBounds>: Iterator> where Self: 'a; - type IterMutByColRange<'a, R: RangeBounds>: Iterator> + type IterByColRangeMutTx<'a, R: RangeBounds>: Iterator> where Self: 'a; - type IterByColEq<'a, 'r>: Iterator> + type IterByColEqTx<'a, 'r>: Iterator> where Self: 'a; - type IterMutByColEq<'a, 'r>: Iterator> + type IterByColEqMutTx<'a, 'r>: Iterator> where Self: 'a; - fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>; + fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>; fn iter_by_col_range_tx<'a, R: RangeBounds>( &'a self, @@ -376,7 +376,7 @@ pub trait TxDatastore: DataRow + Tx { table_id: TableId, cols: impl Into, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq_tx<'a, 'r>( &'a self, @@ -384,7 +384,7 @@ pub trait TxDatastore: DataRow + Tx { table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result>; + ) -> Result>; fn table_id_exists_tx(&self, tx: &Self::Tx, table_id: &TableId) -> bool; fn table_id_from_name_tx(&self, tx: &Self::Tx, table_name: &str) -> Result>; @@ -455,14 +455,14 @@ pub trait MutTxDatastore: TxDatastore + MutTx { table_id: TableId, cols: impl Into, range: R, - ) -> Result>; + ) -> Result>; fn iter_by_col_eq_mut_tx<'a, 'r>( &'a self, tx: &'a Self::MutTx, table_id: TableId, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result>; + ) -> Result>; fn get_mut_tx<'a>( &self, tx: &'a Self::MutTx, diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 356813e6d5d..659d32f67bf 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1,6 +1,6 @@ use super::datastore::locking_tx_datastore::committed_state::CommittedState; use super::datastore::locking_tx_datastore::state_view::{ - IterMutTx, IterMutTxByColEq, IterMutTxByColRange, IterTx, StateView, + IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use super::datastore::system_tables::ST_MODULE_ID; use super::datastore::traits::{ @@ -9,7 +9,7 @@ use super::datastore::traits::{ use super::datastore::{ locking_tx_datastore::{ datastore::Locking, - state_view::{IterByColEq, IterTxByColRange}, + state_view::{IterByColEqTx, IterByColRangeTx}, }, traits::TxData, }; @@ -1065,7 +1065,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_eq_mut_tx(tx, table_id.into(), cols, value) } @@ -1075,7 +1075,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, value: &'r AlgebraicValue, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_eq_tx(tx, table_id.into(), cols, value) } @@ -1090,7 +1090,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_mut_tx(tx, table_id.into(), cols, range) } @@ -1105,7 +1105,7 @@ impl RelationalDB { table_id: impl Into, cols: impl Into, range: R, - ) -> Result, DBError> { + ) -> Result, DBError> { self.inner.iter_by_col_range_tx(tx, table_id.into(), cols, range) } @@ -2148,7 +2148,7 @@ mod tests { let cols = col_list![0, 1]; let value = product![0u64, 1u64].into(); - let IterMutTxByColEq::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { + let IterByColEqMutTx::Index(mut iter) = stdb.iter_by_col_eq_mut(&tx, table_id, cols, &value)? else { panic!("expected index iterator"); }; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 0821106e310..6186e479aad 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -1,8 +1,8 @@ //! The [DbProgram] that execute arbitrary queries & code against the database. -use crate::db::datastore::locking_tx_datastore::state_view::IterMutTxByColRange; +use crate::db::datastore::locking_tx_datastore::state_view::IterByColRangeMutTx; use crate::db::datastore::locking_tx_datastore::tx::TxId; -use crate::db::datastore::locking_tx_datastore::IterTxByColRange; +use crate::db::datastore::locking_tx_datastore::IterByColRangeTx; use crate::db::datastore::system_tables::{st_var_schema, StVarName, StVarRow, StVarTable}; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::DBError; @@ -350,24 +350,24 @@ pub fn index_semi_join_left<'a>( static_assert_size!( IndexSemiJoinLeft< Box>, - fn(AlgebraicValue) -> Result, DBError>, - IterTxByColRange<'static, AlgebraicValue>, + fn(AlgebraicValue) -> Result, DBError>, + IterByColRangeTx<'static, AlgebraicValue>, >, - 256 + 232 ); static_assert_size!( IndexSemiJoinLeft< Box>, - fn(AlgebraicValue) -> Result, DBError>, - IterMutTxByColRange<'static, AlgebraicValue>, + fn(AlgebraicValue) -> Result, DBError>, + IterByColRangeMutTx<'static, AlgebraicValue>, >, - 256 + 240 ); /// An index join operator that returns matching rows from the probe side. pub struct IndexSemiJoinRight<'c, Rhs: RelOps<'c>, F> { /// An iterator for the probe side. - /// The values returned will be useSd to probe the index. + /// The values returned will be used to probe the index. probe_side: Rhs, /// The column whose value will be used to probe the index. probe_col: ColId, @@ -437,14 +437,14 @@ pub fn index_semi_join_right<'a>( static_assert_size!( IndexSemiJoinRight< Box>, - fn(AlgebraicValue) -> Result, DBError>, + fn(AlgebraicValue) -> Result, DBError>, >, 40 ); static_assert_size!( IndexSemiJoinRight< Box>, - fn(AlgebraicValue) -> Result, DBError>, + fn(AlgebraicValue) -> Result, DBError>, >, 40 ); From d76e12ddfbb96b59c0e6e53338e01c59a20c9b11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Fri, 13 Dec 2024 09:21:54 -0500 Subject: [PATCH 4/5] Refactor getting the deleted table --- .../datastore/locking_tx_datastore/mut_tx.rs | 47 +++++++++---------- .../locking_tx_datastore/state_view.rs | 2 +- .../locking_tx_datastore/tx_state.rs | 6 +++ 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index dfbb800d8d5..4dfd0916a66 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -528,7 +528,7 @@ impl MutTxId { // but don't yield rows deleted in the tx state. use itertools::Either::*; use BTreeScanInner::*; - let commit_iter = commit_iter.map(|iter| match self.tx_state.delete_tables.get(&table_id) { + let commit_iter = commit_iter.map(|iter| match self.tx_state.get_delete_table(table_id) { None => Left(iter), Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }), }); @@ -1481,35 +1481,32 @@ impl StateView for MutTxId { // yet. In particular, I don't know if creating an index in a transaction and // rolling it back will leave the index in place. if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) { + let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range); // The current transaction has modified this table, and the table is indexed. - Ok( - if let Some(del_table) = self.tx_state.delete_tables.get(&table_id).filter(|x| !x.is_empty()) { - IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx { - inserted_rows, - committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range), - del_table, - }) - } else { - IterByColRangeMutTx::Index(IndexSeekIterIdMutTx { - inserted_rows, - committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range), - }) - }, - ) + Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) { + IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx { + inserted_rows, + committed_rows, + del_table, + }) + } else { + IterByColRangeMutTx::Index(IndexSeekIterIdMutTx { + inserted_rows, + committed_rows, + }) + }) } else { // Either the current transaction has not modified this table, or the table is not // indexed. match self.committed_state_write_lock.index_seek(table_id, &cols, &range) { - Some(committed_rows) => Ok( - if let Some(del_table) = self.tx_state.delete_tables.get(&table_id).filter(|x| !x.is_empty()) { - IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new( - committed_rows, - del_table, - )) - } else { - IterByColRangeMutTx::CommittedIndex(committed_rows) - }, - ), + Some(committed_rows) => Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) { + IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new( + committed_rows, + del_table, + )) + } else { + IterByColRangeMutTx::CommittedIndex(committed_rows) + }), None => { #[cfg(feature = "unindexed_iter_by_col_range_warn")] match self.table_row_count(table_id) { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index 4fd09dc7021..8f3533febe1 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -173,7 +173,7 @@ impl<'a> IterMutTx<'a> { let stage = if let Some(table) = committed_state.tables.get(&table_id) { // The committed state has changes for this table. let iter = table.scan_rows(&committed_state.blob_store); - if let Some(del_tables) = tx_state.delete_tables.get(&table_id).filter(|del| !del.is_empty()) { + if let Some(del_tables) = tx_state.get_delete_table(table_id) { // There are deletes in the tx state // so we must exclude those (1b). ScanStage::CommittedWithTxDeletes { iter, del_tables } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index 5b04d76eed0..fe5ab090f54 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -145,6 +145,12 @@ impl TxState { .unwrap_or(false) } + /// Returns the [DeleteTable] for the given `table_id`, checking if it's empty. + pub(super) fn get_delete_table(&self, table_id: TableId) -> Option<&DeleteTable> { + self.delete_tables.get(&table_id).filter(|x| !x.is_empty()) + } + + /// Guarantees that the `table_id` returns a `DeleteTable`. pub(super) fn get_delete_table_mut(&mut self, table_id: TableId) -> &mut DeleteTable { self.delete_tables.entry(table_id).or_default() } From d5baa137e6118be903e7656b92c67a6ba8233899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Alejandro=20Montoya=20Corte=CC=81s?= Date: Thu, 19 Dec 2024 12:24:26 -0500 Subject: [PATCH 5/5] Move out the associated types for MutTxDatastore --- .../locking_tx_datastore/datastore.rs | 20 ++++++++-------- crates/core/src/db/datastore/traits.rs | 23 ++++++++++--------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 4f02646e2eb..847572d5a1c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -325,27 +325,16 @@ impl Tx for Locking { impl TxDatastore for Locking { type IterTx<'a> = IterTx<'a> - where - Self: 'a; - type IterMutTx<'a>= IterMutTx<'a> where Self: 'a; type IterByColRangeTx<'a, R: RangeBounds> = IterByColRangeTx<'a, R> - where - Self: 'a; - type IterByColRangeMutTx<'a, R: RangeBounds> - = IterByColRangeMutTx<'a, R> where Self: 'a; type IterByColEqTx<'a, 'r> = IterByColRangeTx<'a, &'r AlgebraicValue> where Self: 'a; - type IterByColEqMutTx<'a, 'r> - = IterByColRangeMutTx<'a, &'r AlgebraicValue> - where - Self: 'a; fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result> { tx.iter(table_id) @@ -416,6 +405,15 @@ impl TxDatastore for Locking { } impl MutTxDatastore for Locking { + type IterMutTx<'a>= IterMutTx<'a> + where + Self: 'a; + type IterByColRangeMutTx<'a, R: RangeBounds> = IterByColRangeMutTx<'a, R>; + type IterByColEqMutTx<'a, 'r> + = IterByColRangeMutTx<'a, &'r AlgebraicValue> + where + Self: 'a; + fn create_table_mut_tx(&self, tx: &mut Self::MutTx, schema: TableSchema) -> Result { tx.create_table(schema) } diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 1c923251c59..b507f4c4360 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -350,24 +350,13 @@ pub trait TxDatastore: DataRow + Tx { where Self: 'a; - type IterMutTx<'a>: Iterator> - where - Self: 'a; - type IterByColRangeTx<'a, R: RangeBounds>: Iterator> - where - Self: 'a; - type IterByColRangeMutTx<'a, R: RangeBounds>: Iterator> where Self: 'a; type IterByColEqTx<'a, 'r>: Iterator> where Self: 'a; - type IterByColEqMutTx<'a, 'r>: Iterator> - where - Self: 'a; - fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result>; fn iter_by_col_range_tx<'a, R: RangeBounds>( @@ -404,6 +393,18 @@ pub trait TxDatastore: DataRow + Tx { } pub trait MutTxDatastore: TxDatastore + MutTx { + type IterMutTx<'a>: Iterator> + where + Self: 'a; + + type IterByColRangeMutTx<'a, R: RangeBounds>: Iterator> + where + Self: 'a; + + type IterByColEqMutTx<'a, 'r>: Iterator> + where + Self: 'a; + // Tables fn create_table_mut_tx(&self, tx: &mut Self::MutTx, schema: TableSchema) -> Result; // In these methods, we use `'tx` because the return type must borrow data