From eadd10ef33d446063a2c207006278e42051e0f27 Mon Sep 17 00:00:00 2001 From: Noa Date: Thu, 26 Sep 2024 15:18:21 -0500 Subject: [PATCH] Fix btree_scan --- .../locking_tx_datastore/committed_state.rs | 19 +++-- .../datastore/locking_tx_datastore/mut_tx.rs | 71 +++++++++++++++---- .../locking_tx_datastore/tx_state.rs | 11 ++- 3 files changed, 77 insertions(+), 24 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index 72ee4804755..c909d95dd3c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -2,7 +2,7 @@ use super::{ datastore::Result, sequence::{Sequence, SequencesState}, state_view::{Iter, IterByColRange, ScanIterByColRange, StateView}, - tx_state::{DeleteTable, IndexIdMap, TxState}, + tx_state::{DeleteTable, IndexIdMap, RemovedIndexIdSet, TxState}, }; use crate::{ db::{ @@ -30,8 +30,8 @@ use spacetimedb_lib::{ address::Address, db::auth::{StAccess, StTableType}, }; -use spacetimedb_primitives::{ColList, ColSet, IndexId, TableId}; -use spacetimedb_sats::{AlgebraicValue, ProductValue}; +use spacetimedb_primitives::{ColList, ColSet, TableId}; +use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue}; use spacetimedb_schema::schema::TableSchema; use spacetimedb_table::{ blob_store::{BlobStore, HashMapBlobStore}, @@ -469,7 +469,7 @@ impl CommittedState { self.merge_apply_inserts(&mut tx_data, tx_state.insert_tables, tx_state.blob_store); // Merge index id fast-lookup map changes. - self.merge_index_map(tx_state.index_id_map, &tx_state.index_id_map_removals); + self.merge_index_map(tx_state.index_id_map, tx_state.index_id_map_removals.as_deref()); // If the TX will be logged, record its projected tx offset, // then increment the counter. 
@@ -562,8 +562,8 @@ impl CommittedState { } } - fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: &[IndexId]) { - for index_id in index_id_map_removals { + fn merge_index_map(&mut self, index_id_map: IndexIdMap, index_id_map_removals: Option<&RemovedIndexIdSet>) { + for index_id in index_id_map_removals.into_iter().flatten() { self.index_id_map.remove(index_id); } self.index_id_map.extend(index_id_map); @@ -609,6 +609,13 @@ impl CommittedState { let blob_store = &mut self.blob_store; (table, blob_store) } + + /// Returns the key type of the index on `col_list` in the table identified by `table_id`, if both the table and the index exist. + pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> { + let table = self.tables.get(&table_id)?; + let index = table.indexes.get(col_list)?; + Some(&index.key_type) + } } pub struct CommittedIndexIter<'a> { diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index ef16e21cd91..d69b10619a2 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -488,7 +488,10 @@ impl MutTxId { } // Remove the `index_id -> (table_id, col_list)` association. idx_map.remove(&index_id); - self.tx_state.index_id_map_removals.push(index_id); + self.tx_state + .index_id_map_removals + .get_or_insert_with(Default::default) + .insert(index_id); log::trace!("INDEX DROPPED: {}", index_id); Ok(()) @@ -530,34 +533,54 @@ impl MutTxId { .map_err(IndexError::Decode)?; // Get an index seek iterator for the tx and committed state. 
- let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds).unwrap(); + let tx_iter = self.tx_state.index_seek(table_id, col_list, &bounds); let commit_iter = self.committed_state_write_lock.index_seek(table_id, col_list, &bounds); // Chain together the indexed rows in the tx and committed state, // but don't yield rows deleted in the tx state. - enum Choice { + use itertools::Either::{Left, Right}; + // this is gross, but nested `Either`s don't optimize + enum Choice { A(A), B(B), C(C), + D(D), + E(E), + F(F), } - impl, B: Iterator, C: Iterator> Iterator for Choice { + impl< + T, + A: Iterator, + B: Iterator, + C: Iterator, + D: Iterator, + E: Iterator, + F: Iterator, + > Iterator for Choice + { type Item = T; fn next(&mut self) -> Option { match self { Self::A(i) => i.next(), Self::B(i) => i.next(), Self::C(i) => i.next(), + Self::D(i) => i.next(), + Self::E(i) => i.next(), + Self::F(i) => i.next(), } } } - let iter = match commit_iter { - None => Choice::A(tx_iter), - Some(commit_iter) => match self.tx_state.delete_tables.get(&table_id) { - None => Choice::B(tx_iter.chain(commit_iter)), - Some(tx_dels) => { - Choice::C(tx_iter.chain(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer())))) - } - }, + let commit_iter = commit_iter.map(|commit_iter| match self.tx_state.delete_tables.get(&table_id) { + None => Left(commit_iter), + Some(tx_dels) => Right(commit_iter.filter(move |row| !tx_dels.contains(&row.pointer()))), + }); + let iter = match (tx_iter, commit_iter) { + (None, None) => Choice::A(std::iter::empty()), + (Some(tx_iter), None) => Choice::B(tx_iter), + (None, Some(Left(commit_iter))) => Choice::C(commit_iter), + (None, Some(Right(commit_iter))) => Choice::D(commit_iter), + (Some(tx_iter), Some(Left(commit_iter))) => Choice::E(tx_iter.chain(commit_iter)), + (Some(tx_iter), Some(Right(commit_iter))) => Choice::F(tx_iter.chain(commit_iter)), }; Ok((table_id, iter)) } @@ -567,16 +590,34 @@ impl MutTxId { // The order of querying the 
committed vs. tx state for the translation is not important. // But it is vastly more likely that it is in the committed state, // so query that first to avoid two lookups. - let (table_id, col_list) = self + let &(table_id, ref col_list) = self .committed_state_write_lock .index_id_map .get(&index_id) .or_else(|| self.tx_state.index_id_map.get(&index_id))?; + // The tx state must have the index. // If the index was e.g., dropped from the tx state but exists physically in the committed state, // the index does not exist, semantically. - let key_ty = self.tx_state.get_table_and_index_type(*table_id, col_list)?; - Some((*table_id, col_list, key_ty)) + // TODO: handle the case where the table has been dropped in this transaction. + let key_ty = if let Some(key_ty) = self + .committed_state_write_lock + .get_table_and_index_type(table_id, col_list) + { + if self + .tx_state + .index_id_map_removals + .as_ref() + .is_some_and(|s| s.contains(&index_id)) + { + return None; + } + key_ty + } else { + self.tx_state.get_table_and_index_type(table_id, col_list)? + }; + + Some((table_id, col_list, key_ty)) } /// Decode the bounds for a btree scan for an index typed at `key_type`. 
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index b993bb3c088..5b04d76eed0 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -1,19 +1,20 @@ use core::ops::RangeBounds; -use spacetimedb_data_structures::map::IntMap; +use spacetimedb_data_structures::map::{IntMap, IntSet}; use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue}; use spacetimedb_table::{ blob_store::{BlobStore, HashMapBlobStore}, indexes::{RowPointer, SquashedOffset}, + static_assert_size, table::{IndexScanIter, RowRef, Table}, }; use std::collections::{btree_map, BTreeMap, BTreeSet}; -use thin_vec::ThinVec; pub(super) type DeleteTable = BTreeSet; /// A mapping to find the actual index given an `IndexId`. pub(super) type IndexIdMap = IntMap; +pub(super) type RemovedIndexIdSet = IntSet; /// `TxState` tracks all of the modifications made during a particular transaction. /// Rows inserted during a transaction will be added to insert_tables, and similarly, @@ -71,9 +72,13 @@ pub(super) struct TxState { pub(super) index_id_map: IndexIdMap, /// Lists all the `IndexId` that are to be removed from `CommittedState::index_id_map`. - pub(super) index_id_map_removals: ThinVec, + // This is in an `Option<Box<_>>` to reduce the size of `TxState` - it's very uncommon + // that this would be created. + pub(super) index_id_map_removals: Option>, } + +static_assert_size!(TxState, 120); + impl TxState { /// Returns the row count in insert tables /// and the number of rows deleted from committed state.