Make the key of Table.indexes be IndexId (#2124)
Centril authored Jan 17, 2025
1 parent a54ea3a commit 6f428f3
Showing 9 changed files with 266 additions and 221 deletions.
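In a nutshell: Table.indexes is now keyed by the index's IndexId instead of by the ColList of indexed columns, and the IndexIdMap used to resolve an IndexId shrinks from IndexId -> (TableId, ColList) to IndexId -> TableId. The standalone Rust sketch below illustrates only that keying change; the type names are simplified stand-ins (the real crate uses IntMap and BTreeIndex), not the actual definitions.

#![allow(dead_code)]
use std::collections::BTreeMap;

// Simplified placeholders for the real spacetimedb types.
type IndexId = u32;
type TableId = u32;
type ColList = Vec<u16>;
struct BTreeIndexStub;

// Before this commit (conceptually): indexes keyed by the indexed columns,
// so the id map had to carry the column list as well.
struct TableBefore {
    indexes: BTreeMap<ColList, BTreeIndexStub>,
}
type IndexIdMapBefore = BTreeMap<IndexId, (TableId, ColList)>;

// After this commit (conceptually): indexes keyed directly by IndexId,
// so the id map only needs to name the owning table.
struct TableAfter {
    indexes: BTreeMap<IndexId, BTreeIndexStub>,
}
type IndexIdMapAfter = BTreeMap<IndexId, TableId>;

fn main() {
    let (index_id, table_id): (IndexId, TableId) = (7, 1);
    let mut table = TableAfter { indexes: BTreeMap::new() };
    let mut id_map: IndexIdMapAfter = BTreeMap::new();
    table.indexes.insert(index_id, BTreeIndexStub);
    id_map.insert(index_id, table_id);
    // Resolving an index by id is now a column-free two-step lookup:
    // index_id -> table_id, then table.indexes[&index_id].
    let resolved = id_map.get(&index_id).and_then(|_| table.indexes.get(&index_id));
    assert!(resolved.is_some());
}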
crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
@@ -32,13 +32,13 @@ use spacetimedb_lib::{
db::auth::{StAccess, StTableType},
Identity,
};
use spacetimedb_primitives::{ColList, ColSet, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
table::{IndexScanIter, InsertError, RowRef, Table},
table::{IndexScanIter, InsertError, RowRef, Table, TableAndIndex},
MemoryUsage,
};
use std::collections::{BTreeMap, BTreeSet};
@@ -375,19 +375,19 @@ impl CommittedState {
.collect();

for index_row in rows {
let Some((table, blob_store)) = self.get_table_and_blob_store(index_row.table_id) else {
let index_id = index_row.index_id;
let table_id = index_row.table_id;
let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) else {
panic!("Cannot create index for table which doesn't exist in committed state");
};
let columns = match index_row.index_algorithm {
StIndexAlgorithm::BTree { columns } => columns,
_ => unimplemented!("Only BTree indexes are supported"),
};
let is_unique = unique_constraints.contains(&(index_row.table_id, (&columns).into()));

let index = table.new_index(index_row.index_id, &columns, is_unique)?;
table.insert_index(blob_store, columns.clone(), index);
self.index_id_map
.insert(index_row.index_id, (index_row.table_id, columns));
let is_unique = unique_constraints.contains(&(table_id, (&columns).into()));
let index = table.new_index(columns.clone(), is_unique)?;
table.insert_index(blob_store, index_id, index);
self.index_id_map.insert(index_id, table_id);
}
Ok(())
}
@@ -429,13 +429,36 @@ impl CommittedState {
Ok(())
}

/// When there's an index on `cols`,
/// returns an iterator over the [BTreeIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexScanIter<'a>> {
self.tables.get(&table_id)?.index_seek(&self.blob_store, cols, range)
self.tables
.get(&table_id)?
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek(range))
}

/// Returns the table associated with the given `index_id`, if any.
pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
self.index_id_map.get(&index_id).copied()
}

/// Returns the table for `table_id` combined with the index for `index_id`, if both exist.
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<TableAndIndex<'_>> {
self.tables
.get(&table_id)?
.get_index_by_id_with_table(&self.blob_store, index_id)
}

// TODO(perf, deep-integration): Make this method `unsafe`. Add the following to the docs:
@@ -637,13 +660,6 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
(table, blob_store)
}

/// Returns the table and index associated with the given `table_id` and `col_list`, if any.
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
let table = self.tables.get(&table_id)?;
let index = table.indexes.get(col_list)?;
Some(&index.key_type)
}
}

pub struct CommittedIndexIterWithDeletedMutTx<'a> {
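Before moving on to mut_tx.rs, here is a standalone sketch of the lookup shape the new CommittedState accessors above expose. It uses toy types only: the real methods also thread a &dyn BlobStore and return TableAndIndex / IndexScanIter rather than a bare index reference.

use std::collections::BTreeMap;

type IndexId = u32;
type TableId = u32;
struct Index; // stand-in for the crate's BTreeIndex
struct Table {
    indexes: BTreeMap<IndexId, Index>,
}

struct CommittedState {
    tables: BTreeMap<TableId, Table>,
    index_id_map: BTreeMap<IndexId, TableId>,
}

impl CommittedState {
    // Mirrors get_table_for_index: which table owns this index, if any?
    fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
        self.index_id_map.get(&index_id).copied()
    }

    // Mirrors get_index_by_id_with_table: table lookup, then index lookup by the same id.
    fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<&Index> {
        self.tables.get(&table_id)?.indexes.get(&index_id)
    }
}

fn main() {
    let mut indexes = BTreeMap::new();
    indexes.insert(7, Index);
    let mut tables = BTreeMap::new();
    tables.insert(1, Table { indexes });
    let mut index_id_map = BTreeMap::new();
    index_id_map.insert(7, 1);
    let cs = CommittedState { tables, index_id_map };

    let table_id = cs.get_table_for_index(7).expect("index 7 is registered");
    assert!(cs.get_index_by_id_with_table(table_id, 7).is_some());
}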
96 changes: 51 additions & 45 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -43,7 +43,7 @@ use spacetimedb_schema::{
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
table::{IndexScanIter, InsertError, RowRef, Table},
table::{IndexScanIter, InsertError, RowRef, Table, TableAndIndex},
};
use std::{
sync::Arc,
@@ -395,15 +395,15 @@ impl MutTxId {
//
// Ensure adding the index does not cause a unique constraint violation due to
// the existing rows having the same value for some column(s).
let mut insert_index = table.new_index(index.index_id, &columns, is_unique)?;
let mut insert_index = table.new_index(columns.clone(), is_unique)?;
let mut build_from_rows = |table: &Table, bs: &dyn BlobStore| -> Result<()> {
if let Some(violation) = insert_index.build_from_rows(&columns, table.scan_rows(bs))? {
if let Some(violation) = insert_index.build_from_rows(table.scan_rows(bs))? {
let violation = table
.get_row_ref(bs, violation)
.expect("row came from scanning the table")
.project(&columns)
.expect("`cols` should consist of valid columns for this table");
return Err(IndexError::from(table.build_error_unique(&insert_index, &columns, violation)).into());
return Err(IndexError::from(table.build_error_unique(&insert_index, index_id, violation)).into());
}
Ok(())
};
@@ -421,16 +421,17 @@
build_from_rows(commit_table, commit_blob_store)?;
}

table.add_index(columns.clone(), insert_index);
// Associate `index_id -> (table_id, col_list)` for fast lookup.
idx_map.insert(index_id, (table_id, columns.clone()));

log::trace!(
"INDEX CREATED: {} for table: {} and col(s): {:?}",
index_id,
table_id,
columns
);

table.add_index(index_id, insert_index);
// Associate `index_id -> table_id` for fast lookup.
idx_map.insert(index_id, table_id);

// Update the table's schema.
// This won't clone-write when creating a table but likely to otherwise.
table.with_mut_schema(|s| s.indexes.push(index));
@@ -455,16 +456,10 @@
// Remove the index in the transaction's insert table.
// By altering the insert table, this gets moved over to the committed state on merge.
let (table, blob_store, idx_map, ..) = self.get_or_create_insert_table_mut(table_id)?;
if let Some(col) = table
.indexes
.iter()
.find(|(_, idx)| idx.index_id == index_id)
.map(|(cols, _)| cols.clone())
{
if table.delete_index(blob_store, index_id) {
// This likely will do a clone-write as over time?
// The schema might have found other referents.
table.with_mut_schema(|s| s.indexes.retain(|x| x.index_algorithm.columns() != &col));
table.delete_index(blob_store, &col);
table.with_mut_schema(|s| s.indexes.retain(|x| x.index_id != index_id));
}
// Remove the `index_id -> (table_id, col_list)` association.
idx_map.remove(&index_id);
@@ -497,21 +492,20 @@
rstart: &[u8],
rend: &[u8],
) -> Result<(TableId, BTreeScan<'a>)> {
// Extract the table and index type for the tx state.
let (table_id, col_list, tx_idx_key_type) = self
.get_table_and_index_type(index_id)
.ok_or_else(|| IndexError::NotFound(index_id))?;
// Extract the table id, index type, and commit/tx indices.
let (table_id_and_index_ty, commit_index, tx_index) = self.get_table_and_index_type(index_id);
let (table_id, index_ty) = table_id_and_index_ty.ok_or_else(|| IndexError::NotFound(index_id))?;

// TODO(centril): Once we have more index types than `btree`,
// we'll need to enforce that `index_id` refers to a btree index.

// We have the index key type, so we can decode everything.
let bounds = Self::btree_decode_bounds(tx_idx_key_type, prefix, prefix_elems, rstart, rend)
.map_err(IndexError::Decode)?;
let bounds =
Self::btree_decode_bounds(index_ty, prefix, prefix_elems, rstart, rend).map_err(IndexError::Decode)?;

// Get an index seek iterator for the tx and committed state.
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds);
let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds);
let tx_iter = tx_index.map(|i| i.seek(&bounds));
let commit_iter = commit_index.map(|i| i.seek(&bounds));

// Chain together the indexed rows in the tx and committed state,
// but don't yield rows deleted in the tx state.
@@ -521,7 +515,7 @@
None => Left(iter),
Some(deletes) => Right(IndexScanFilterDeleted { iter, deletes }),
});
// this is effectively just `tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())`,
// This is effectively just `tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())`,
// but with all the branching and `Option`s flattened to just one layer.
let iter = match (tx_iter, commit_iter) {
(None, None) => Empty(iter::empty()),
@@ -534,34 +528,46 @@
Ok((table_id, BTreeScan { inner: iter }))
}

/// Translate `index_id` to the table id, the column list and index key type.
fn get_table_and_index_type(&self, index_id: IndexId) -> Option<(TableId, &ColList, &AlgebraicType)> {
/// Translate `index_id` to the table id, index type, and commit/tx indices.
fn get_table_and_index_type(
&self,
index_id: IndexId,
) -> (
Option<(TableId, &AlgebraicType)>,
Option<TableAndIndex<'_>>,
Option<TableAndIndex<'_>>,
) {
// The order of querying the committed vs. tx state for the translation is not important.
// But it is vastly more likely that it is in the committed state,
// so query that first to avoid two lookups.
let &(table_id, ref col_list) = self
.committed_state_write_lock
.index_id_map
.get(&index_id)
.or_else(|| self.tx_state.index_id_map.get(&index_id))?;

// The tx state must have the index.
//
// Also, the tx state must have the index.
// If the index was e.g., dropped from the tx state but exists physically in the committed state,
// the index does not exist, semantically.
// TODO: handle the case where the table has been dropped in this transaction.
let key_ty = if let Some(key_ty) = self
let commit_table_id = self
.committed_state_write_lock
.get_table_and_index_type(table_id, col_list)
{
if self.tx_state_removed_index(index_id) {
return None;
}
key_ty
.get_table_for_index(index_id)
.filter(|_| !self.tx_state_removed_index(index_id));

let (table_id, commit_index, tx_index) = if let t_id @ Some(table_id) = commit_table_id {
// Index found for commit state, might also exist for tx state.
let commit_index = self
.committed_state_write_lock
.get_index_by_id_with_table(table_id, index_id);
let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id);
(t_id, commit_index, tx_index)
} else if let t_id @ Some(table_id) = self.tx_state.get_table_for_index(index_id) {
// Index might exist for tx state.
let tx_index = self.tx_state.get_index_by_id_with_table(table_id, index_id);
(t_id, None, tx_index)
} else {
self.tx_state.get_table_and_index_type(table_id, col_list)?
// No index in either side.
(None, None, None)
};

Some((table_id, col_list, key_ty))
let index_ty = commit_index.or(tx_index).map(|index| &index.index().key_type);
let table_id_and_index_ty = table_id.zip(index_ty);
(table_id_and_index_ty, commit_index, tx_index)
}

/// Returns whether the index with `index_id` was removed in this transaction.
@@ -1540,7 +1546,7 @@ impl StateView for MutTxId {
// TODO(george): It's unclear that we truly support dynamically creating an index
// yet. In particular, I don't know if creating an index in a transaction and
// rolling it back will leave the index in place.
if let Some(inserted_rows) = self.tx_state.index_seek(table_id, &cols, &range) {
if let Some(inserted_rows) = self.tx_state.index_seek_by_cols(table_id, &cols, &range) {
let committed_rows = self.committed_state_write_lock.index_seek(table_id, &cols, &range);
// The current transaction has modified this table, and the table is indexed.
Ok(if let Some(del_table) = self.tx_state.get_delete_table(table_id) {
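To make the reworked get_table_and_index_type easier to follow, here is a minimal standalone sketch of its resolution order using plain values in place of the real state objects: filter drops the committed hit when this transaction removed the index, or falls back from the committed side to the tx side, and zip only produces a result when both a table id and a key type were found. This is a simplification of the method above, not a drop-in replacement.

fn resolve(
    committed_hit: Option<u32>,          // table id found via the committed state's id map
    removed_in_tx: bool,                 // was this index dropped inside the open transaction?
    tx_hit: Option<u32>,                 // table id found via the tx state's id map
    commit_key_ty: Option<&'static str>, // key type taken from the committed index, if any
    tx_key_ty: Option<&'static str>,     // key type taken from the tx index, if any
) -> Option<(u32, &'static str)> {
    // A committed index that was dropped in this tx does not exist, semantically.
    let table_id = committed_hit.filter(|_| !removed_in_tx).or(tx_hit);
    // Prefer the committed index's key type, fall back to the tx one.
    let key_ty = commit_key_ty.or(tx_key_ty);
    // Some only when both pieces are present -- mirrors `table_id.zip(index_ty)` above.
    table_id.zip(key_ty)
}

fn main() {
    // Index exists only in the committed state.
    assert_eq!(resolve(Some(1), false, None, Some("u64"), None), Some((1, "u64")));
    // The committed index was dropped in this transaction and nothing replaced it.
    assert_eq!(resolve(Some(1), true, None, None, None), None);
    // Index was created inside this transaction only.
    assert_eq!(resolve(None, false, Some(1), None, Some("u32")), Some((1, "u32")));
}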
2 changes: 1 addition & 1 deletion crates/core/src/db/datastore/locking_tx_datastore/tx.rs
@@ -100,7 +100,7 @@ impl TxId {
// Do not change its return type to a bare `u64`.
pub(crate) fn num_distinct_values(&self, table_id: TableId, cols: &ColList) -> Option<NonZeroU64> {
let table = self.committed_state_shared_lock.get_table(table_id)?;
let index = table.indexes.get(cols)?;
let (_, index) = table.get_index_by_cols(cols)?;
NonZeroU64::new(index.num_keys() as u64)
}
}
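The one-line change above swaps direct map access (table.indexes.get(cols)) for table.get_index_by_cols(cols), which yields a pair whose second element is the index (the ignored first element is presumably the IndexId). Since indexes are no longer keyed by their columns, a by-columns lookup presumably has to scan the IndexId-keyed map for an index on those columns; that is an assumption about the Table internals, sketched below with made-up names (get_index_by_cols_toy, indexed_cols), not the crate's actual implementation.

use std::collections::BTreeMap;

type IndexId = u32;
type ColList = Vec<u16>;

// Toy stand-in for BTreeIndex: remembers which columns it indexes and how many keys it holds.
struct Index {
    indexed_cols: ColList,
    keys: usize,
}

// Toy by-columns lookup over an IndexId-keyed map:
// scan the entries and return the first index on exactly `cols`.
fn get_index_by_cols_toy<'a>(
    indexes: &'a BTreeMap<IndexId, Index>,
    cols: &ColList,
) -> Option<(IndexId, &'a Index)> {
    indexes
        .iter()
        .find(|(_, idx)| &idx.indexed_cols == cols)
        .map(|(id, idx)| (*id, idx))
}

fn main() {
    let mut indexes = BTreeMap::new();
    indexes.insert(7, Index { indexed_cols: vec![0], keys: 42 });
    let (_, index) = get_index_by_cols_toy(&indexes, &vec![0]).expect("index on col 0");
    // This count is what num_distinct_values reports (as a NonZeroU64 in the real code).
    assert_eq!(index.keys, 42);
}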
36 changes: 21 additions & 15 deletions crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
@@ -1,19 +1,19 @@
use core::ops::RangeBounds;
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
static_assert_size,
table::{IndexScanIter, RowRef, Table},
table::{IndexScanIter, RowRef, Table, TableAndIndex},
};
use std::collections::{btree_map, BTreeMap, BTreeSet};

pub(super) type DeleteTable = BTreeSet<RowPointer>;

/// A mapping to find the actual index given an `IndexId`.
pub(super) type IndexIdMap = IntMap<IndexId, (TableId, ColList)>;
pub(super) type IndexIdMap = IntMap<IndexId, TableId>;
pub(super) type RemovedIndexIdSet = IntSet<IndexId>;

/// `TxState` tracks all of the modifications made during a particular transaction.
@@ -89,22 +89,35 @@ impl TxState {
}

/// When there's an index on `cols`,
/// returns an iterator over the [BTreeIndex] that yields all the `RowId`s
/// that match the specified `value` in the indexed column.
/// returns an iterator over the [BTreeIndex] that yields all the [`RowRef`]s
/// that match the specified `range` in the indexed column.
///
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowId`.
/// For a unique index this will always yield at most one `RowRef`.
/// When there is no index this returns `None`.
pub(super) fn index_seek<'a>(
pub(super) fn index_seek_by_cols<'a>(
&'a self,
table_id: TableId,
cols: &ColList,
range: &impl RangeBounds<AlgebraicValue>,
) -> Option<IndexScanIter<'a>> {
self.insert_tables
.get(&table_id)?
.index_seek(&self.blob_store, cols, range)
.get_index_by_cols_with_table(&self.blob_store, cols)
.map(|i| i.seek(range))
}

/// Returns the table associated with the given `index_id`, if any.
pub(super) fn get_table_for_index(&self, index_id: IndexId) -> Option<TableId> {
self.index_id_map.get(&index_id).copied()
}

/// Returns the table for `table_id` combined with the index for `index_id`, if both exist.
pub(super) fn get_index_by_id_with_table(&self, table_id: TableId, index_id: IndexId) -> Option<TableAndIndex<'_>> {
self.insert_tables
.get(&table_id)?
.get_index_by_id_with_table(&self.blob_store, index_id)
}

// TODO(perf, deep-integration): Make this unsafe. Add the following to the docs:
Expand Down Expand Up @@ -203,11 +216,4 @@ impl TxState {
let delete_table = unsafe { delete_table.unwrap_unchecked() };
(tx_table, tx_blob_store, delete_table)
}

/// Returns the table and index associated with the given `table_id` and `col_list`, if any.
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
let table = self.insert_tables.get(&table_id)?;
let index = table.indexes.get(col_list)?;
Some(&index.key_type)
}
}
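One detail worth calling out now that both TxState and CommittedState hand MutTxId an Option of a seek iterator: the comment in mut_tx.rs notes the merge "is effectively just tx_iter.into_iter().flatten().chain(commit_iter.into_iter().flatten())". A minimal standalone sketch of that Option-of-iterator chaining, with plain integer ranges standing in for IndexScanIter and the tx-side delete filtering omitted:

fn main() {
    // Pretend these are the optional seek iterators from the tx and committed state.
    let tx_iter: Option<std::ops::Range<u32>> = Some(0..2);
    let commit_iter: Option<std::ops::Range<u32>> = None;

    // Option<I> is itself iterable, so flatten() erases the "no index" case,
    // and chain() yields tx-state rows first, then committed-state rows.
    let merged: Vec<u32> = tx_iter
        .into_iter()
        .flatten()
        .chain(commit_iter.into_iter().flatten())
        .collect();

    assert_eq!(merged, vec![0, 1]);
}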

1 comment on commit 6f428f3

github-actions bot commented on 6f428f3 Jan 17, 2025

Criterion benchmark results

Error when comparing benchmarks: Couldn't find AWS credentials in environment, credentials file, or IAM role.

Caused by:
Couldn't find AWS credentials in environment, credentials file, or IAM role.
