Skip to content

Fix btree_scan #1748

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{
datastore::Result,
sequence::{Sequence, SequencesState},
state_view::{Iter, IterByColRange, ScanIterByColRange, StateView},
tx_state::{DeleteTable, IndexIdMap, TxState},
tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState},
};
use crate::{
db::{
Expand Down Expand Up @@ -30,8 +30,8 @@ use spacetimedb_lib::{
address::Address,
db::auth::{StAccess, StTableType},
};
use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId};
use spacetimedb_sats::{AlgebraicValue, ProductValue};
use spacetimedb_primitives::{ColList, ColSet, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
Expand Down Expand Up @@ -469,7 +469,7 @@ impl CommittedState {
self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store);

// Merge index id fast-lookup map changes.
self.merge_index_map(tx_state.index_id_map, &tx_state.index_id_map_removals);
self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref());

// If the TX will be logged, record its projected tx offset,
// then increment the counter.
Expand Down Expand Up @@ -562,8 +562,8 @@ impl CommittedState {
}
}

fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: &[IndexId]) {
for index_id in index_id_map_removals {
fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) {
for index_id in index_id_map_removals.into_iter().flatten() {
self.index_id_map.remove(index_id);
}
self.index_id_map.extend(index_id_map);
Expand Down Expand Up @@ -609,6 +609,13 @@ impl CommittedState {
let blob_store = &mut self.blob_store;
(table, blob_store)
}

/// Returns the key type of the index on `col_list` in the table `table_id`.
///
/// Yields `None` when the table is not present in `self.tables`
/// or when that table has no index keyed on `col_list`.
pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> {
    self.tables
        .get(&table_id)
        .and_then(|table| table.indexes.get(col_list))
        .map(|index| &index.key_type)
}
}

pub struct CommittedIndexIter<'a> {
Expand Down
71 changes: 56 additions & 15 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ impl MutTxId {
}
// Remove the `index_id -> (table_id, col_list)` association.
idx_map.remove(&index_id);
self.tx_state.index_id_map_removals.push(index_id);
self.tx_state
.index_id_map_removals
.get_or_insert_with(Default::default)
.insert(index_id);

log::trace!("INDEX DROPPED: {}", index_id);
Ok(())
Expand Down Expand Up @@ -530,34 +533,54 @@ impl MutTxId {
.map_err(IndexError::Decode)?;

// Get an index seek iterator for the tx and committed state.
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds).unwrap();
let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds);
let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds);

// Chain together the indexed rows in the tx and committed state,
// but don't yield rows deleted in the tx state.
enum Choice<A, B, C> {
use itertools::Either::{Left, Right};
// this is gross, but nested `Either`s don't optimize
enum Choice<A, B, C, D, E, F> {
A(A),
B(B),
C(C),
D(D),
E(E),
F(F),
}
impl<T, A: Iterator<Item = T>, B: Iterator<Item = T>, C: Iterator<Item = T>> Iterator for Choice<A, B, C> {
impl<
T,
A: Iterator<Item = T>,
B: Iterator<Item = T>,
C: Iterator<Item = T>,
D: Iterator<Item = T>,
E: Iterator<Item = T>,
F: Iterator<Item = T>,
> Iterator for Choice<A, B, C, D, E, F>
{
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
match self {
Self::A(i) => i.next(),
Self::B(i) => i.next(),
Self::C(i) => i.next(),
Self::D(i) => i.next(),
Self::E(i) => i.next(),
Self::F(i) => i.next(),
}
}
}
let iter = match commit_iter {
None => Choice::A(tx_iter),
Some(commit_iter) => match self.tx_state.delete_tables.get(&table_id) {
None => Choice::B(tx_iter.chain(commit_iter)),
Some(tx_dels) => {
Choice::C(tx_iter.chain(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))))
}
},
let commit_iter = commit_iter.map(|commit_iter| match self.tx_state.delete_tables.get(&table_id) {
None => Left(commit_iter),
Some(tx_dels) => Right(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))),
});
let iter = match (tx_iter, commit_iter) {
(None, None) => Choice::A(std::iter::empty()),
(Some(tx_iter), None) => Choice::B(tx_iter),
(None, Some(Left(commit_iter))) => Choice::C(commit_iter),
(None, Some(Right(commit_iter))) => Choice::D(commit_iter),
(Some(tx_iter), Some(Left(commit_iter))) => Choice::E(tx_iter.chain(commit_iter)),
(Some(tx_iter), Some(Right(commit_iter))) => Choice::F(tx_iter.chain(commit_iter)),
};
Ok((table_id, iter))
}
Expand All @@ -567,16 +590,34 @@ impl MutTxId {
// The order of querying the committed vs. tx state for the translation is not important.
// But it is vastly more likely that it is in the committed state,
// so query that first to avoid two lookups.
let (table_id, col_list) = self
let &(table_id, ref col_list) = self
.committed_state_write_lock
.index_id_map
.get(&index_id)
.or_else(|| self.tx_state.index_id_map.get(&index_id))?;

// The tx state must have the index.
// If the index was e.g., dropped from the tx state but exists physically in the committed state,
// the index does not exist, semantically.
let key_ty = self.tx_state.get_table_and_index_type(*table_id, col_list)?;
Some((*table_id, col_list, key_ty))
// TODO: handle the case where the table has been dropped in this transaction.
let key_ty = if let Some(key_ty) = self
.committed_state_write_lock
.get_table_and_index_type(table_id, col_list)
{
if self
.tx_state
.index_id_map_removals
.as_ref()
.is_some_and(|s| s.contains(&index_id))
{
return None;
}
key_ty
} else {
self.tx_state.get_table_and_index_type(table_id, col_list)?
};

Some((table_id, col_list, key_ty))
}

/// Decode the bounds for a btree scan for an index typed at `key_type`.
Expand Down
11 changes: 8 additions & 3 deletions crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use core::ops::RangeBounds;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_data_structures::map::{IntMap, IntSet};
use spacetimedb_primitives::{ColList, IndexId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
use spacetimedb_table::{
blob_store::{BlobStore, HashMapBlobStore},
indexes::{RowPointer, SquashedOffset},
static_assert_size,
table::{IndexScanIter, RowRef, Table},
};
use std::collections::{btree_map, BTreeMap, BTreeSet};
use thin_vec::ThinVec;

pub(super) type DeleteTable = BTreeSet<RowPointer>;

/// A mapping to find the actual index given an `IndexId`.
pub(super) type IndexIdMap = IntMap<IndexId, (TableId, ColList)>;
pub(super) type RemovedIndexIdSet = IntSet<IndexId>;

/// `TxState` tracks all of the modifications made during a particular transaction.
/// Rows inserted during a transaction will be added to insert_tables, and similarly,
Expand Down Expand Up @@ -71,9 +72,13 @@ pub(super) struct TxState {
pub(super) index_id_map: IndexIdMap,

/// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`.
pub(super) index_id_map_removals: ThinVec<IndexId>,
// This is in an `Option<Box<>>` to reduce the size of `TxState` - it's very uncommon
// that this would be created.
pub(super) index_id_map_removals: Option<Box<RemovedIndexIdSet>>,
}

static_assert_size!(TxState, 120);

impl TxState {
/// Returns the row count in insert tables
/// and the number of rows deleted from committed state.
Expand Down
Loading