Skip to content

Commit

Permalink
Split Iter & IterByColRange types into Tx and MutTxId versions (#2043)
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx authored Dec 31, 2024
1 parent 78b48c9 commit 20d397c
Show file tree
Hide file tree
Showing 11 changed files with 481 additions and 249 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IterByColRangeTx, StateView},
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
IterByColEqTx,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterTx, ScanIterByColRangeTx};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -39,7 +41,7 @@ use spacetimedb_table::{
table::{IndexScanIter, InsertError, RowRef, Table},
MemoryUsage,
};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

/// Contains the live, in-memory snapshot of a database. This structure
Expand Down Expand Up @@ -69,6 +71,12 @@ impl MemoryUsage for CommittedState {
}

impl StateView for CommittedState {
type Iter<'a> = IterTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeTx<'a, R>;
type IterByColEq<'a, 'r> = IterByColEqTx<'a, 'r>
where
Self: 'a;

fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
self.tables.get(&table_id).map(|table| table.get_schema())
}
Expand All @@ -77,9 +85,9 @@ impl StateView for CommittedState {
self.get_table(table_id).map(|table| table.row_count)
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(table_id, None, self));
return Ok(IterTx::new(table_id, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand All @@ -91,15 +99,25 @@ impl StateView for CommittedState {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// TODO: Why does this unconditionally return a `Scan` iter,
// instead of trying to return a `CommittedIndex` iter?
Ok(IterByColRange::Scan(ScanIterByColRange::new(
// Answer: Because CommittedIndexIter::tx_state: Option<&'a TxState> need to be Some to read after reopen
Ok(IterByColRangeTx::Scan(ScanIterByColRangeTx::new(
self.iter(table_id)?,
cols,
range,
)))
}

fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}

/// Swallow `Err(TableError::Duplicate(_))`, which signals a set-semantic collision,
Expand Down Expand Up @@ -628,48 +646,25 @@ impl CommittedState {
}
}

pub struct CommittedIndexIter<'a> {
table_id: TableId,
tx_state: Option<&'a TxState>,
#[allow(dead_code)]
committed_state: &'a CommittedState,
pub struct CommittedIndexIterWithDeletedMutTx<'a> {
committed_rows: IndexScanIter<'a>,
num_committed_rows_fetched: u64,
del_table: &'a DeleteTable,
}

impl<'a> CommittedIndexIter<'a> {
pub(super) fn new(
table_id: TableId,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
committed_rows: IndexScanIter<'a>,
) -> Self {
impl<'a> CommittedIndexIterWithDeletedMutTx<'a> {
pub(super) fn new(committed_rows: IndexScanIter<'a>, del_table: &'a BTreeSet<RowPointer>) -> Self {
Self {
table_id,
tx_state,
committed_state,
committed_rows,
num_committed_rows_fetched: 0,
del_table,
}
}
}

impl<'a> Iterator for CommittedIndexIter<'a> {
impl<'a> Iterator for CommittedIndexIterWithDeletedMutTx<'a> {
type Item = RowRef<'a>;

fn next(&mut self) -> Option<Self::Item> {
if let Some(row_ref) = self.committed_rows.find(|row_ref| {
!self
.tx_state
.map(|tx_state| tx_state.is_deleted(self.table_id, row_ref.pointer()))
.unwrap_or(false)
}) {
// TODO(metrics): This doesn't actually fetch a row.
// Move this counter to `RowRef::read_row`.
self.num_committed_rows_fetched += 1;
return Some(row_ref);
}

None
self.committed_rows
.find(|row_ref| !self.del_table.contains(&row_ref.pointer()))
}
}
36 changes: 23 additions & 13 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use super::{
committed_state::CommittedState,
mut_tx::MutTxId,
sequence::SequencesState,
state_view::{Iter, IterByColRange, StateView},
state_view::{IterByColRangeTx, StateView},
tx::TxId,
tx_state::TxState,
};
use crate::db::datastore::locking_tx_datastore::state_view::{IterByColRangeMutTx, IterMutTx, IterTx};
use crate::execution_context::Workload;
use crate::{
db::{
Expand Down Expand Up @@ -322,20 +323,20 @@ impl Tx for Locking {
}

impl TxDatastore for Locking {
type Iter<'a>
= Iter<'a>
type IterTx<'a>
= IterTx<'a>
where
Self: 'a;
type IterByColEq<'a, 'r>
= IterByColRange<'a, &'r AlgebraicValue>
type IterByColRangeTx<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRangeTx<'a, R>
where
Self: 'a;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>>
= IterByColRange<'a, R>
type IterByColEqTx<'a, 'r>
= IterByColRangeTx<'a, &'r AlgebraicValue>
where
Self: 'a;

fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Self::Iter<'a>> {
fn iter_tx<'a>(&'a self, tx: &'a Self::Tx, table_id: TableId) -> Result<Self::IterTx<'a>> {
tx.iter(table_id)
}

Expand All @@ -345,7 +346,7 @@ impl TxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
range: R,
) -> Result<Self::IterByColRange<'a, R>> {
) -> Result<Self::IterByColRangeTx<'a, R>> {
tx.iter_by_col_range(table_id, cols.into(), range)
}

Expand All @@ -355,7 +356,7 @@ impl TxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
) -> Result<Self::IterByColEqTx<'a, 'r>> {
tx.iter_by_col_eq(table_id, cols, value)
}

Expand Down Expand Up @@ -404,6 +405,15 @@ impl TxDatastore for Locking {
}

impl MutTxDatastore for Locking {
type IterMutTx<'a>= IterMutTx<'a>
where
Self: 'a;
type IterByColRangeMutTx<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeMutTx<'a, R>;
type IterByColEqMutTx<'a, 'r>
= IterByColRangeMutTx<'a, &'r AlgebraicValue>
where
Self: 'a;

fn create_table_mut_tx(&self, tx: &mut Self::MutTx, schema: TableSchema) -> Result<TableId> {
tx.create_table(schema)
}
Expand Down Expand Up @@ -492,7 +502,7 @@ impl MutTxDatastore for Locking {
tx.constraint_id_from_name(constraint_name)
}

fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::Iter<'a>> {
fn iter_mut_tx<'a>(&'a self, tx: &'a Self::MutTx, table_id: TableId) -> Result<Self::IterMutTx<'a>> {
tx.iter(table_id)
}

Expand All @@ -502,7 +512,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
range: R,
) -> Result<Self::IterByColRange<'a, R>> {
) -> Result<Self::IterByColRangeMutTx<'a, R>> {
tx.iter_by_col_range(table_id, cols.into(), range)
}

Expand All @@ -512,7 +522,7 @@ impl MutTxDatastore for Locking {
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
) -> Result<Self::IterByColEqMutTx<'a, 'r>> {
tx.iter_by_col_eq(table_id, cols.into(), value)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod mut_tx;
pub use mut_tx::MutTxId;
mod sequence;
pub mod state_view;
pub use state_view::{Iter, IterByColEq, IterByColRange};
pub use state_view::{IterByColEqTx, IterByColRangeTx};
pub(crate) mod tx;
mod tx_state;

Expand Down
69 changes: 48 additions & 21 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use super::{
committed_state::{CommittedIndexIter, CommittedState},
committed_state::CommittedState,
datastore::{record_metrics, Result},
sequence::{Sequence, SequencesState},
state_view::{IndexSeekIterMutTxId, Iter, IterByColRange, ScanIterByColRange, StateView},
state_view::{IndexSeekIterIdMutTx, ScanIterByColRangeMutTx, StateView},
tx::TxId,
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexIterWithDeletedMutTx;
use crate::db::datastore::locking_tx_datastore::state_view::{
IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx,
};
use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use crate::db::datastore::{
system_tables::{
Expand Down Expand Up @@ -524,7 +528,7 @@ impl MutTxId {
// but don't yield rows deleted in the tx state.
use itertools::Either::*;
use BTreeScanInner::*;
let commit_iter = commit_iter.map(|iter| match self.tx_state.delete_tables.get(&table_id) {
let commit_iter = commit_iter.map(|iter| match self.tx_state.get_delete_table(table_id) {
None => Left(iter),
Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }),
});
Expand Down Expand Up @@ -1419,6 +1423,12 @@ impl MutTxId {
}

impl StateView for MutTxId {
type Iter<'a> = IterMutTx<'a>;
type IterByColRange<'a, R: RangeBounds<AlgebraicValue>> = IterByColRangeMutTx<'a, R>;
type IterByColEq<'a, 'r> = IterByColEqMutTx<'a, 'r>
where
Self: 'a;

fn get_schema(&self, table_id: TableId) -> Option<&Arc<TableSchema>> {
// TODO(bikeshedding, docs): should this also check if the schema is in the system tables,
// but the table hasn't been constructed yet?
Expand All @@ -1442,11 +1452,11 @@ impl StateView for MutTxId {
}
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
fn iter(&self, table_id: TableId) -> Result<Self::Iter<'_>> {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(
return Ok(IterMutTx::new(
table_id,
Some(&self.tx_state),
&self.tx_state,
&self.committed_state_write_lock,
));
}
Expand All @@ -1458,7 +1468,7 @@ impl StateView for MutTxId {
table_id: TableId,
cols: ColList,
range: R,
) -> Result<IterByColRange<'_, R>> {
) -> Result<Self::IterByColRange<'_, R>> {
// We have to index_seek in both the committed state and the current tx state.
// First, we will check modifications in the current tx. It may be that the table
// has not been modified yet in the current tx, in which case we will only search
Expand All @@ -1471,24 +1481,32 @@ impl StateView for MutTxId {
// yet. In particular, I don't know if creating an index in a transaction and
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) {
let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range);
// The current transaction has modified this table, and the table is indexed.
Ok(IterByColRange::Index(IndexSeekIterMutTxId {
table_id,
tx_state: &self.tx_state,
inserted_rows,
committed_rows: self.committed_state_write_lock.index_seek(table_id, &cols, &range),
num_committed_rows_fetched: 0,
}))
Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
IterByColRangeMutTx::IndexWithDeletes(IndexSeekIterIdWithDeletedMutTx {
inserted_rows,
committed_rows,
del_table,
})
} else {
IterByColRangeMutTx::Index(IndexSeekIterIdMutTx {
inserted_rows,
committed_rows,
})
})
} else {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state_write_lock.index_seek(table_id, &cols, &range) {
Some(committed_rows) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter::new(
table_id,
Some(&self.tx_state),
&self.committed_state_write_lock,
committed_rows,
))),
Some(committed_rows) => Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
IterByColRangeMutTx::CommittedIndexWithDeletes(CommittedIndexIterWithDeletedMutTx::new(
committed_rows,
del_table,
))
} else {
IterByColRangeMutTx::CommittedIndex(committed_rows)
}),
None => {
#[cfg(feature = "unindexed_iter_by_col_range_warn")]
match self.table_row_count(table_id) {
Expand Down Expand Up @@ -1518,7 +1536,7 @@ impl StateView for MutTxId {
}
}

Ok(IterByColRange::Scan(ScanIterByColRange::new(
Ok(IterByColRangeMutTx::Scan(ScanIterByColRangeMutTx::new(
self.iter(table_id)?,
cols,
range,
Expand All @@ -1527,4 +1545,13 @@ impl StateView for MutTxId {
}
}
}

fn iter_by_col_eq<'a, 'r>(
&'a self,
table_id: TableId,
cols: impl Into<ColList>,
value: &'r AlgebraicValue,
) -> Result<Self::IterByColEq<'a, 'r>> {
self.iter_by_col_range(table_id, cols.into(), value)
}
}
Loading

1 comment on commit 20d397c

@github-actions
Copy link

@github-actions github-actions bot commented on 20d397c Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.

Please sign in to comment.