Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split Iter & IterByColRange types into Tx and MutTxId versions #2043

Merged
merged 5 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IterByColRangeTx, StateView},
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEqTx,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -39,7 +41,7 @@ use spacetimedb_table::{
table::{IndexScanIter, InsertError, RowRef, Table},
MemoryUsage,
};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

/// Contains the live, in-memory snapshot of a database. This structure
Expand Down Expand Up @@ -69,6 +71,12 @@ impl MemoryUsage for CommittedState {
}

impl StateView for CommittedState {
type Iter<'a> = IterTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeTx<'a, R>;
type IterByColEq<'a, 'r> = IterByColEqTx<'a, 'r>
where
Self: 'a;

/// Returns the schema of the table identified by `table_id`.
///
/// Looks `table_id` up in `self.tables`; yields `None` when there is no
/// entry for that id, otherwise the result of `Table::get_schema`.
fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
self.tables.get(&table_id).map(|table| table.get_schema())
}
Expand All @@ -77,9 +85,9 @@ impl StateView for CommittedState {
self.get_table(table_id).map(|table| table.row_count)
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(table_id, None, self));
return Ok(IterTx::new(table_id, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand All @@ -91,15 +99,25 @@ impl StateView for CommittedState {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
Ok(IterByColRange::Scan(ScanIterByColRange::new(
// Answer: Because `CommittedIndexIter::tx_state: Option<&'a TxState>` needs to be `Some` to read after reopen.
Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
)))
}

/// Iterates over the rows of `table_id` whose `cols` are equal to `value`.
///
/// This is a thin wrapper: `cols` is converted into a `ColList` and the call
/// is forwarded to `iter_by_col_range`, with `value` itself passed as the
/// range argument. Any error from `iter_by_col_range` is returned unchanged.
fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}

/// Swallow `Err(TableError::Duplicate(_))`, which signals a set-semantic collision,
Expand Down Expand Up @@ -628,48 +646,25 @@ impl CommittedState {
}
}

pub struct CommittedIndexIter<'a> {
table_id: TableId,
tx_state: Option<&'a TxState>,
#[allow(dead_code)]
committed_state: &'a CommittedState,
pub struct CommittedIndexIterWithDeletedMutTx<'a> {
committed_rows: IndexScanIter<'a>,
num_committed_rows_fetched: u64,
del_table: &'a DeleteTable,
}

impl<'a> CommittedIndexIter<'a> {
pub(super) fn new(
table_id: TableId,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
committed_rows: IndexScanIter<'a>,
Centril marked this conversation as resolved.
Show resolved Hide resolved
) -> Self {
impl<'a> CommittedIndexIterWithDeletedMutTx<'a> {
pub(super) fn new(committed_rows: IndexScanIter<'a>, del_table: &'a BTreeSet<RowPointer>) -> Self {
Self {
table_id,
tx_state,
committed_state,
committed_rows,
num_committed_rows_fetched: 0,
del_table,
}
}
}

impl<'a> Iterator for CommittedIndexIter<'a> {
impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.committed_rows.find(|row_ref| {
!self
.tx_state
.map(|tx_state| tx_state.is_deleted(self.table_id, row_ref.pointer()))
.unwrap_or(false)
}) {
// TODO(metrics): This doesn't actually fetch a row.
// Move this counter to `RowRef::read_row`.
self.num_committed_rows_fetched += 1;
return Some(row_ref);
}

None
self.committed_rows
.find(|row_ref| !self.del_table.contains(&row_ref.pointer()))
}
}
36 changes: 23 additions & 13 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use super::{
committed_state::CommittedState,
mut_tx::MutTxId,
sequence::SequencesState,
state_view::{Iter, IterByColRange, StateView},
state_view::{IterByColRangeTx, StateView},
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx};
use crate::execution_context::Workload;
use crate::{
db::{
Expand Down Expand Up @@ -322,20 +323,20 @@ impl Tx for Locking {
}

impl TxDatastore for Locking {
type Iter<'a>
= Iter<'a>
type IterTx<'a>
= IterTx<'a>
where
Self: 'a;
type IterByColEq<'a, 'r>
= IterByColRange<'a, &'r AlgebraicValue>
type IterByColRangeTx<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRangeTx<'a, R>
where
Self: 'a;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRange<'a, R>
type IterByColEqTx<'a, 'r>
= IterByColRangeTx<'a, &'r AlgebraicValue>
where
Self: 'a;

fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Self::Iter<'a>> {
fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Self::IterTx<'a>> {
tx.iter(table_id)
}

Expand All @@ -345,7 +346,7 @@ impl TxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
range: R,
) -> Result<Self::IterByColRange<'a, R>> {
) -> Result<Self::IterByColRangeTx<'a, R>> {
tx.iter_by_col_range(table_id, cols.into(), range)
}

Expand All @@ -355,7 +356,7 @@ impl TxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
) -> Result<Self::IterByColEqTx<'a, 'r>> {
tx.iter_by_col_eq(table_id, cols, value)
}

Expand Down Expand Up @@ -404,6 +405,15 @@ impl TxDatastore for Locking {
}

impl MutTxDatastore for Locking {
type IterMutTx<'a>= IterMutTx<'a>
where
Self: 'a;
type IterByColRangeMutTx<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeMutTx<'a, R>;
type IterByColEqMutTx<'a, 'r>
= IterByColRangeMutTx<'a, &'r AlgebraicValue>
where
Self: 'a;

/// Creates a table described by `schema` inside the mutable transaction `tx`.
///
/// Forwards directly to `tx.create_table(schema)` and returns its result:
/// the new `TableId` on success, or the error it reports.
fn create_table_mut_tx(&self, tx: &mut Self::MutTx, schema: TableSchema) -> Result<TableId> {
tx.create_table(schema)
}
Expand Down Expand Up @@ -492,7 +502,7 @@ impl MutTxDatastore for Locking {
tx.constraint_id_from_name(constraint_name)
}

fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::Iter<'a>> {
fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::IterMutTx<'a>> {
tx.iter(table_id)
}

Expand All @@ -502,7 +512,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
range: R,
) -> Result<Self::IterByColRange<'a, R>> {
) -> Result<Self::IterByColRangeMutTx<'a, R>> {
tx.iter_by_col_range(table_id, cols.into(), range)
}

Expand All @@ -512,7 +522,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
) -> Result<Self::IterByColEqMutTx<'a, 'r>> {
tx.iter_by_col_eq(table_id, cols.into(), value)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod mut_tx;
pub use mut_tx::MutTxId;
mod sequence;
pub mod state_view;
pub use state_view::{Iter, IterByColEq, IterByColRange};
pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub(crate) mod tx;
mod tx_state;

Expand Down
69 changes: 48 additions & 21 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use super::{
committed_state::{CommittedIndexIter, CommittedState},
committed_state::CommittedState,
datastore::{record_metrics, Result},
sequence::{Sequence, SequencesState},
state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView},
tx::TxId,
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx;
use crate::db::datastore::locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
};
use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use crate::db::datastore::{
system_tables::{
Expand Down Expand Up @@ -524,7 +528,7 @@ impl MutTxId {
// but don't yield rows deleted in the tx state.
use itertools::Either::*;
use BTreeScanInner::*;
let commit_iter = commit_iter.map(|iter| match self.tx_state.delete_tables.get(&table_id) {
let commit_iter = commit_iter.map(|iter| match self.tx_state.get_delete_table(table_id) {
None => Left(iter),
Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }),
});
Expand Down Expand Up @@ -1419,6 +1423,12 @@ impl MutTxId {
}

impl StateView for MutTxId {
type Iter<'a> = IterMutTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeMutTx<'a, R>;
type IterByColEq<'a, 'r> = IterByColEqMutTx<'a, 'r>
where
Self: 'a;

fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
// TODO(bikeshedding, docs): should this also check if the schema is in the system tables,
// but the table hasn't been constructed yet?
Expand All @@ -1442,11 +1452,11 @@ impl StateView for MutTxId {
}
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(
return Ok(IterMutTx::new(
table_id,
Some(&self.tx_state),
&self.tx_state,
&self.committed_state_write_lock,
));
}
Expand All @@ -1458,7 +1468,7 @@ impl StateView for MutTxId {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// We have to index_seek in both the committed state and the current tx state.
// First, we will check modifications in the current tx. It may be that the table
// has not been modified yet in the current tx, in which case we will only search
Expand All @@ -1471,24 +1481,32 @@ impl StateView for MutTxId {
// yet. In particular, I don't know if creating an index in a transaction and
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) {
let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range);
// The current transaction has modified this table, and the table is indexed.
Ok(IterByColRange::Index(IndexSeekIterMutTxId {
table_id,
tx_state: &self.tx_state,
inserted_rows,
committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range),
num_committed_rows_fetched: 0,
}))
Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx {
inserted_rows,
committed_rows,
del_table,
})
} else {
IterByColRangeMutTx::Index(IndexSeekIterIdMutTx {
inserted_rows,
committed_rows,
})
})
} else {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state_write_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new(
table_id,
Some(&self.tx_state),
&self.committed_state_write_lock,
committed_rows,
))),
Some(committed_rows) => Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new(
committed_rows,
del_table,
))
} else {
IterByColRangeMutTx::CommittedIndex(committed_rows)
}),
None => {
#[cfg(feature = "unindexed_iter_by_col_range_warn")]
match self.table_row_count(table_id) {
Expand Down Expand Up @@ -1518,7 +1536,7 @@ impl StateView for MutTxId {
}
}

Ok(IterByColRange::Scan(ScanIterByColRange::new(
Ok(IterByColRangeMutTx::Scan(ScanIterByColRangeMutTx::new(
self.iter(table_id)?,
cols,
range,
Expand All @@ -1527,4 +1545,13 @@ impl StateView for MutTxId {
}
}
}

/// Iterates over the rows of `table_id` whose `cols` are equal to `value`.
///
/// This is a thin wrapper: `cols` is converted into a `ColList` and the call
/// is forwarded to this type's `iter_by_col_range`, with `value` itself
/// passed as the range argument. Any error from `iter_by_col_range` is
/// returned unchanged.
fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}
Loading
Loading