From ee308346f867c6988f033692884418dff9461889 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 10 Dec 2024 23:57:37 +0100 Subject: [PATCH 1/9] add temporary write_row_to_pages_bsatn that does BSATN -> PV -> BFLATN --- crates/table/src/bflatn_to.rs | 30 ++++++- crates/table/src/layout.rs | 150 +++++++++++++++++++++++++++++++++- 2 files changed, 178 insertions(+), 2 deletions(-) diff --git a/crates/table/src/bflatn_to.rs b/crates/table/src/bflatn_to.rs index 52996567776..7c7cc867203 100644 --- a/crates/table/src/bflatn_to.rs +++ b/crates/table/src/bflatn_to.rs @@ -16,12 +16,17 @@ use super::{ var_len::{VarLenGranule, VarLenMembers, VarLenRef}, }; use spacetimedb_sats::{ - bsatn::to_writer, buffer::BufWriter, i256, u256, AlgebraicType, AlgebraicValue, ProductValue, SumValue, + bsatn::{self, to_writer, DecodeError}, + buffer::BufWriter, + de::DeserializeSeed as _, + i256, u256, AlgebraicType, AlgebraicValue, ProductValue, SumValue, }; use thiserror::Error; #[derive(Error, Debug)] pub enum Error { + #[error(transparent)] + Decode(#[from] DecodeError), #[error("Expected a value of type {0:?}, but found {1:?}")] WrongType(AlgebraicType, AlgebraicValue), #[error(transparent)] @@ -30,6 +35,29 @@ pub enum Error { PagesError(#[from] super::pages::Error), } +/// Writes `row` typed at `ty` to `pages` +/// using `blob_store` as needed to write large blobs. +/// +/// Panics if `val` is not of type `ty`. +/// +/// # Safety +/// +/// `pages` must be specialized to store rows of `ty`. +/// This includes that its `visitor` must be prepared to visit var-len members within `ty`, +/// and must do so in the same order as a `VarLenVisitorProgram` for `ty` would, +/// i.e. by monotonically increasing offsets. 
+pub unsafe fn write_row_to_pages_bsatn( + pages: &mut Pages, + visitor: &impl VarLenMembers, + blob_store: &mut dyn BlobStore, + ty: &RowTypeLayout, + mut bytes: &[u8], + squashed_offset: SquashedOffset, +) -> Result<(RowPointer, BlobNumBytes), Error> { + let val = ty.product().deserialize(bsatn::Deserializer::new(&mut bytes))?; + unsafe { write_row_to_pages(pages, visitor, blob_store, ty, &val, squashed_offset) } +} + /// Writes `row` typed at `ty` to `pages` /// using `blob_store` as needed to write large blobs. /// diff --git a/crates/table/src/layout.rs b/crates/table/src/layout.rs index f63a66d9b37..f1ee56083f5 100644 --- a/crates/table/src/layout.rs +++ b/crates/table/src/layout.rs @@ -16,7 +16,15 @@ use core::mem; use core::ops::Index; use enum_as_inner::EnumAsInner; use spacetimedb_sats::{ - bsatn, AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue, SumType, SumTypeVariant, + bsatn, + de::{ + Deserialize, DeserializeSeed, Deserializer, Error, NamedProductAccess, ProductVisitor, SeqProductAccess, + SumAccess, SumVisitor, ValidNames, VariantAccess as _, VariantVisitor, + }, + i256, + sum_type::{OPTION_NONE_TAG, OPTION_SOME_TAG}, + u256, AlgebraicType, AlgebraicValue, ProductType, ProductTypeElement, ProductValue, SumType, SumTypeVariant, + SumValue, WithTypespace, }; pub use spacetimedb_schema::type_for_generate::PrimitiveType; @@ -602,6 +610,16 @@ impl SumTypeVariantLayout { name: self.name.clone(), } } + + /// Returns whether the variant has the given name. + pub fn has_name(&self, name: &str) -> bool { + self.name.as_deref() == Some(name) + } + + /// Returns whether this is a unit variant. 
+ pub fn is_unit(&self) -> bool { + self.ty.as_product().is_some_and(|ty| ty.elements.is_empty()) + } } // # Inspecting layout @@ -672,6 +690,136 @@ pub fn bsatn_len(val: &AlgebraicValue) -> usize { bsatn::to_len(val).unwrap() } +impl<'de> DeserializeSeed<'de> for &AlgebraicTypeLayout { + type Output = AlgebraicValue; + + fn deserialize>(self, de: D) -> Result { + match self { + AlgebraicTypeLayout::Sum(ty) => ty.deserialize(de).map(Into::into), + AlgebraicTypeLayout::Product(ty) => ty.deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::Bool) => bool::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I8) => i8::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U8) => u8::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I16) => i16::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U16) => u16::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I32) => i32::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U32) => u32::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I64) => i64::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U64) => u64::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I128) => i128::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U128) => u128::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::I256) => i256::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::U256) => u256::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::F32) => f32::deserialize(de).map(Into::into), + AlgebraicTypeLayout::Primitive(PrimitiveType::F64) => f64::deserialize(de).map(Into::into), + AlgebraicTypeLayout::VarLen(VarLenType::Array(ty)) => 
WithTypespace::empty(&**ty).deserialize(de), + AlgebraicTypeLayout::VarLen(VarLenType::String) => >::deserialize(de).map(Into::into), + /* + AlgebraicType::Sum(sum) => self.with(sum).deserialize(de).map(Into::into), + */ + } + } +} + +impl<'de> DeserializeSeed<'de> for &ProductTypeLayout { + type Output = ProductValue; + + fn deserialize>(self, de: D) -> Result { + de.deserialize_product(self) + } +} + +impl<'de> ProductVisitor<'de> for &ProductTypeLayout { + type Output = ProductValue; + + fn product_name(&self) -> Option<&str> { + None + } + fn product_len(&self) -> usize { + self.elements.len() + } + + fn visit_seq_product>(self, mut tup: A) -> Result { + let mut elems: Vec = Vec::with_capacity(self.product_len()); + for (i, elem_ty) in self.elements.iter().enumerate() { + let Some(elem_val) = tup.next_element_seed(&elem_ty.ty)? else { + return Err(A::Error::invalid_product_length(i, &self)); + }; + elems.push(elem_val); + } + Ok(elems.into()) + } + + fn visit_named_product>(self, _: A) -> Result { + unreachable!() + } +} + +impl<'de> DeserializeSeed<'de> for &SumTypeLayout { + type Output = SumValue; + + fn deserialize>(self, deserializer: D) -> Result { + deserializer.deserialize_sum(self) + } +} + +impl<'de> SumVisitor<'de> for &SumTypeLayout { + type Output = SumValue; + + fn sum_name(&self) -> Option<&str> { + None + } + + fn is_option(&self) -> bool { + match &*self.variants { + [first, second] + if second.is_unit() // Done first to avoid pointer indirection when it doesn't matter. + && first.has_name(OPTION_SOME_TAG) + && second.has_name(OPTION_NONE_TAG) => + { + true + } + _ => false, + } + } + + fn visit_sum>(self, data: A) -> Result { + let (tag, data) = data.variant(self)?; + // Find the variant type by `tag`. 
+ let variant_ty = &self.variants[tag as usize].ty; + + let value = Box::new(data.deserialize_seed(variant_ty)?); + Ok(SumValue { tag, value }) + } +} + +impl VariantVisitor for &SumTypeLayout { + type Output = u8; + + fn variant_names(&self, names: &mut dyn ValidNames) { + // Provide the names known from the `SumType`. + names.extend(self.variants.iter().filter_map(|v| v.name.as_deref())); + } + + fn visit_tag(self, tag: u8) -> Result { + // Verify that tag identifies a valid variant in `SumType`. + self.variants + .get(tag as usize) + .ok_or_else(|| E::unknown_variant_tag(tag, &self))?; + + Ok(tag) + } + + fn visit_name(self, name: &str) -> Result { + // Translate the variant `name` to its tag. + self.variants + .iter() + .position(|var| var.has_name(name)) + .map(|pos| pos as u8) + .ok_or_else(|| E::unknown_variant_name(name, &self)) + } +} + #[cfg(test)] mod test { use super::*; From 22c75fc63d65c52de093ba63a5860346e393438d Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 10 Dec 2024 23:58:15 +0100 Subject: [PATCH 2/9] extract `Table::insert_into_pointer_map` --- crates/table/src/table.rs | 117 ++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 50 deletions(-) diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index c0eb59e49c2..bbdd4c9a032 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -250,6 +250,56 @@ impl Table { Ok((hash, row_ref)) } + /// Insert a `row` into this table. + /// NOTE: This method skips index updating. Use `insert` to insert a row with index updating. + pub fn insert_internal( + &mut self, + blob_store: &mut dyn BlobStore, + row: &ProductValue, + ) -> Result<(RowHash, RowPointer), InsertError> { + // Optimistically insert the `row` before checking for set-semantic collisions, + // under the assumption that set-semantic collisions are rare. 
+ let (row_ref, blob_bytes) = self.insert_internal_allow_duplicate(blob_store, row)?; + + let hash = row_ref.row_hash(); + let ptr = row_ref.pointer(); + // SAFETY: `ptr` was derived from `row_ref` without interleaving calls, so it must be valid. + unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; + + self.row_count += 1; + self.blob_store_bytes += blob_bytes; + + Ok((hash, ptr)) + } + + /// Physically inserts `row` into the page + /// without inserting it logically into the pointer map. + /// + /// This is useful when we need to insert a row temporarily to get back a `RowPointer`. + /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. + pub fn insert_internal_allow_duplicate<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + row: &ProductValue, + ) -> Result<(RowRef<'a>, BlobNumBytes), InsertError> { + // SAFETY: `self.pages` is known to be specialized for `self.row_layout`, + // as `self.pages` was constructed from `self.row_layout` in `Table::new`. + let (ptr, blob_bytes) = unsafe { + write_row_to_pages( + &mut self.inner.pages, + &self.inner.visitor_prog, + blob_store, + &self.inner.row_layout, + row, + self.squashed_offset, + ) + }?; + // SAFETY: We just inserted `ptr`, so it must be present. + let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + + Ok((row_ref, blob_bytes)) + } + /// Insert row identified by `ptr` into indices. /// This also checks unique constraints. /// Deletes the row if there were any violations. @@ -284,31 +334,28 @@ impl Table { Ok(()) } - /// Insert a `row` into this table. - /// NOTE: This method skips index updating. Use `insert` to insert a row with index updating. 
- pub fn insert_internal( - &mut self, - blob_store: &mut dyn BlobStore, - row: &ProductValue, - ) -> Result<(RowHash, RowPointer), InsertError> { - // Optimistically insert the `row` before checking for set-semantic collisions, - // under the assumption that set-semantic collisions are rare. - let (row_ref, blob_bytes) = self.insert_internal_allow_duplicate(blob_store, row)?; - - // Ensure row isn't already there. - // SAFETY: We just inserted `ptr`, so we know it's valid. - let hash = row_ref.row_hash(); - // Safety: - // We just inserted `ptr` and computed `hash`, so they're valid. - // `self` trivially has the same `row_layout` as `self`. - let ptr = row_ref.pointer(); + /// Insert row identified by `ptr` into the pointer map. + /// This checks for set semantic violations. + /// Deletes the row and returns an error if there were any violations. + /// + /// SAFETY: `self.is_row_present(row)` must hold. + /// Post-condition: If this method returns `Ok(_)`, the row still exists. + unsafe fn insert_into_pointer_map<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + ptr: RowPointer, + hash: RowHash, + ) -> Result<(), InsertError> { + // SAFETY: + // - `self` trivially has the same `row_layout` as `self`. + // - Caller promised that `ptr` is a valid row in `self`. let existing_row = unsafe { Self::find_same_row(self, self, ptr, hash) }; if let Some(existing_row) = existing_row { // If an equal row was already present, // roll back our optimistic insert to avoid violating set semantics. - // SAFETY: we just inserted `ptr`, so it must be valid. + // SAFETY: Caller promised that `ptr` is a valid row in `self`. unsafe { self.inner .pages @@ -316,43 +363,13 @@ impl Table { }; return Err(InsertError::Duplicate(existing_row)); } - self.row_count += 1; - self.blob_store_bytes += blob_bytes; // If the optimistic insertion was correct, // i.e. this is not a set-semantic duplicate, // add it to the `pointer_map`. 
self.pointer_map.insert(hash, ptr); - Ok((hash, ptr)) - } - - /// Physically inserts `row` into the page - /// without inserting it logically into the pointer map. - /// - /// This is useful when we need to insert a row temporarily to get back a `RowPointer`. - /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. - pub fn insert_internal_allow_duplicate<'a>( - &'a mut self, - blob_store: &'a mut dyn BlobStore, - row: &ProductValue, - ) -> Result<(RowRef<'a>, BlobNumBytes), InsertError> { - // SAFETY: `self.pages` is known to be specialized for `self.row_layout`, - // as `self.pages` was constructed from `self.row_layout` in `Table::new`. - let (ptr, blob_bytes) = unsafe { - write_row_to_pages( - &mut self.inner.pages, - &self.inner.visitor_prog, - blob_store, - &self.inner.row_layout, - row, - self.squashed_offset, - ) - }?; - // SAFETY: We just inserted `ptr`, so it must be present. - let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; - - Ok((row_ref, blob_bytes)) + Ok(()) } /// Finds the [`RowPointer`] to the row in `committed_table` From 886d904d04cc98d03c588c8cf7be9054b226893c Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Wed, 18 Dec 2024 18:13:12 +0100 Subject: [PATCH 3/9] datastore: insert via bsatn instead of PV --- crates/bench/benches/subscription.rs | 14 +- crates/bench/src/spacetime_raw.rs | 12 +- .../locking_tx_datastore/datastore.rs | 76 +++--- .../datastore/locking_tx_datastore/mut_tx.rs | 241 ++++++++++-------- .../locking_tx_datastore/tx_state.rs | 19 ++ crates/core/src/db/datastore/system_tables.rs | 13 +- crates/core/src/db/datastore/traits.rs | 17 +- crates/core/src/db/relational_db.rs | 94 ++++--- crates/core/src/estimation.rs | 6 +- crates/core/src/host/instance_env.rs | 25 +- crates/core/src/host/module_host.rs | 2 +- .../src/host/wasm_common/module_host_actor.rs | 4 +- .../src/host/wasmtime/wasm_instance_env.rs | 14 +- crates/core/src/sql/compiler.rs | 4 +- 
crates/core/src/sql/execute.rs | 12 +- .../subscription/module_subscription_actor.rs | 6 +- crates/core/src/subscription/query.rs | 10 +- crates/core/src/util/slow.rs | 4 +- crates/core/src/vm.rs | 15 +- crates/sats/src/algebraic_value.rs | 22 -- crates/table/src/layout.rs | 3 - crates/table/src/page.rs | 21 ++ crates/table/src/pages.rs | 1 - crates/table/src/read_column.rs | 1 + crates/table/src/static_bsatn_validator.rs | 1 + crates/table/src/table.rs | 237 ++++++++++++++--- 26 files changed, 559 insertions(+), 315 deletions(-) diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index 8dcc56d51a5..ae26a40a114 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -10,7 +10,7 @@ use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compress use spacetimedb_bench::database::BenchDatabase as _; use spacetimedb_bench::spacetime_raw::SpacetimeRaw; use spacetimedb_primitives::{col_list, TableId}; -use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue}; fn create_table_location(db: &RelationalDB) -> Result { let schema = &[ @@ -57,11 +57,15 @@ fn eval(c: &mut Criterion) { .db .with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { // 1M rows + let mut scratch = Vec::new(); for entity_id in 0u64..1_000_000 { let owner = entity_id % 1_000; let footprint = AlgebraicValue::sum(entity_id as u8 % 4, AlgebraicValue::unit()); let row = product!(entity_id, owner, footprint); - let _ = raw.db.insert(tx, lhs, row)?; + + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + let _ = raw.db.insert(tx, lhs, &scratch)?; } Ok(()) }); @@ -70,6 +74,7 @@ fn eval(c: &mut Criterion) { .db .with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { // 1000 chunks, 1200 rows per chunk = 1.2M rows + let mut scratch = Vec::new(); for chunk_index in 0u64..1_000 { 
for i in 0u64..1200 { let entity_id = chunk_index * 1200 + i; @@ -77,7 +82,10 @@ fn eval(c: &mut Criterion) { let z = entity_id as i32; let dimension = 0u32; let row = product!(entity_id, chunk_index, x, z, dimension); - let _ = raw.db.insert(tx, rhs, row)?; + + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + let _ = raw.db.insert(tx, rhs, &scratch)?; } } Ok(()) diff --git a/crates/bench/src/spacetime_raw.rs b/crates/bench/src/spacetime_raw.rs index 2b63dbf2a2e..3de9bab3f09 100644 --- a/crates/bench/src/spacetime_raw.rs +++ b/crates/bench/src/spacetime_raw.rs @@ -5,8 +5,8 @@ use crate::{ }; use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB}; use spacetimedb::execution_context::Workload; -use spacetimedb_lib::sats::AlgebraicValue; use spacetimedb_primitives::{ColId, IndexId, TableId}; +use spacetimedb_sats::{bsatn, AlgebraicValue}; use spacetimedb_schema::{ def::{BTreeAlgorithm, IndexAlgorithm}, schema::{IndexSchema, TableSchema}, @@ -102,8 +102,11 @@ impl BenchDatabase for SpacetimeRaw { fn insert_bulk(&mut self, table_id: &Self::TableId, rows: Vec) -> ResultBench<()> { self.db.with_auto_commit(Workload::Internal, |tx| { + let mut scratch = Vec::new(); for row in rows { - self.db.insert(tx, *table_id, row.into_product_value())?; + scratch.clear(); + bsatn::to_writer(&mut scratch, &row.into_product_value()).unwrap(); + self.db.insert(tx, *table_id, &scratch)?; } Ok(()) }) @@ -119,6 +122,7 @@ impl BenchDatabase for SpacetimeRaw { .collect::>(); assert_eq!(rows.len(), row_count as usize, "not enough rows found for update_bulk!"); + let mut scratch = Vec::new(); for mut row in rows { // It would likely be faster to collect a vector of IDs and delete + insert them all at once, // but this implementation is closer to how `update` works in modules. 
@@ -143,7 +147,9 @@ impl BenchDatabase for SpacetimeRaw { panic!("column 1 is not a u64!"); } - self.db.insert(tx, *table_id, row)?; + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + self.db.insert(tx, *table_id, &scratch)?; } Ok(()) }) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 847572d5a1c..e095479ea7a 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -478,10 +478,6 @@ impl MutTxDatastore for Locking { tx.index_id_from_name(index_name) } - fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result { - tx.get_next_sequence_value(seq_id) - } - fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result { tx.create_sequence(sequence_schema) } @@ -572,9 +568,9 @@ impl MutTxDatastore for Locking { &'a self, tx: &'a mut Self::MutTx, table_id: TableId, - mut row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>)> { - let (gens, row_ref) = tx.insert(table_id, &mut row)?; + row: &[u8], + ) -> Result<(ColList, RowRef<'a>)> { + let (gens, row_ref) = tx.insert::(table_id, row)?; Ok((gens, row_ref.collapse())) } @@ -599,7 +595,7 @@ impl MutTxDatastore for Locking { row.program_bytes = program.bytes; tx.delete(ST_MODULE_ID, ptr)?; - tx.insert(ST_MODULE_ID, &mut row.into()).map(drop) + tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop) } None => Err(anyhow!( @@ -986,6 +982,7 @@ mod tests { use crate::db::datastore::traits::{IsolationLevel, MutTx}; use crate::db::datastore::Result; use crate::error::{DBError, IndexError}; + use bsatn::to_vec; use itertools::Itertools; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -1354,6 +1351,18 @@ mod tests { .collect() } + fn insert<'a>( + datastore: &'a Locking, + tx: &'a mut MutTxId, + 
table_id: TableId, + row: &ProductValue, + ) -> Result<(AlgebraicValue, RowRef<'a>)> { + let row = to_vec(&row).unwrap(); + let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?; + let gen_cols = row_ref.project(&gen_cols)?; + Ok((gen_cols, row_ref)) + } + #[test] fn test_bootstrapping_sets_up_tables() -> ResultTest<()> { let datastore = get_datastore()?; @@ -1693,7 +1702,7 @@ mod tests { fn test_insert_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 18)]); Ok(()) @@ -1703,7 +1712,7 @@ mod tests { fn test_insert_wrong_schema_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = product!(0, "Foo"); - assert!(datastore.insert_mut_tx(&mut tx, table_id, row).is_err()); + assert!(insert(&datastore, &mut tx, table_id, &row).is_err()); #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![]); Ok(()) @@ -1713,7 +1722,7 @@ mod tests { fn test_insert_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, u32_str_u32(0, "Foo", 18))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(0, "Foo", 18))?; datastore.commit_mut_tx(tx)?; let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); #[rustfmt::skip] @@ -1727,7 +1736,7 @@ mod tests { let row = u32_str_u32(15, "Foo", 18); // 15 is ignored. 
datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.rollback_mut_tx(tx); let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); #[rustfmt::skip] @@ -1739,7 +1748,7 @@ mod tests { fn test_insert_commit_delete_insert() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let created_row = u32_str_u32(1, "Foo", 18); @@ -1747,7 +1756,7 @@ mod tests { assert_eq!(num_deleted, 1); assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); let created_row = u32_str_u32(1, "Foo", 19); - datastore.insert_mut_tx(&mut tx, table_id, created_row)?; + insert(&datastore, &mut tx, table_id, &created_row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 19)]); Ok(()) @@ -1757,7 +1766,7 @@ mod tests { fn test_insert_delete_insert_delete_insert() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(1, "Foo", 18); // 0 will be ignored. 
- datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; for i in 0..2 { assert_eq!( all_rows(&datastore, &tx, table_id), @@ -1774,7 +1783,7 @@ mod tests { &[], "Found rows present after deleting", ); - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; assert_eq!( all_rows(&datastore, &tx, table_id), vec![row.clone()], @@ -1788,8 +1797,8 @@ mod tests { fn test_unique_constraint_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + insert(&datastore, &mut tx, table_id, &row)?; + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1808,10 +1817,10 @@ mod tests { fn test_unique_constraint_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1832,10 +1841,10 @@ mod tests { datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. 
- datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.rollback_mut_tx(tx); let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(2, "Foo", 18)]); Ok(()) @@ -1848,7 +1857,7 @@ mod tests { let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); @@ -1883,7 +1892,7 @@ mod tests { IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1902,7 +1911,7 @@ mod tests { fn test_create_index_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let index_def = IndexSchema { @@ -1937,7 +1946,7 @@ mod tests { IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. 
- let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1956,7 +1965,7 @@ mod tests { fn test_create_index_post_rollback() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let index_def = IndexSchema { @@ -1991,7 +2000,7 @@ mod tests { IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![ u32_str_u32(1, "Foo", 18), @@ -2008,7 +2017,7 @@ mod tests { let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. // Because of auto_inc columns, we will get a slightly different // value than the one we inserted. - let row = datastore.insert_mut_tx(&mut tx, table_id, row)?.1.to_product_value(); + let row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value(); datastore.commit_mut_tx(tx)?; let all_rows_col_0_eq_1 = |tx: &MutTxId| { @@ -2033,10 +2042,7 @@ mod tests { assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0); // Reinsert the row. - let reinserted_row = datastore - .insert_mut_tx(&mut tx, table_id, row.clone())? 
- .1 - .to_product_value(); + let reinserted_row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value(); assert_eq!(reinserted_row, row); // The actual test: we should be able to iterate again, while still in the @@ -2054,9 +2060,9 @@ mod tests { fn test_read_only_tx_shared_lock() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row1 = u32_str_u32(1, "Foo", 18); - datastore.insert_mut_tx(&mut tx, table_id, row1.clone())?; + insert(&datastore, &mut tx, table_id, &row1)?; let row2 = u32_str_u32(2, "Bar", 20); - datastore.insert_mut_tx(&mut tx, table_id, row2.clone())?; + insert(&datastore, &mut tx, table_id, &row2)?; datastore.commit_mut_tx(tx)?; // create multiple read only tx, and use them together. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 4dfd0916a66..90102884648 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -11,15 +11,13 @@ use crate::db::datastore::locking_tx_datastore::committed_state::CommittedIndexI use crate::db::datastore::locking_tx_datastore::state_view::{ IndexSeekIterIdWithDeletedMutTx, IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, }; -use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; -use crate::db::datastore::{ - system_tables::{ - StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, StIndexRow, - StScheduledFields, StScheduledRow, StSequenceFields, StSequenceRow, StTableFields, StTableRow, SystemTable, - ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, - }, - traits::{RowTypeForTable, TxData}, +use crate::db::datastore::system_tables::{ + with_sys_table_buf, StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, + 
StIndexRow, StRowLevelSecurityFields, StRowLevelSecurityRow, StScheduledFields, StScheduledRow, StSequenceFields, + StSequenceRow, StTableFields, StTableRow, SystemTable, ST_COLUMN_ID, ST_CONSTRAINT_ID, ST_INDEX_ID, + ST_ROW_LEVEL_SECURITY_ID, ST_SCHEDULED_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; +use crate::db::datastore::traits::{RowTypeForTable, TxData}; use crate::execution_context::Workload; use crate::{ error::{IndexError, SequenceError, TableError}, @@ -29,15 +27,12 @@ use core::ops::RangeBounds; use core::{iter, ops::Bound}; use smallvec::SmallVec; use spacetimedb_lib::db::raw_def::v9::RawSql; -use spacetimedb_lib::{ - bsatn::Deserializer, - db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, - de::DeserializeSeed, -}; +use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}; use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId}; use spacetimedb_sats::{ - bsatn::{self, DecodeError}, - de::WithBound, + bsatn::{self, to_writer, DecodeError, Deserializer}, + de::{DeserializeSeed, WithBound}, + ser::Serialize, AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace, }; use spacetimedb_schema::{ @@ -117,7 +112,7 @@ impl MutTxId { table_primary_key: table_schema.primary_key.map(Into::into), }; let table_id = self - .insert(ST_TABLE_ID, &mut row.into())? + .insert_via_serialize_bsatn(ST_TABLE_ID, &row)? .1 .collapse() .read_col(StTableFields::TableId)?; @@ -134,7 +129,7 @@ impl MutTxId { col_name: col.col_name.clone(), col_type: col.col_type.clone().into(), }; - self.insert(ST_COLUMN_ID, &mut row.into())?; + self.insert_via_serialize_bsatn(ST_COLUMN_ID, &row)?; } let mut schema_internal = table_schema.clone(); @@ -156,20 +151,13 @@ impl MutTxId { reducer_name: schedule.reducer_name, at_column: schedule.at_column, }; - let (generated, ..) = self.insert(ST_SCHEDULED_ID, &mut row.into())?; - let id = generated.as_u32(); - - if let Some(&id) = id { - let (table, ..) 
= self.get_or_create_insert_table_mut(table_id)?; - table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id.into()); - } else { - return Err(anyhow::anyhow!( - "Failed to generate a schedule ID for table: {}, generated: {:#?}", - table_schema.table_name, - generated - ) - .into()); - } + let id = self + .insert_via_serialize_bsatn(ST_SCHEDULED_ID, &row)? + .1 + .collapse() + .read_col::(StScheduledFields::ScheduleId)?; + let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; + table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id); } // Insert constraints into `st_constraints` @@ -287,7 +275,7 @@ impl MutTxId { // Delete the row, run updates, and insert again. self.delete(ST_TABLE_ID, ptr)?; updater(&mut row); - self.insert(ST_TABLE_ID, &mut row.into())?; + self.insert_via_serialize_bsatn(ST_TABLE_ID, &row)?; Ok(()) } @@ -386,7 +374,7 @@ impl MutTxId { index_algorithm: index.index_algorithm.clone().into(), }; let index_id = self - .insert(ST_INDEX_ID, &mut row.into())? + .insert_via_serialize_bsatn(ST_INDEX_ID, &row)? .1 .collapse() .read_col(StIndexFields::IndexId)?; @@ -680,11 +668,15 @@ impl MutTxId { (range_start, range_end) } + fn get_sequence_mut(&mut self, seq_id: SequenceId) -> Result<&mut Sequence> { + self.sequence_state_lock + .get_sequence_mut(seq_id) + .ok_or_else(|| SequenceError::NotFound(seq_id).into()) + } + pub fn get_next_sequence_value(&mut self, seq_id: SequenceId) -> Result { { - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; + let sequence = self.get_sequence_mut(seq_id)?; // If there are allocated sequence values, return the new value. 
// `gen_next_value` internally checks that the new allocation is acceptable, @@ -704,30 +696,27 @@ impl MutTxId { let seq_row = { let mut seq_row = StSequenceRow::try_from(old_seq_row_ref)?; - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; + let sequence = self.get_sequence_mut(seq_id)?; seq_row.allocated = sequence.nth_value(SEQUENCE_ALLOCATION_STEP as usize); sequence.set_allocation(seq_row.allocated); seq_row }; self.delete(ST_SEQUENCE_ID, old_seq_row_ptr)?; - // `insert_row_internal` rather than `insert` because: + // `insert::<GENERATE = false>` rather than `GENERATE = true` because: // - We have already checked unique constraints during `create_sequence`. // - Similarly, we have already applied autoinc sequences. // - We do not want to apply autoinc sequences again, // since the system table sequence `seq_st_table_table_id_primary_key_auto` // has ID 0, and would otherwise trigger autoinc. - self.insert_row_internal(ST_SEQUENCE_ID, &ProductValue::from(seq_row))?; - - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; - if let Some(value) = sequence.gen_next_value() { - return Ok(value); - } - Err(SequenceError::UnableToAllocate(seq_id).into()) + with_sys_table_buf(|buf| { + to_writer(buf, &seq_row).unwrap(); + self.insert::<false>(ST_SEQUENCE_ID, buf) + })?; + + self.get_sequence_mut(seq_id)? + .gen_next_value() + .ok_or_else(|| SequenceError::UnableToAllocate(seq_id).into()) } /// Create a sequence.
@@ -769,7 +758,7 @@ impl MutTxId { min_value: seq.min_value, max_value: seq.max_value, }; - let row = self.insert(ST_SEQUENCE_ID, &mut sequence_row.clone().into())?; + let row = self.insert_via_serialize_bsatn(ST_SEQUENCE_ID, &sequence_row)?; let seq_id = row.1.collapse().read_col(StSequenceFields::SequenceId)?; sequence_row.sequence_id = seq_id; @@ -858,7 +847,7 @@ impl MutTxId { constraint_data: constraint.data.clone().into(), }; - let constraint_row = self.insert(ST_CONSTRAINT_ID, &mut ProductValue::from(constraint_row))?; + let constraint_row = self.insert_via_serialize_bsatn(ST_CONSTRAINT_ID, &constraint_row)?; let constraint_id = constraint_row.1.collapse().read_col(StConstraintFields::ConstraintId)?; let existed = matches!(constraint_row.1, RowRefInsertion::Existed(_)); // TODO: Can we return early here? @@ -952,7 +941,7 @@ impl MutTxId { sql: row_level_security_schema.sql, }; - let row = self.insert(ST_ROW_LEVEL_SECURITY_ID, &mut ProductValue::from(row))?; + let row = self.insert_via_serialize_bsatn(ST_ROW_LEVEL_SECURITY_ID, &row)?; let row_level_security_sql = row.1.collapse().read_col(StRowLevelSecurityFields::Sql)?; let existed = matches!(row.1, RowRefInsertion::Existed(_)); @@ -1113,7 +1102,7 @@ impl MutTxId { /// Either a row just inserted to a table or a row that already existed in some table. #[derive(Clone, Copy)] -pub(super) enum RowRefInsertion<'a> { +pub(crate) enum RowRefInsertion<'a> { /// The row was just inserted. Inserted(RowRef<'a>), /// The row already existed. @@ -1171,65 +1160,98 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> { } impl MutTxId { - /// Insert a row into a table. + pub(crate) fn insert_via_serialize_bsatn<'a, T: Serialize>( + &'a mut self, + table_id: TableId, + row: &T, + ) -> Result<(ColList, RowRefInsertion<'a>)> { + with_sys_table_buf(|buf| { + to_writer(buf, row).unwrap(); + self.insert::<true>(table_id, buf) + }) + } + + /// Insert a row, encoded in BSATN, into a table.
/// /// Requires: /// - `TableId` must refer to a valid table for the database at `database_address`. /// - `row` must be a valid row for the table at `table_id`. /// /// Returns: - /// - a product value with a projection of the row containing only the generated column values. + /// - a list of columns which have been replaced with generated values. /// - a ref to the inserted row. - pub(super) fn insert<'a>( - &'a mut self, + pub(super) fn insert<const GENERATE: bool>( + &mut self, table_id: TableId, - row: &mut ProductValue, - ) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> { - let generated = self.write_sequence_values(table_id, row)?; - let row_ref = self.insert_row_internal(table_id, row)?; - Ok((generated, row_ref)) - } - - /// Generate and write sequence values to `row` - /// and return a projection of `row` with only the generated column values. - fn write_sequence_values(&mut self, table_id: TableId, row: &mut ProductValue) -> Result<AlgebraicValue> { - // TODO: Executing schema_for_table for every row insert is expensive. - // However we ask for the schema in the [Table] struct instead. - let schema = self.schema_for_table(table_id)?; - - // Collect all the columns with sequences that need generation. - let (cols_to_update, seqs_to_use): (ColList, SmallVec<[_; 1]>) = schema - .sequences - .iter() - .filter(|seq| row.elements[seq.col_pos.idx()].is_numeric_zero()) - .map(|seq| (seq.col_pos, seq.sequence_id)) - .unzip(); - - // Update every column in the row that needs it. - // We assume here that column with a sequence is of a sequence-compatible type. - for (col_id, sequence_id) in cols_to_update.iter().zip(seqs_to_use) { - let seq_val = self.get_next_sequence_value(sequence_id)?; - let col_typ = &schema.columns()[col_id.idx()].col_type; - let gen_val = AlgebraicValue::from_sequence_value(col_typ, seq_val); - row.elements[col_id.idx()] = gen_val; - } - - Ok(row.project(&cols_to_update)?)
- } - - pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result> { - let commit_table = self.committed_state_write_lock.get_table(table_id); - + row: &[u8], + ) -> Result<(ColList, RowRefInsertion<'_>)> { // Get the insert table, so we can write the row into it. - let (tx_table, tx_blob_store, _, delete_table) = self + let (tx_table, tx_blob_store, ..) = self .tx_state - .get_table_and_blob_store_or_maybe_create_from(table_id, commit_table) + .get_table_and_blob_store_or_maybe_create_from( + table_id, + self.committed_state_write_lock.get_table(table_id), + ) .ok_or(TableError::IdNotFoundState(table_id))?; - match tx_table.insert(tx_blob_store, row) { - Ok((tx_row_hash, tx_row_ref)) => { + // 1. Insert the physical row. + // 2. Detect, generate, write sequence values. + // 3. Confirm that the insertion respects constraints and update statistics. + let ((tx_table, tx_blob_store, delete_table), gen_cols, res) = match tx_table + .insert_physically_bsatn(tx_blob_store, row) + { + Ok((tx_row_ref, blob_bytes)) if GENERATE => { + // Collect all the columns with sequences that need generation. let tx_row_ptr = tx_row_ref.pointer(); - if let Some(commit_table) = commit_table { + let (cols_to_gen, seqs_to_use) = unsafe { tx_table.sequence_triggers_for(tx_blob_store, tx_row_ptr) }; + + // Generate a value for every column in the row that needs it. + let mut seq_vals: SmallVec<[i128; 1]> = <_>::default(); + for sequence_id in seqs_to_use { + seq_vals.push(self.get_next_sequence_value(sequence_id)?); + } + + // Write the generated values to the physical row at `tx_row_ptr`. + // We assume here that column with a sequence is of a sequence-compatible type. + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. 
+ let (tx_table, tx_blob_store, delete_table) = + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }; + for (col_id, seq_val) in cols_to_gen.iter().zip(seq_vals) { + // SAFETY: + // - `self.is_row_present(row)` holds as we haven't deleted the row. + // - `col_id` is a valid column, and has a sequence, so it must have a primitive type. + unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; + } + + let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + ((tx_table, tx_blob_store, delete_table), cols_to_gen, res) + } + Ok((tx_row_ref, blob_bytes)) => { + let tx_row_ptr = tx_row_ref.pointer(); + let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. + ( + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }, + ColList::empty(), + res, + ) + } + Err(e) => { + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. + ( + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }, + ColList::empty(), + Err(e), + ) + } + }; + + match res { + Ok((tx_row_hash, tx_row_ptr)) => { + if let Some(commit_table) = self.committed_state_write_lock.get_table(table_id) { // The `tx_row_ref` was not previously present in insert tables, // but may still be a set-semantic conflict // or may violate a unique constraint with a row in the committed state. @@ -1278,10 +1300,11 @@ impl MutTxId { // No new row was inserted, but return `committed_ptr`. let blob_store = &self.committed_state_write_lock.blob_store; - return Ok(RowRefInsertion::Existed( + let rri = RowRefInsertion::Existed( // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`. 
unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) }, - )); + ); + return Ok((gen_cols, rri)); } // Pacify the borrow checker. @@ -1302,16 +1325,20 @@ impl MutTxId { } } - Ok(RowRefInsertion::Inserted(unsafe { + let rri = RowRefInsertion::Inserted(unsafe { // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls. tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) - })) + }); + Ok((gen_cols, rri)) } // `row` previously present in insert tables; do nothing but return `ptr`. - Err(InsertError::Duplicate(ptr)) => Ok(RowRefInsertion::Existed( - // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. - unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, - )), + Err(InsertError::Duplicate(ptr)) => { + let rri = RowRefInsertion::Existed( + // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. + unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, + ); + Ok((gen_cols, rri)) + } // Index error: unbox and return `TableError::IndexError` // rather than `TableError::Insert(InsertError::IndexError)`. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index fe5ab090f54..6fd72635a11 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -185,6 +185,25 @@ impl TxState { Some((tbl, blob_store, idx_map, delete_tables.entry(table_id).or_default())) } + /// Assumes that the insert and delete tables exist for `table_id` and fetches them. + /// + /// # Safety + /// + /// The insert and delete tables must exist. 
+ pub unsafe fn assume_present_get_mut_table( + &mut self, + table_id: TableId, + ) -> (&mut Table, &mut dyn BlobStore, &mut DeleteTable) { + let tx_blob_store: &mut dyn BlobStore = &mut self.blob_store; + let tx_table = self.insert_tables.get_mut(&table_id); + // SAFETY: we successfully got a `tx_table` before and haven't removed it since. + let tx_table = unsafe { tx_table.unwrap_unchecked() }; + let delete_table = self.delete_tables.get_mut(&table_id); + // SAFETY: we successfully got a `delete_table` before and haven't removed it since. + let delete_table = unsafe { delete_table.unwrap_unchecked() }; + (tx_table, tx_blob_store, delete_table) + } + /// Returns the table and index associated with the given `table_id` and `col_list`, if any. pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> { let table = self.insert_tables.get(&table_id)?; diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 6213286f51c..02ec48e957e 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -989,8 +989,7 @@ impl StVarTable { { db.delete(tx, ST_VAR_ID, [row_ref.pointer()]); } - let row = value_serialize(&StVarRow { name, value }); - db.insert(tx, ST_VAR_ID, row.into_product().expect("should be product"))?; + tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; Ok(()) } @@ -1305,9 +1304,17 @@ thread_local! { static READ_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) }; } +/// Provides access to a buffer to which bytes can be written. +pub(crate) fn with_sys_table_buf<R>(run: impl FnOnce(&mut Vec<u8>) -> R) -> R { + READ_BUF.with_borrow_mut(|buf| { + buf.clear(); + run(buf) + }) +} + /// Read a value from a system table via BSatn.
fn read_via_bsatn(row: RowRef<'_>) -> Result { - READ_BUF.with_borrow_mut(|buf| Ok(row.read_via_bsatn::(buf)?)) + with_sys_table_buf(|buf| Ok(row.read_via_bsatn::(buf)?)) } /// Convert a value to a product value. diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index b507f4c4360..46b907d38ee 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -439,7 +439,6 @@ pub trait MutTxDatastore: TxDatastore + MutTx { // - index_seek_mut_tx // Sequences - fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result; fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result; fn drop_sequence_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result<()>; fn sequence_id_from_name_mut_tx(&self, tx: &Self::MutTx, sequence_name: &str) -> super::Result>; @@ -482,23 +481,19 @@ pub trait MutTxDatastore: TxDatastore + MutTx { table_id: TableId, relation: impl IntoIterator, ) -> u32; - /// Inserts `row` into the table identified by `table_id`. + /// Inserts `row`, encoded in BSATN, into the table identified by `table_id`. /// - /// Returns the generated column values (the [`AlgebraicValue`]) + /// Returns the list of columns where values were replaced with generated ones /// and a reference to the row as a [`RowRef`]. - /// The generated column values are only those that were generated. - /// Those are columns with an auto-inc sequence + /// + /// Generated columns are columns with an auto-inc sequence /// and where the column was `0` in `row`. - /// In case of zero or multiple such column, - /// an `AlgebraicValue::Product` is returned. - /// Otherwise, in case of a single column, - /// as an optimization, the value is not wrapped. 
fn insert_mut_tx<'a>( &'a self, tx: &'a mut Self::MutTx, table_id: TableId, - row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>)>; + row: &[u8], + ) -> Result<(ColList, RowRef<'a>)>; /// Obtain the [`Metadata`] for this datastore. /// diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 659d32f67bf..914a6091af0 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -379,7 +379,7 @@ impl RelationalDB { program_bytes: program.bytes, module_version: ONLY_MODULE_VERSION.into(), }; - self.insert(tx, ST_MODULE_ID, row.into()).map(drop) + tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop) } /// Obtain the [`Metadata`] of this database. @@ -1125,25 +1125,11 @@ impl RelationalDB { &'a self, tx: &'a mut MutTx, table_id: TableId, - row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { + row: &[u8], + ) -> Result<(ColList, RowRef<'a>), DBError> { self.inner.insert_mut_tx(tx, table_id, row) } - pub fn insert_bytes_as_row<'a>( - &'a self, - tx: &'a mut MutTx, - table_id: TableId, - row_bytes: &[u8], - ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { - // Decode the `row_bytes` as a `ProductValue` according to the schema. - let ty = self.inner.row_type_for_table_mut_tx(tx, table_id)?; - let row = ProductValue::decode(&ty, &mut &row_bytes[..])?; - - // Insert the row. 
- self.insert(tx, table_id, row) - } - pub fn delete(&self, tx: &mut MutTx, table_id: TableId, row_ids: impl IntoIterator) -> u32 { self.inner.delete_mut_tx(tx, table_id, row_ids) } @@ -1167,11 +1153,6 @@ impl RelationalDB { Ok(()) } - /// Generated the next value for the [SequenceId] - pub fn next_sequence(&self, tx: &mut MutTx, seq_id: SequenceId) -> Result { - self.inner.get_next_sequence_value_mut_tx(tx, seq_id) - } - pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { self.inner.create_sequence_mut_tx(tx, sequence_schema) } @@ -1320,6 +1301,7 @@ pub mod tests_utils { use super::*; use core::ops::Deref; use durability::EmptyHistory; + use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; use spacetimedb_paths::FromPathUnchecked; use tempfile::TempDir; @@ -1537,6 +1519,17 @@ pub mod tests_utils { &self.db } } + + pub fn insert<'a, T: Serialize>( + db: &'a RelationalDB, + tx: &'a mut MutTx, + table_id: TableId, + row: &T, + ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { + let (gen_cols, row_ref) = db.insert(tx, table_id, &to_vec(row).unwrap())?; + let gen_cols = row_ref.project(&gen_cols).unwrap(); + Ok((gen_cols, row_ref)) + } } #[cfg(test)] @@ -1551,7 +1544,7 @@ mod tests { system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::error::IndexError; use crate::execution_context::ReducerContext; use anyhow::bail; @@ -1724,7 +1717,7 @@ mod tests { fn insert_three_i32s(stdb: &RelationalDB, tx: &mut MutTx, table_id: TableId) -> ResultTest<()> { for v in [-1, 0, 1] { - stdb.insert(tx, table_id, product![v])?; + insert(stdb, tx, table_id, &product![v])?; } Ok(()) } @@ -1838,8 +1831,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - 
stdb.insert(&mut tx, table_id, product![0i64])?; - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 2]); Ok(()) @@ -1856,8 +1849,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![5i64])?; - stdb.insert(&mut tx, table_id, product![6i64])?; + insert(&stdb, &mut tx, table_id, &product![5i64])?; + insert(&stdb, &mut tx, table_id, &product![6i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![5, 6]); Ok(()) @@ -1881,7 +1874,7 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]); stdb.commit_tx(tx)?; @@ -1889,7 +1882,7 @@ mod tests { let stdb = stdb.reopen()?; let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - stdb.insert(&mut tx, table_id, product![0i64]).unwrap(); + insert(&stdb, &mut tx, table_id, &product![0i64]).unwrap(); // Check the second row start after `SEQUENCE_PREALLOCATION_AMOUNT` assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 4098]); @@ -1910,8 +1903,8 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; - stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64, 1i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64, 1i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]); Ok(()) @@ -1924,8 +1917,8 @@ mod tests { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let schema = 
my_table(AlgebraicType::I64); let table_id = stdb.create_table(&mut tx, schema)?; - stdb.insert(&mut tx, table_id, product![1i64])?; - stdb.insert(&mut tx, table_id, product![2i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64])?; + insert(&stdb, &mut tx, table_id, &product![2i64])?; stdb.commit_tx(tx)?; let stdb = stdb.reopen()?; @@ -1981,9 +1974,8 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64, 0i64]) - .expect("stdb.insert failed"); - match stdb.insert(&mut tx, table_id, product![1i64, 1i64]) { + insert(&stdb, &mut tx, table_id, &product![1i64, 0i64]).expect("stdb.insert failed"); + match insert(&stdb, &mut tx, table_id, &product![1i64, 1i64]) { Ok(_) => panic!("Allow to insert duplicate row"), Err(DBError::Index(IndexError::UniqueConstraintViolation { .. })) => {} Err(err) => panic!("Expected error `UniqueConstraintViolation`, got {err}"), @@ -2013,8 +2005,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![0i64])?; - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 2]); Ok(()) @@ -2141,9 +2133,9 @@ mod tests { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let table_id = stdb.create_table(&mut tx, schema)?; - stdb.insert(&mut tx, table_id, product![0u64, 0u64, 1u64])?; - stdb.insert(&mut tx, table_id, product![0u64, 1u64, 2u64])?; - stdb.insert(&mut tx, table_id, product![1u64, 2u64, 2u64])?; + insert(&stdb, &mut tx, table_id, &product![0u64, 0u64, 1u64])?; + insert(&stdb, &mut tx, table_id, &product![0u64, 1u64, 2u64])?; + insert(&stdb, &mut tx, table_id, &product![1u64, 2u64, 2u64])?; let cols = col_list![0, 1]; let value = product![0u64, 1u64].into(); @@ -2178,8 +2170,7 @@ mod 
tests { // Insert a row and commit it, so the row is in the committed_state. let mut insert_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - stdb.insert(&mut insert_tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("Insert insert_tx failed"); + insert(&stdb, &mut insert_tx, table_id, &product!(AlgebraicValue::I32(0))).expect("Insert insert_tx failed"); stdb.commit_tx(insert_tx).expect("Commit insert_tx failed"); let mut delete_insert_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); @@ -2192,8 +2183,13 @@ mod tests { // Insert the row again, so that depending on the datastore internals, // it may now be only in the committed_state, // or in all three of the committed_state, delete_tables and insert_tables. - stdb.insert(&mut delete_insert_tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("Insert delete_insert_tx failed"); + insert( + &stdb, + &mut delete_insert_tx, + table_id, + &product!(AlgebraicValue::I32(0)), + ) + .expect("Insert delete_insert_tx failed"); // Iterate over the table and assert that we see the committed-deleted-inserted row only once. 
assert_eq!( @@ -2257,8 +2253,7 @@ mod tests { let table_id = { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx)); let table_id = stdb.create_table(&mut tx, schema).expect("failed to create table"); - stdb.insert(&mut tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("failed to insert row"); + insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(0))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); table_id @@ -2268,8 +2263,7 @@ mod tests { // created by a mutable SQL transaction { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql); - stdb.insert(&mut tx, table_id, product!(AlgebraicValue::I32(-42))) - .expect("failed to insert row"); + insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(-42))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); } diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index fad6b1b4ac2..9edb2e1a514 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -66,6 +66,7 @@ fn index_row_est(tx: &Tx, table_id: TableId, cols: &ColList) -> u64 { #[cfg(test)] mod tests { + use crate::db::relational_db::tests_utils::insert; use crate::execution_context::Workload; use crate::{ db::relational_db::{tests_utils::TestDB, RelationalDB}, @@ -103,8 +104,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> Result<(), DBError> { for i in 0..NUM_T_ROWS { - db.insert(tx, table_id, product![i % NDV_T, i]) - .expect("failed to insert into table"); + insert(db, tx, table_id, &product![i % NDV_T, i]).expect("failed to insert into table"); } Ok(()) }) @@ -120,7 +120,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> Result<(), DBError> { for i in 0..NUM_S_ROWS { - db.insert(tx, rhs, product![i, i]).expect("failed to insert into table"); + insert(db, tx, rhs, &product![i, i]).expect("failed to insert into table"); } Ok(()) }) diff --git 
a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 39938b199d7..d3d40c9525a 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -7,7 +7,11 @@ use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_primitives::{ColId, IndexId, TableId}; -use spacetimedb_sats::{bsatn::ToBsatn, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{ + bsatn::{self, ToBsatn}, + buffer::{CountWriter, TeeWriter}, + ProductValue, +}; use spacetimedb_table::table::UniqueConstraintViolation; use std::ops::DerefMut; use std::sync::Arc; @@ -113,13 +117,22 @@ impl InstanceEnv { ); } - pub fn insert(&self, table_id: TableId, buffer: &[u8]) -> Result { + pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result { let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; - let (gen_cols, row_ptr) = stdb - .insert_bytes_as_row(tx, table_id, buffer) - .map(|(gc, rr)| (gc, rr.pointer())) + let (row_len, row_ptr) = stdb + .insert(tx, table_id, buffer) + .map(|(gen_cols, row_ref)| { + // Write back the generated column values to `buffer` + // and the encoded length to `row_len`. 
+ let counter = CountWriter::default(); + let mut writer = TeeWriter::new(counter, buffer); + bsatn::to_writer(&mut writer, &gen_cols).unwrap(); + let row_len = writer.w1.finish(); + + (row_len, row_ref.pointer()) + }) .inspect_err(|e| match e { crate::error::DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -148,7 +161,7 @@ impl InstanceEnv { .map_err(NodesError::ScheduleError)?; } - Ok(gen_cols) + Ok(row_len) } #[tracing::instrument(skip_all)] diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1a7258d159f..83758e86d5e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -640,7 +640,7 @@ impl ModuleHost { }; if connected { - db.insert(mut_tx, ST_CLIENT_ID, row.into()).map(|_| ()) + mut_tx.insert_via_serialize_bsatn(ST_CLIENT_ID, &row).map(|_| ()) } else { let row = db .iter_by_col_eq_mut( diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 5ee0d84cb5d..77f4bd7bf98 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -561,13 +561,11 @@ impl WasmModuleInstance { } fn insert_st_client(&self, tx: &mut MutTxId, identity: Identity, address: Address) -> Result<(), DBError> { - let db = &*self.replica_context().relational_db; let row = &StClientRow { identity: identity.into(), address: address.into(), }; - - db.insert(tx, ST_CLIENT_ID, row.into()).map(|_| ()) + tx.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ()) } } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 48980578027..696edfcfc79 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -13,8 +13,6 @@ use crate::host::wasm_common::{ use crate::host::AbiCall; use anyhow::Context as 
_; use spacetimedb_primitives::{errno, ColId}; -use spacetimedb_sats::bsatn; -use spacetimedb_sats::buffer::{CountWriter, TeeWriter}; use wasmtime::{AsContext, Caller, StoreContextMut}; use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr}; @@ -677,16 +675,8 @@ impl WasmInstanceEnv { // Get a mutable view to the `row`. let row = mem.deref_slice_mut(row_ptr, row_len)?; - // Insert the row into the DB. - // This will return back the generated column values. - let gen_cols = env.instance_env.insert(table_id.into(), row)?; - - // Write back the generated column values to `row` - // and the encoded length to `row_len`. - let counter = CountWriter::default(); - let mut writer = TeeWriter::new(counter, row); - bsatn::to_writer(&mut writer, &gen_cols).unwrap(); - let row_len = writer.w1.finish(); + // Insert the row into the DB and write back the generated column values. + let row_len = env.instance_env.insert(table_id.into(), row)?; u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?; Ok(()) }) diff --git a/crates/core/src/sql/compiler.rs b/crates/core/src/sql/compiler.rs index c0c62315bd4..00cf1e1cec0 100644 --- a/crates/core/src/sql/compiler.rs +++ b/crates/core/src/sql/compiler.rs @@ -230,7 +230,7 @@ fn compile_statement(db: &RelationalDB, statement: SqlAst) -> Result(()) })?; diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 5012d2fb700..41962fffa1f 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -213,7 +213,7 @@ pub fn translate_col(tx: &Tx, field: FieldName) -> Option> { pub(crate) mod tests { use super::*; use crate::db::datastore::system_tables::{StTableFields, ST_TABLE_ID, ST_TABLE_NAME}; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::vm::tests::create_table_with_rows; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -619,7 +619,7 @@ pub(crate) 
mod tests { ]; let table_id = db.create_table_for_test_multi_column("test", schema, col_list![0, 1])?; db.with_auto_commit(Workload::ForTests, |tx| { - db.insert(tx, table_id, product![1, 1, 1, 1]).map(drop) + insert(&db, tx, table_id, &product![1, 1, 1, 1]).map(drop) })?; let result = run_for_testing(&db, "select * from test where b = 1 and a = 1")?; @@ -665,7 +665,7 @@ pub(crate) mod tests { db.with_auto_commit(Workload::ForTests, |tx| { for i in 0..1000i32 { - db.insert(tx, table_id, product!(i)).unwrap(); + insert(&db, tx, table_id, &product!(i)).unwrap(); } Ok::<(), DBError>(()) }) @@ -697,7 +697,9 @@ pub(crate) mod tests { let schema = &[("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; let table_id = db.create_table_for_test_multi_column("test", schema, col_list![0, 1])?; let row = product![4u8, 8u8]; - db.with_auto_commit(Workload::ForTests, |tx| db.insert(tx, table_id, row.clone()).map(drop))?; + db.with_auto_commit(Workload::ForTests, |tx| { + insert(&db, tx, table_id, &row.clone()).map(drop) + })?; let result = run_for_testing(&db, "select * from test where a >= 3 and a <= 5 and b >= 3 and b <= 5")?; @@ -714,7 +716,7 @@ pub(crate) mod tests { let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; db.with_auto_commit(Workload::ForTests, |tx| -> Result<_, DBError> { for i in 0..5u8 { - db.insert(tx, table_id, product!(i))?; + insert(&db, tx, table_id, &product!(i))?; } Ok(()) })?; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 64dfe0b78eb..ac7796d287d 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -393,7 +393,7 @@ pub struct WriteConflict; mod tests { use super::{AssertTxFn, ModuleSubscriptions}; use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender}; - use crate::db::relational_db::tests_utils::TestDB; + use 
crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::execution_context::Workload; @@ -431,7 +431,7 @@ mod tests { // Create table with one row let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; db.with_auto_commit(Workload::ForTests, |tx| { - db.insert(tx, table_id, product!(1_u8)).map(drop) + insert(&db, tx, table_id, &product!(1_u8)).map(drop) })?; let (send, mut recv) = mpsc::unbounded_channel(); @@ -459,7 +459,7 @@ mod tests { let write_handle = runtime.spawn(async move { let _ = recv.recv().await; db2.with_auto_commit(Workload::ForTests, |tx| { - db2.insert(tx, table_id, product!(2_u8)).map(drop) + insert(&db2, tx, table_id, &product!(2_u8)).map(drop) }) }); diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 69cdc3b718c..422d03aa1a7 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -131,7 +131,7 @@ pub fn classify(expr: &QueryExpr) -> Option { mod tests { use super::*; use crate::db::datastore::traits::IsolationLevel; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::MutTx; use crate::execution_context::Workload; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate}; @@ -195,7 +195,7 @@ mod tests { } fn insert_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> ResultTest<()> { - db.insert(tx, table_id, row)?; + insert(db, tx, table_id, &row)?; Ok(()) } @@ -376,7 +376,7 @@ mod tests { let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let mut deletes = Vec::new(); for i in 0u64..9u64 { - db.insert(&mut tx, table_id, product!(i, i))?; + insert(&db, &mut tx, table_id, &product!(i, i))?; deletes.push(product!(i + 10, i)) } @@ -669,7 +669,7 @@ mod tests { let lhs_id = 
db.create_table_for_test("lhs", &[("id", I32), ("x", I32)], &[0.into()])?; db.with_auto_commit(Workload::ForTests, |tx| { for i in 0..5 { - db.insert(tx, lhs_id, product!(i, i + 5))?; + insert(db, tx, lhs_id, &product!(i, i + 5))?; } Ok(lhs_id) }) @@ -681,7 +681,7 @@ mod tests { let rhs_id = db.create_table_for_test("rhs", &[("rid", I32), ("id", I32), ("y", I32)], &[1.into()])?; db.with_auto_commit(Workload::ForTests, |tx| { for i in 10..20 { - db.insert(tx, rhs_id, product!(i, i - 10, i - 8))?; + insert(db, tx, rhs_id, &product!(i, i - 10, i - 8))?; } Ok(rhs_id) }) diff --git a/crates/core/src/util/slow.rs b/crates/core/src/util/slow.rs index 3850d6f874b..b9d56a58ca2 100644 --- a/crates/core/src/util/slow.rs +++ b/crates/core/src/util/slow.rs @@ -56,7 +56,7 @@ mod tests { use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::ProductValue; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::RelationalDB; use crate::execution_context::Workload; use spacetimedb_sats::{product, AlgebraicType}; @@ -86,7 +86,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> ResultTest<_> { for i in 0..100_000 { - db.insert(tx, table_id, product![i, i * 2])?; + insert(&db, tx, table_id, &product![i, i * 2])?; } Ok(()) })?; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 6186e479aad..84c438445c8 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -507,11 +507,14 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { Ok(Code::Table(MemTable::new(head, table_access, rows))) } - fn _execute_insert(&mut self, table: &DbTable, rows: Vec) -> Result { + // TODO(centril): investigate taking bsatn as input instead. 
+ fn _execute_insert(&mut self, table: &DbTable, inserts: Vec) -> Result { let tx = self.tx.unwrap_mut(); - let inserts = rows.clone(); // TODO code shouldn't be hot, let's remove later - for row in rows { - self.db.insert(tx, table.table_id, row)?; + let mut scratch = Vec::new(); + for row in &inserts { + row.encode(&mut scratch); + self.db.insert(tx, table.table_id, &scratch)?; + scratch.clear(); } Ok(Code::Pass(Some(Update { table_id: table.table_id, @@ -642,7 +645,7 @@ pub(crate) mod tests { StSequenceRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_COLUMN_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_NAME, }; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::execution_context::Workload; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -694,7 +697,7 @@ pub(crate) mod tests { let schema = db.schema_for_table_mut(tx, table_id)?; for row in rows { - db.insert(tx, table_id, row.clone())?; + insert(db, tx, table_id, &row)?; } Ok(schema) diff --git a/crates/sats/src/algebraic_value.rs b/crates/sats/src/algebraic_value.rs index 3277c6092a7..863a3483e34 100644 --- a/crates/sats/src/algebraic_value.rs +++ b/crates/sats/src/algebraic_value.rs @@ -266,28 +266,6 @@ impl AlgebraicValue { _ => false, } } - - /// Converts `sequence_value` to an appropriate `AlgebraicValue` based on `ty`. - /// Truncates the `sequence_value` to fit `ty`. - /// - /// Panics if `ty` is not an integer type. 
- pub fn from_sequence_value(ty: &AlgebraicType, sequence_value: i128) -> Self { - match *ty { - AlgebraicType::I8 => (sequence_value as i8).into(), - AlgebraicType::U8 => (sequence_value as u8).into(), - AlgebraicType::I16 => (sequence_value as i16).into(), - AlgebraicType::U16 => (sequence_value as u16).into(), - AlgebraicType::I32 => (sequence_value as i32).into(), - AlgebraicType::U32 => (sequence_value as u32).into(), - AlgebraicType::I64 => (sequence_value as i64).into(), - AlgebraicType::U64 => (sequence_value as u64).into(), - AlgebraicType::I128 => sequence_value.into(), - AlgebraicType::U128 => (sequence_value as u128).into(), - AlgebraicType::I256 => sequence_value.into(), - AlgebraicType::U256 => (sequence_value as u128).into(), - _ => panic!("`{ty:?}` is not a sequence integer type"), - } - } } impl> From> for AlgebraicValue { diff --git a/crates/table/src/layout.rs b/crates/table/src/layout.rs index f1ee56083f5..9c9b02b0d77 100644 --- a/crates/table/src/layout.rs +++ b/crates/table/src/layout.rs @@ -714,9 +714,6 @@ impl<'de> DeserializeSeed<'de> for &AlgebraicTypeLayout { AlgebraicTypeLayout::Primitive(PrimitiveType::F64) => f64::deserialize(de).map(Into::into), AlgebraicTypeLayout::VarLen(VarLenType::Array(ty)) => WithTypespace::empty(&**ty).deserialize(de), AlgebraicTypeLayout::VarLen(VarLenType::String) => >::deserialize(de).map(Into::into), - /* - AlgebraicType::Sum(sum) => self.with(sum).deserialize(de).map(Into::into), - */ } } } diff --git a/crates/table/src/page.rs b/crates/table/src/page.rs index ab6a0d46420..faf93a54e4a 100644 --- a/crates/table/src/page.rs +++ b/crates/table/src/page.rs @@ -1154,6 +1154,21 @@ impl Page { (fixed, var) } + /// Returns a mutable view of the row from `start` lasting `fixed_row_size` number of bytes. + /// + /// This method is safe, but callers should take care that `start` and `fixed_row_size` + /// are correct for this page, and that `start` is aligned. 
+ /// Callers should further ensure that mutations to the row leave the row bytes + /// in an expected state, i.e. initialized where required by the row type, + /// and with `VarLenRef`s that point to valid granules and with correct lengths. + /// + /// This call will clear the unmodified hash + /// as it is expected that the caller will alter the the page. + pub fn get_fixed_row_data_mut(&mut self, start: PageOffset, fixed_row_size: Size) -> &mut Bytes { + self.header.unmodified_hash = None; + &mut self.row_data[start.range(fixed_row_size)] + } + /// Return the total required var-len granules to store `objects`. pub fn total_granules_required_for_objects(objects: &[impl AsRef<[u8]>]) -> usize { objects @@ -1279,6 +1294,8 @@ impl Page { } /// Allocates a space for a fixed size row of `fixed_row_size` in the freelist, if possible. + /// + /// This call will clear the unmodified hash. #[inline] fn alloc_fixed_len_from_freelist(&mut self, fixed_row_size: Size) -> Option { let header = &mut self.header.fixed; @@ -1293,6 +1310,8 @@ impl Page { } /// Allocates a space for a fixed size row of `fixed_row_size` in the freelist, if possible. + /// + /// This call will clear the unmodified hash. #[inline] fn alloc_fixed_len_from_gap(&mut self, fixed_row_size: Size) -> Option { if gap_enough_size_for_row(self.header.var.first, self.header.fixed.last, fixed_row_size) { @@ -1376,6 +1395,8 @@ impl Page { /// Free a row, marking its fixed-len and var-len storage granules as available for re-use. /// + /// This call will clear the unmodified hash. + /// /// # Safety /// /// - `fixed_row` must point to a valid row in this page. diff --git a/crates/table/src/pages.rs b/crates/table/src/pages.rs index c434f91eafc..7dc74e01c47 100644 --- a/crates/table/src/pages.rs +++ b/crates/table/src/pages.rs @@ -64,7 +64,6 @@ impl Pages { /// /// Used in benchmarks. Internal operators will prefer directly indexing into `self.pages`, /// as that allows split borrows. 
- #[doc(hidden)] // Used in benchmarks. pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page { &mut self.pages[page.idx()] } diff --git a/crates/table/src/read_column.rs b/crates/table/src/read_column.rs index f1ad4825cc1..1295d01e621 100644 --- a/crates/table/src/read_column.rs +++ b/crates/table/src/read_column.rs @@ -332,6 +332,7 @@ impl_read_column_via_from! { u32 => spacetimedb_primitives::IndexId; u32 => spacetimedb_primitives::ConstraintId; u32 => spacetimedb_primitives::SequenceId; + u32 => spacetimedb_primitives::ScheduleId; u128 => Packed; i128 => Packed; u256 => Box; diff --git a/crates/table/src/static_bsatn_validator.rs b/crates/table/src/static_bsatn_validator.rs index 22cfcfbe040..2a1078ba1cf 100644 --- a/crates/table/src/static_bsatn_validator.rs +++ b/crates/table/src/static_bsatn_validator.rs @@ -286,6 +286,7 @@ impl Insn { impl MemoryUsage for Insn {} +#[derive(Clone)] pub struct StaticBsatnValidator { /// The list of instructions that make up this program. insns: Arc<[Insn]>, diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index bbdd4c9a032..6d756babe78 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1,6 +1,12 @@ +use crate::{ + bflatn_to::write_row_to_pages_bsatn, + layout::AlgebraicTypeLayout, + static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator}, +}; + use super::{ bflatn_from::serialize_row_from_page, - bflatn_to::write_row_to_pages, + bflatn_to::{write_row_to_pages, Error}, blob_store::BlobStore, btree_index::{BTreeIndex, BTreeIndexRangeIter}, eq::eq_row_in_page, @@ -18,22 +24,27 @@ use super::{ var_len::VarLenMembers, MemoryUsage, }; -use core::hash::{Hash, Hasher}; use core::ops::RangeBounds; use core::{fmt, ptr}; +use core::{ + hash::{Hash, Hasher}, + hint::unreachable_unchecked, +}; use derive_more::{Add, AddAssign, From, Sub}; +use smallvec::SmallVec; use spacetimedb_data_structures::map::{DefaultHashBuilder, HashCollectionExt, HashMap}; use 
spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned}; -use spacetimedb_primitives::{ColId, ColList, IndexId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId}; use spacetimedb_sats::{ algebraic_value::ser::ValueSerializer, bsatn::{self, ser::BsatnError, ToBsatn}, + i256, product_value::InvalidFieldError, satn::Satn, ser::{Serialize, Serializer}, - AlgebraicValue, ProductType, ProductValue, + u256, AlgebraicValue, ProductType, ProductValue, }; -use spacetimedb_schema::schema::TableSchema; +use spacetimedb_schema::{schema::TableSchema, type_for_generate::PrimitiveType}; use std::sync::Arc; use thiserror::Error; @@ -43,6 +54,9 @@ pub struct BlobNumBytes(usize); impl MemoryUsage for BlobNumBytes {} +pub type SeqIdList = SmallVec<[SequenceId; 4]>; +static_assert_size!(SeqIdList, 24); + /// A database table containing the row schema, the rows, and indices. /// /// The table stores the rows into a page manager @@ -81,7 +95,7 @@ pub(crate) struct TableInner { row_layout: RowTypeLayout, /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion, /// if the [`RowTypeLayout`] has a static BSATN length and layout. - static_layout: Option, + static_layout: Option<(StaticLayout, StaticBsatnValidator)>, /// The visitor program for `row_layout`. /// /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it. @@ -125,7 +139,7 @@ impl TableInner { } } -static_assert_size!(Table, 256); +static_assert_size!(Table, 272); impl MemoryUsage for Table { fn heap_usage(&self) -> usize { @@ -191,7 +205,7 @@ impl Table { /// Creates a new empty table with the given `schema` and `squashed_offset`. 
pub fn new(schema: Arc, squashed_offset: SquashedOffset) -> Self { let row_layout: RowTypeLayout = schema.get_row_type().clone().into(); - let static_layout = StaticLayout::for_row_type(&row_layout); + let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout))); let visitor_prog = row_type_visitor(&row_layout); Self::new_with_indexes_capacity(schema, row_layout, static_layout, visitor_prog, squashed_offset, 0) } @@ -239,9 +253,7 @@ impl Table { // Insert row into indices and check unique constraints. // SAFETY: We just inserted `ptr`, so it must be present. - unsafe { - self.insert_into_indices(blob_store, ptr)?; - } + unsafe { self.insert_into_indices(blob_store, ptr) }?; // SAFETY: We just inserted `ptr`, // and `insert_into_indices` didn't remove it, @@ -266,8 +278,7 @@ impl Table { // SAFETY: `ptr` was derived from `row_ref` without interleaving calls, so it must be valid. unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; - self.row_count += 1; - self.blob_store_bytes += blob_bytes; + self.update_statistics_added_row(blob_bytes); Ok((hash, ptr)) } @@ -300,6 +311,179 @@ impl Table { Ok((row_ref, blob_bytes)) } + /// Physically insert a `row`, encoded in BSATN, into this table, + /// storing its large var-len members in the `blob_store`. + /// + /// On success, returns the hash of the newly-inserted row, + /// and a `RowRef` referring to the row. + /// + /// This does not check for set semantic or unique constraints. + /// + /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`. + /// In this case, A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. + pub fn insert_physically_bsatn<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + row: &[u8], + ) -> Result<(RowRef<'a>, BlobNumBytes), InsertError> { + // Got a static layout? => Use fast-path insertion. 
+ let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() { + // Before inserting, validate the row, ensuring type safety. + // SAFETY: The `static_validator` was derived from the same row layout as the static layout. + unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?; + + let fixed_row_size = self.inner.row_layout.size(); + let squashed_offset = self.squashed_offset; + let res = self + .inner + .pages + .with_page_to_insert_row(fixed_row_size, 0, |page| { + // SAFETY: We've used the right `row_size` and we trust that others have too. + // `RowTypeLayout` also ensures that we satisfy the minimum row size. + let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?; + let (mut fixed, _) = page.split_fixed_var_mut(); + let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size); + // SAFETY: + // - We've validated that `row` is of sufficient length. + // - The `fixed_buf` is exactly the right `fixed_row_size`. + unsafe { static_layout.deserialize_row_into(fixed_buf, row) }; + Ok(fixed_offset) + }) + .map_err(Error::PagesError)?; + match res { + (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()), + (_, Err(e)) => return Err(e), + } + } else { + // SAFETY: `self.pages` is known to be specialized for `self.row_layout`, + // as `self.pages` was constructed from `self.row_layout` in `Table::new`. + unsafe { + write_row_to_pages_bsatn( + &mut self.inner.pages, + &self.inner.visitor_prog, + blob_store, + &self.inner.row_layout, + row, + self.squashed_offset, + ) + }? + }; + + // SAFETY: We just inserted `ptr`, so it must be present. + let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + + Ok((row_ref, blob_bytes)) + } + + /// Returns all the columns with sequences that need generation for this `row`. + /// + /// # Safety + /// + /// `self.is_row_present(row)` must hold. 
+ pub unsafe fn sequence_triggers_for<'a>( + &'a self, + blob_store: &'a dyn BlobStore, + row: RowPointer, + ) -> (ColList, SeqIdList) { + let sequences = &*self.get_schema().sequences; + let row_ty = self.row_layout().product(); + + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) }; + + sequences + .iter() + // Find all the sequences that are triggered by this row. + .filter(|seq| { + // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements` + // as `row_ty` was derived from the same schema as `seq` is part of. + let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) }; + // SAFETY: + // - `elem_ty` appears as a column in th row type. + // - `AlgebraicValue` is compatible with all types. + let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) }; + val.is_numeric_zero() + }) + .map(|seq| (seq.col_pos, seq.sequence_id)) + .unzip() + } + + /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`. + /// + /// Truncates the `seq_val` to fit the type of the column. + /// + /// # Safety + /// + /// - `self.is_row_present(row)` must hold. + /// - `col_id` must be a valid column, with a primiive type, of the row type. + pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) { + let row_ty = self.inner.row_layout.product(); + let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) }; + let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else { + // SAFETY: Columns with sequences must be primitive types. 
+ unsafe { unreachable_unchecked() } + }; + + let fixed_row_size = self.inner.row_layout.size(); + let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size); + + fn write(dst: &mut [u8], offset: u16, bytes: [u8; N]) { + let offset = offset as usize; + dst[offset..offset + N].copy_from_slice(&bytes); + } + + match col_typ { + PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()), + PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()), + PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()), + PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()), + PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()), + PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()), + PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()), + PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()), + PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()), + PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()), + PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()), + PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()), + PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => { + panic!("`{:?}` is not a sequence integer type", &elem_ty.ty) + } + } + } + + /// Performs all the checks necessary after having fully decided on a rows contents. + /// + /// On `Ok(_)`, statistics of the table are also updated, + /// and the `ptr` still points to a valid row, and otherwise not. + /// + /// # Safety + /// + /// `self.is_row_present(row)` must hold. 
+ pub fn confirm_insertion<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + ptr: RowPointer, + blob_bytes: BlobNumBytes, + ) -> Result<(RowHash, RowPointer), InsertError> { + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }; + let hash = row_ref.row_hash(); + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + unsafe { self.insert_into_indices(blob_store, ptr) }?; + + self.update_statistics_added_row(blob_bytes); + Ok((hash, ptr)) + } + + /// We've added a row, update the statistics to record this. + #[inline] + fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) { + self.row_count += 1; + self.blob_store_bytes += blob_bytes; + } + /// Insert row identified by `ptr` into indices. /// This also checks unique constraints. /// Deletes the row if there were any violations. @@ -415,7 +599,7 @@ impl Table { committed_offset, tx_offset, &committed_table.inner.row_layout, - committed_table.inner.static_layout.as_ref(), + committed_table.static_layout(), ) } }) @@ -675,7 +859,7 @@ impl Table { pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self { let schema = self.schema.clone(); let layout = self.row_layout().clone(); - let sbl = self.static_layout().cloned(); + let sbl = self.inner.static_layout.clone(); let visitor = self.inner.visitor_prog.clone(); let mut new = Table::new_with_indexes_capacity(schema, layout, sbl, visitor, squashed_offset, self.indexes.len()); @@ -841,21 +1025,16 @@ impl<'a> RowRef<'a> { RowHash(RowHash::hasher_builder().hash_one(self)) } - /// The length of this row when BSATN-encoded. - /// - /// Only available for rows whose types have a static BSATN layout. - /// Returns `None` for rows of other types, e.g. rows containing strings. 
- pub fn bsatn_length(&self) -> Option { - self.table.static_layout.as_ref().map(|s| s.bsatn_length as usize) + /// Returns the static layout for this row reference, if any. + pub fn static_layout(&self) -> Option<&StaticLayout> { + self.table.static_layout.as_ref().map(|(s, _)| s) } /// Encode the row referred to by `self` into a `Vec` using BSATN and then deserialize it. - /// The passed buffer is allowed to be in an arbitrary state before and after this operation. pub fn read_via_bsatn(&self, scratch: &mut Vec) -> Result where T: DeserializeOwned, { - scratch.clear(); self.to_bsatn_extend(scratch)?; Ok(bsatn::from_slice::(scratch)?) } @@ -903,7 +1082,7 @@ impl ToBsatn for RowRef<'_> { /// This method will use a [`StaticLayout`] if one is available, /// and may therefore be faster than calling [`bsatn::to_vec`]. fn to_bsatn_vec(&self) -> Result, BsatnError> { - if let Some(static_layout) = &self.table.static_layout { + if let Some(static_layout) = self.static_layout() { // Use fast path, by first fetching the row data and then using the static layout. let row = self.get_row_data(); // SAFETY: @@ -921,7 +1100,7 @@ impl ToBsatn for RowRef<'_> { /// This method will use a [`StaticLayout`] if one is available, /// and may therefore be faster than calling [`bsatn::to_writer`]. fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { - if let Some(static_layout) = &self.table.static_layout { + if let Some(static_layout) = self.static_layout() { // Use fast path, by first fetching the row data and then using the static layout. 
let row = self.get_row_data(); // SAFETY: @@ -938,7 +1117,7 @@ impl ToBsatn for RowRef<'_> { } fn static_bsatn_size(&self) -> Option { - self.table.static_layout.as_ref().map(|sbl| sbl.bsatn_length) + self.static_layout().map(|sl| sl.bsatn_length) } } @@ -957,7 +1136,7 @@ impl PartialEq for RowRef<'_> { } let (page_a, offset_a) = self.page_and_offset(); let (page_b, offset_b) = other.page_and_offset(); - let static_layout = self.table.static_layout.as_ref(); + let static_layout = self.static_layout(); // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty` // and `static_bsatn_layout` is derived from `a_ty`. unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) } @@ -1133,7 +1312,7 @@ impl Table { fn new_with_indexes_capacity( schema: Arc, row_layout: RowTypeLayout, - static_layout: Option, + static_layout: Option<(StaticLayout, StaticBsatnValidator)>, visitor_prog: VarLenVisitorProgram, squashed_offset: SquashedOffset, indexes_capacity: usize, @@ -1215,7 +1394,7 @@ impl Table { /// Returns the [`StaticLayout`] for this table, pub(crate) fn static_layout(&self) -> Option<&StaticLayout> { - self.inner.static_layout.as_ref() + self.inner.static_layout.as_ref().map(|(s, _)| s) } /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them. 
From 5802273f1ac98a68804fe6ad21aaaa4235fd9343 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 2 Jan 2025 14:15:24 +0100 Subject: [PATCH 4/9] InstanceEnv::insert: fix gen col write-back --- crates/core/src/host/instance_env.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index d3d40c9525a..1c9df54018b 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -10,7 +10,7 @@ use spacetimedb_primitives::{ColId, IndexId, TableId}; use spacetimedb_sats::{ bsatn::{self, ToBsatn}, buffer::{CountWriter, TeeWriter}, - ProductValue, + AlgebraicValue, ProductValue, }; use spacetimedb_table::table::UniqueConstraintViolation; use std::ops::DerefMut; @@ -124,11 +124,17 @@ impl InstanceEnv { let (row_len, row_ptr) = stdb .insert(tx, table_id, buffer) .map(|(gen_cols, row_ref)| { - // Write back the generated column values to `buffer` - // and the encoded length to `row_len`. + // We get back a col-list with the columns with generated values. + // Write those back to `buffer` and then the encoded length to `row_len`. let counter = CountWriter::default(); let mut writer = TeeWriter::new(counter, buffer); - bsatn::to_writer(&mut writer, &gen_cols).unwrap(); + for col in gen_cols.iter() { + // Read the column value to AV and then serialize. 
+ let val = row_ref + .read_col::(col) + .expect("reading col as AV never panics"); + bsatn::to_writer(&mut writer, &val).unwrap(); + } let row_len = writer.w1.finish(); (row_len, row_ref.pointer()) From 55ac51e56fa4cfc42189508ffab8e0079057e370 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 2 Jan 2025 15:41:13 +0100 Subject: [PATCH 5/9] insert_via_serialize_bsatn: fix nested mut borrow bug --- crates/core/src/db/datastore/locking_tx_datastore/mod.rs | 2 +- .../core/src/db/datastore/locking_tx_datastore/mut_tx.rs | 7 ++++++- crates/core/src/db/datastore/system_tables.rs | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index c5c87da46da..b40ba83107d 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -1,4 +1,4 @@ -#![forbid(unsafe_op_in_unsafe_fn)] +#![deny(unsafe_op_in_unsafe_fn)] pub mod committed_state; pub mod datastore; diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 90102884648..3926f31ad1a 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -23,6 +23,7 @@ use crate::{ error::{IndexError, SequenceError, TableError}, execution_context::ExecutionContext, }; +use core::cell::RefCell; use core::ops::RangeBounds; use core::{iter, ops::Bound}; use smallvec::SmallVec; @@ -1165,7 +1166,11 @@ impl MutTxId { table_id: TableId, row: &T, ) -> Result<(ColList, RowRefInsertion<'a>)> { - with_sys_table_buf(|buf| { + thread_local! 
{ + static BUF: RefCell> = const { RefCell::new(Vec::new()) }; + } + BUF.with_borrow_mut(|buf| { + buf.clear(); to_writer(buf, row).unwrap(); self.insert::(table_id, buf) }) diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 02ec48e957e..da465527ff8 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -1312,7 +1312,7 @@ pub(crate) fn with_sys_table_buf(run: impl FnOnce(&mut Vec) -> R) -> R { }) } -/// Read a value from a system table via BSatn. +/// Read a value from a system table via BSATN. fn read_via_bsatn(row: RowRef<'_>) -> Result { with_sys_table_buf(|buf| Ok(row.read_via_bsatn::(buf)?)) } From 4af07053fa3c1d2392d0797b860e1bedef8ecdb0 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 2 Jan 2025 16:15:26 +0100 Subject: [PATCH 6/9] Table::confirm_insertion: fix underflow bug --- crates/table/src/table.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 6d756babe78..013159a9d92 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -30,7 +30,7 @@ use core::{ hash::{Hash, Hasher}, hint::unreachable_unchecked, }; -use derive_more::{Add, AddAssign, From, Sub}; +use derive_more::{Add, AddAssign, From, Sub, SubAssign}; use smallvec::SmallVec; use spacetimedb_data_structures::map::{DefaultHashBuilder, HashCollectionExt, HashMap}; use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned}; @@ -49,7 +49,7 @@ use std::sync::Arc; use thiserror::Error; /// The number of bytes used by, added to, or removed from a [`Table`]'s share of a [`BlobStore`]. 
-#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign)] +#[derive(Copy, Clone, PartialEq, Eq, Debug, Default, From, Add, Sub, AddAssign, SubAssign)] pub struct BlobNumBytes(usize); impl MemoryUsage for BlobNumBytes {} @@ -484,6 +484,13 @@ impl Table { self.blob_store_bytes += blob_bytes; } + /// We've removed a row, update the statistics to record this. + #[inline] + fn update_statistics_deleted_row(&mut self, blob_bytes: BlobNumBytes) { + self.row_count -= 1; + self.blob_store_bytes -= blob_bytes; + } + /// Insert row identified by `ptr` into indices. /// This also checks unique constraints. /// Deletes the row if there were any violations. @@ -661,31 +668,31 @@ impl Table { /// Deletes the row identified by `ptr` from the table. /// + /// Returns the number of blob bytes added. This method does not update statistics by itself. + /// /// NOTE: This method skips updating indexes. /// Use `delete_unchecked` or `delete` to delete a row with index updating. /// /// SAFETY: `self.is_row_present(row)` must hold. - unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) { + unsafe fn delete_internal(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes { // SAFETY: `self.is_row_present(row)` holds. let row = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }; // Remove the set semantic association. let _remove_result = self.pointer_map.remove(row.row_hash(), ptr); debug_assert!(_remove_result); - self.row_count -= 1; // Delete the physical row. // SAFETY: `ptr` points to a valid row in this table as `self.is_row_present(row)` holds. - let blob_store_deleted_bytes = unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) }; - // Just deleted bytes (`blob_store_deleted_bytes`) - // cannot be greater than the total number of bytes (`self.blob_store_bytes`). 
- self.blob_store_bytes = self.blob_store_bytes - blob_store_deleted_bytes; + unsafe { self.delete_internal_skip_pointer_map(blob_store, ptr) } } /// Deletes the row identified by `ptr` from the table. /// + /// Returns the number of blob bytes added. This method does not update statistics by itself. + /// /// SAFETY: `self.is_row_present(row)` must hold. - unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) { + unsafe fn delete_unchecked(&mut self, blob_store: &mut dyn BlobStore, ptr: RowPointer) -> BlobNumBytes { // SAFETY: `self.is_row_present(row)` holds. let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; @@ -724,7 +731,8 @@ impl Table { let ret = before(row_ref); // SAFETY: We've checked above that `self.is_row_present(ptr)`. - unsafe { self.delete_unchecked(blob_store, ptr) } + let blob_bytes_deleted = unsafe { self.delete_unchecked(blob_store, ptr) }; + self.update_statistics_deleted_row(blob_bytes_deleted); Some(ret) } @@ -761,13 +769,14 @@ impl Table { // If an equal row was present, delete it. if let Some(existing_row_ptr) = existing_row_ptr { - if skip_index_update { + let blob_bytes_deleted = if skip_index_update { // SAFETY: `find_same_row` ensures that the pointer is valid. unsafe { self.delete_internal(blob_store, existing_row_ptr) } } else { // SAFETY: `find_same_row` ensures that the pointer is valid. unsafe { self.delete_unchecked(blob_store, existing_row_ptr) } - } + }; + self.update_statistics_deleted_row(blob_bytes_deleted); } // Remove the temporary row we inserted in the beginning. From 78abf7cbd8a0debd05eac9202a0818137a2594b7 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Fri, 3 Jan 2025 12:47:08 +0100 Subject: [PATCH 7/9] 1. Improve test coverage with `insert_bsatn_same_as_pv`, which asserts that `table.insert(..)` does the same as using the bsatn path. Sprinkles various `Debug, PartialEq, Eq` derives to achieve this. 
Also uses `confirm_insertion` more to get that more under test. 2. Review and justify some unsafes. --- .../locking_tx_datastore/committed_state.rs | 2 +- .../datastore/locking_tx_datastore/mut_tx.rs | 24 +++-- crates/table/src/bflatn_to.rs | 2 +- crates/table/src/blob_store.rs | 3 +- crates/table/src/btree_index.rs | 4 +- crates/table/src/btree_index/multimap.rs | 2 +- crates/table/src/btree_index/uniquemap.rs | 2 +- crates/table/src/fixed_bit_set.rs | 10 +- crates/table/src/page.rs | 12 +-- crates/table/src/pages.rs | 4 +- crates/table/src/row_type_visitor.rs | 4 +- crates/table/src/static_bsatn_validator.rs | 6 +- crates/table/src/table.rs | 100 ++++++++++++------ 13 files changed, 115 insertions(+), 60 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index a74ae630905..765d158e06d 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -334,7 +334,7 @@ impl CommittedState { row: &ProductValue, ) -> Result<()> { let (table, blob_store) = self.get_table_and_blob_store_or_create(table_id, schema); - table.insert_internal(blob_store, row).map_err(TableError::Insert)?; + table.insert_for_replay(blob_store, row).map_err(TableError::Insert)?; Ok(()) } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 3926f31ad1a..73c91bd67b5 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1202,6 +1202,12 @@ impl MutTxId { // 1. Insert the physical row. // 2. Detect, generate, write sequence values. // 3. Confirm that the insertion respects constraints and update statistics. + // 4. 
Post condition (PC.INS.1): + // `res = Ok((hash, ptr))` + // => `ptr` refers to a valid row in `table_id` for `tx_table` + // ∧ `hash` is the hash of this row + // This follows from both `Ok(_)` branches leading to `confirm_insertion` + // which both entail the above post-condition. let ((tx_table, tx_blob_store, delete_table), gen_cols, res) = match tx_table .insert_physically_bsatn(tx_blob_store, row) { @@ -1229,12 +1235,15 @@ impl MutTxId { unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; } - let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + // SAFETY: `self.is_row_present(row)` holds as we still haven't deleted the row, + // in particular, the `write_gen_val_to_col` call does not remove the row. + let res = unsafe { tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes) }; ((tx_table, tx_blob_store, delete_table), cols_to_gen, res) } Ok((tx_row_ref, blob_bytes)) => { let tx_row_ptr = tx_row_ref.pointer(); - let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + // SAFETY: `self.is_row_present(row)` holds as we just inserted the row. + let res = unsafe { tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes) }; // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, // we can assume we have an insert and delete table. ( @@ -1266,7 +1275,7 @@ impl MutTxId { // SAFETY: // - `commit_table` and `tx_table` use the same schema // because `tx_table` is derived from `commit_table`. - // - `tx_row_ptr` and `tx_row_hash` are correct because we just got them from `tx_table.insert`. + // - `tx_row_ptr` and `tx_row_hash` are correct per (PC.INS.1). if let Some(commit_ptr) = unsafe { Table::find_same_row(commit_table, tx_table, tx_row_ptr, tx_row_hash) } { @@ -1313,8 +1322,8 @@ impl MutTxId { } // Pacify the borrow checker. 
- // SAFETY: `tx_row_ptr` came from `tx_table.insert` just now - // without any interleaving `&mut` calls that could invalidate the pointer. + // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). + // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. let tx_row_ref = unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }; // (2) The `tx_row_ref` did not violate a unique constraint *within* the `tx_table`, @@ -1331,7 +1340,8 @@ impl MutTxId { } let rri = RowRefInsertion::Inserted(unsafe { - // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls. + // SAFETY: `tx_row_ptr` is still correct for `tx_table` per (PC.INS.1). + // as there haven't been any interleaving `&mut` calls that could invalidate the pointer. tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) }); Ok((gen_cols, rri)) @@ -1407,7 +1417,7 @@ impl MutTxId { // We need `insert_internal_allow_duplicate` rather than `insert` here // to bypass unique constraint checks. 
- match tx_table.insert_internal_allow_duplicate(tx_blob_store, rel) { + match tx_table.insert_physically_pv(tx_blob_store, rel) { Err(err @ InsertError::Bflatn(_)) => Err(TableError::Insert(err).into()), Err(e) => unreachable!( "Table::insert_internal_allow_duplicates returned error of unexpected variant: {:?}", diff --git a/crates/table/src/bflatn_to.rs b/crates/table/src/bflatn_to.rs index 7c7cc867203..4263d5936ff 100644 --- a/crates/table/src/bflatn_to.rs +++ b/crates/table/src/bflatn_to.rs @@ -23,7 +23,7 @@ use spacetimedb_sats::{ }; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum Error { #[error(transparent)] Decode(#[from] DecodeError), diff --git a/crates/table/src/blob_store.rs b/crates/table/src/blob_store.rs index 9fb0611e202..3431b5b35cf 100644 --- a/crates/table/src/blob_store.rs +++ b/crates/table/src/blob_store.rs @@ -139,7 +139,7 @@ impl BlobStore for NullBlobStore { /// A blob store that is backed by a hash map with a reference counted value. /// Used for tests when you need an actual blob store. -#[derive(Default)] +#[derive(Default, PartialEq, Eq, Debug)] pub struct HashMapBlobStore { /// For testing, we use a hash map with a reference count /// to handle freeing and cloning correctly. @@ -154,6 +154,7 @@ impl MemoryUsage for HashMapBlobStore { } /// A blob object including a reference count and the data. +#[derive(PartialEq, Eq, Debug)] struct BlobObject { /// Reference count of the blob. uses: usize, diff --git a/crates/table/src/btree_index.rs b/crates/table/src/btree_index.rs index 7b2748f7dfd..6635370aa61 100644 --- a/crates/table/src/btree_index.rs +++ b/crates/table/src/btree_index.rs @@ -134,7 +134,7 @@ impl Iterator for BTreeIndexRangeIter<'_> { /// An index from a key type determined at runtime to `RowPointer`(s). /// /// See module docs for info about specialization. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] enum TypedIndex { // All the non-unique index types. 
Bool(Index), @@ -627,7 +627,7 @@ impl TypedIndex { } /// A B-Tree based index on a set of [`ColId`]s of a table. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct BTreeIndex { /// The ID of this index. pub index_id: IndexId, diff --git a/crates/table/src/btree_index/multimap.rs b/crates/table/src/btree_index/multimap.rs index 391c76cdeb4..5fcfe94e27d 100644 --- a/crates/table/src/btree_index/multimap.rs +++ b/crates/table/src/btree_index/multimap.rs @@ -6,7 +6,7 @@ use std::collections::btree_map::{BTreeMap, Range}; use crate::MemoryUsage; /// A multi map that relates a `K` to a *set* of `V`s. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct MultiMap { /// The map is backed by a `BTreeMap` for relating keys to values. /// diff --git a/crates/table/src/btree_index/uniquemap.rs b/crates/table/src/btree_index/uniquemap.rs index b71c7aead2e..01de3758bf1 100644 --- a/crates/table/src/btree_index/uniquemap.rs +++ b/crates/table/src/btree_index/uniquemap.rs @@ -5,7 +5,7 @@ use std::collections::btree_map::{BTreeMap, Entry, Range}; /// A "unique map" that relates a `K` to a `V`. /// /// (This is just a `BTreeMap`) with a slightly modified interface. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct UniqueMap { /// The map is backed by a `BTreeMap` for relating a key to a value. 
map: BTreeMap, diff --git a/crates/table/src/fixed_bit_set.rs b/crates/table/src/fixed_bit_set.rs index 76792dcc3a4..608179fde05 100644 --- a/crates/table/src/fixed_bit_set.rs +++ b/crates/table/src/fixed_bit_set.rs @@ -1,4 +1,5 @@ use core::{ + cmp, fmt, ops::{BitAnd, BitAndAssign, BitOr, Not, Shl}, slice::Iter, }; @@ -151,7 +152,8 @@ mod internal_unsafe { } } -impl std::cmp::PartialEq for FixedBitSet { +impl cmp::Eq for FixedBitSet {} +impl cmp::PartialEq for FixedBitSet { fn eq(&self, other: &Self) -> bool { self.storage() == other.storage() } @@ -251,6 +253,12 @@ impl MemoryUsage for FixedBitSet { } } +impl fmt::Debug for FixedBitSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_set().entries(self.iter_set()).finish() + } +} + /// An iterator that yields the set indices of a [`FixedBitSet`]. pub struct IterSet<'a, B = DefaultBitBlock> { /// The block iterator. diff --git a/crates/table/src/page.rs b/crates/table/src/page.rs index faf93a54e4a..c1084f8fcc9 100644 --- a/crates/table/src/page.rs +++ b/crates/table/src/page.rs @@ -44,7 +44,7 @@ use core::{mem, ops::ControlFlow, ptr}; use spacetimedb_lib::{de::Deserialize, ser::Serialize}; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum Error { #[error("Want to allocate a var-len object of {need} granules, but have only {have} granules available")] InsufficientVarLenSpace { need: u16, have: u16 }, @@ -55,7 +55,7 @@ pub enum Error { /// A cons-cell in a freelist either /// for an unused fixed-len cell or a variable-length granule. #[repr(C)] // Required for a stable ABI. -#[derive(Clone, Copy, Debug, bytemuck::NoUninit, Serialize, Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, bytemuck::NoUninit, Serialize, Deserialize)] struct FreeCellRef { /// The address of the next free cell in a freelist. /// @@ -133,7 +133,7 @@ impl FreeCellRef { /// All the fixed size header information. #[repr(C)] // Required for a stable ABI. 
-#[derive(Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. struct FixedHeader { /// A pointer to the head of the freelist which stores /// all the unused (freed) fixed row cells. @@ -226,7 +226,7 @@ impl FixedHeader { /// All the var-len header information. #[repr(C)] // Required for a stable ABI. -#[derive(bytemuck::NoUninit, Clone, Copy, Serialize, Deserialize)] +#[derive(bytemuck::NoUninit, Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] struct VarHeader { /// A pointer to the head of the freelist which stores /// all the unused (freed) var-len granules. @@ -290,7 +290,7 @@ impl VarHeader { /// as the whole [`Page`] is `Box`ed. #[repr(C)] // Required for a stable ABI. #[repr(align(64))] // Alignment must be same as `VarLenGranule::SIZE`. -#[derive(Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. struct PageHeader { /// The header data relating to the fixed component of a row. fixed: FixedHeader, @@ -386,7 +386,7 @@ const _VLG_ALIGN_MULTIPLE_OF_FCR: () = assert!(mem::align_of::() // ^-- Must have align at least that of `VarLenGranule`, // so that `row_data[PageOffset::PAGE_END - VarLenGranule::SIZE]` is an aligned pointer to `VarLenGranule`. // TODO(bikeshedding): consider raising the alignment. We may want this to be OS page (4096) aligned. -#[derive(Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] // So we can dump and restore pages during snapshotting. pub struct Page { /// The header containing metadata on how to interpret and modify the `row_data`. 
header: PageHeader, diff --git a/crates/table/src/pages.rs b/crates/table/src/pages.rs index 7dc74e01c47..4150141f4b8 100644 --- a/crates/table/src/pages.rs +++ b/crates/table/src/pages.rs @@ -11,7 +11,7 @@ use core::ops::{ControlFlow, Deref, Index, IndexMut}; use std::ops::DerefMut; use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum Error { #[error("Attempt to allocate more than {} pages.", PageIndex::MAX.idx())] TooManyPages, @@ -34,7 +34,7 @@ impl IndexMut for Pages { } /// A manager of [`Page`]s. -#[derive(Default)] +#[derive(Default, Debug, PartialEq, Eq)] pub struct Pages { /// The collection of pages under management. pages: Vec>, diff --git a/crates/table/src/row_type_visitor.rs b/crates/table/src/row_type_visitor.rs index cd75c2b11fb..dec882b8631 100644 --- a/crates/table/src/row_type_visitor.rs +++ b/crates/table/src/row_type_visitor.rs @@ -274,7 +274,7 @@ fn remove_trailing_gotos(program: &mut Vec) -> bool { } /// The instruction set of a [`VarLenVisitorProgram`]. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] enum Insn { // TODO(perf): consider boxing this variant (or making it a variable-width instruction) // to minimize sizeof(insn), @@ -355,7 +355,7 @@ impl fmt::Display for Insn { /// Forward progress, and thus termination, /// during interpretation is guaranteed when evaluating a program, /// as all jumps (`SwitchOnTag` and `Goto`) will set `new_instr_ptr > old_instr_ptr`. -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct VarLenVisitorProgram { /// The list of instructions that make up this program. 
insns: Arc<[Insn]>, diff --git a/crates/table/src/static_bsatn_validator.rs b/crates/table/src/static_bsatn_validator.rs index 2a1078ba1cf..78dbc49299a 100644 --- a/crates/table/src/static_bsatn_validator.rs +++ b/crates/table/src/static_bsatn_validator.rs @@ -249,7 +249,7 @@ fn remove_trailing_gotos(program: &mut Vec) { } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] struct CheckTag { /// The tag to check is stored at `start + tag_offset`. tag_offset: u16, @@ -259,7 +259,7 @@ struct CheckTag { } /// The instruction set of a [`StaticBsatnValidator`]. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Insn { /// Visit the byte at offset `start + N` /// and assert that it is 0 or 1, i.e., a valid `bool`. @@ -286,7 +286,7 @@ impl Insn { impl MemoryUsage for Insn {} -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct StaticBsatnValidator { /// The list of instructions that make up this program. insns: Arc<[Insn]>, diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 013159a9d92..4f558a62aff 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -61,6 +61,7 @@ static_assert_size!(SeqIdList, 24); /// /// The table stores the rows into a page manager /// and uses an internal map to ensure that no identical row is stored more than once. +#[derive(Debug, PartialEq, Eq)] pub struct Table { /// Page manager and row layout grouped together, for `RowRef` purposes. inner: TableInner, @@ -90,6 +91,7 @@ pub struct Table { /// while other mutable references to the `indexes` exist. /// This is necessary because index insertions and deletions take a `RowRef` as an argument, /// from which they [`ReadColumn::read_column`] their keys. +#[derive(Debug, PartialEq, Eq)] pub(crate) struct TableInner { /// The type of rows this table stores, with layout information included. 
row_layout: RowTypeLayout, @@ -175,7 +177,7 @@ impl MemoryUsage for TableInner { } /// Various error that can happen on table insertion. -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum InsertError { /// There was already a row with the same value. #[error("Duplicate insertion of row {0:?} violates set semantics")] @@ -248,39 +250,37 @@ impl Table { blob_store: &'a mut dyn BlobStore, row: &ProductValue, ) -> Result<(RowHash, RowRef<'a>), InsertError> { - // Insert the row into the page manager. - let (hash, ptr) = self.insert_internal(blob_store, row)?; + // Optimistically insert the `row` before checking any constraints + // under the assumption that errors (unique constraint & set semantic violations) are rare. + let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?; + let row_ptr = row_ref.pointer(); - // Insert row into indices and check unique constraints. + // Confirm the insertion, checking any constraints, removing the physical row on error. // SAFETY: We just inserted `ptr`, so it must be present. - unsafe { self.insert_into_indices(blob_store, ptr) }?; - - // SAFETY: We just inserted `ptr`, - // and `insert_into_indices` didn't remove it, - // so it must be present. - let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + let (hash, row_ptr) = unsafe { self.confirm_insertion(blob_store, row_ptr, blob_bytes) }?; + // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row. + let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, row_ptr) }; Ok((hash, row_ref)) } - /// Insert a `row` into this table. + /// Insert a `row` into this table during replay. + /// /// NOTE: This method skips index updating. Use `insert` to insert a row with index updating. 
- pub fn insert_internal( + pub fn insert_for_replay( &mut self, blob_store: &mut dyn BlobStore, row: &ProductValue, ) -> Result<(RowHash, RowPointer), InsertError> { - // Optimistically insert the `row` before checking for set-semantic collisions, - // under the assumption that set-semantic collisions are rare. - let (row_ref, blob_bytes) = self.insert_internal_allow_duplicate(blob_store, row)?; + // Insert the `row`. There should be no errors + let (row_ref, blob_bytes) = self.insert_physically_pv(blob_store, row)?; + let row_ptr = row_ref.pointer(); - let hash = row_ref.row_hash(); - let ptr = row_ref.pointer(); - // SAFETY: `ptr` was derived from `row_ref` without interleaving calls, so it must be valid. - unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; + // SAFETY: We just inserted the row, so `self.is_row_present(row_ptr)` holds. + let row_hash = unsafe { self.insert_into_pointer_map(blob_store, row_ptr) }?; self.update_statistics_added_row(blob_bytes); - Ok((hash, ptr)) + Ok((row_hash, row_ptr)) } /// Physically inserts `row` into the page @@ -288,7 +288,7 @@ impl Table { /// /// This is useful when we need to insert a row temporarily to get back a `RowPointer`. /// A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. - pub fn insert_internal_allow_duplicate<'a>( + pub fn insert_physically_pv<'a>( &'a mut self, blob_store: &'a mut dyn BlobStore, row: &ProductValue, @@ -459,18 +459,15 @@ impl Table { /// # Safety /// /// `self.is_row_present(row)` must hold. - pub fn confirm_insertion<'a>( + pub unsafe fn confirm_insertion<'a>( &'a mut self, blob_store: &'a mut dyn BlobStore, ptr: RowPointer, blob_bytes: BlobNumBytes, ) -> Result<(RowHash, RowPointer), InsertError> { - // SAFETY: Caller promised that `self.is_row_present(row)` holds. - let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }; - let hash = row_ref.row_hash(); - // SAFETY: Caller promised that `self.is_row_present(row)` holds. 
- unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; - // SAFETY: Caller promised that `self.is_row_present(row)` holds. + // SAFETY: Caller promised that `self.is_row_present(ptr)` holds. + let hash = unsafe { self.insert_into_pointer_map(blob_store, ptr) }?; + // SAFETY: Caller promised that `self.is_row_present(ptr)` holds. unsafe { self.insert_into_indices(blob_store, ptr) }?; self.update_statistics_added_row(blob_bytes); @@ -528,6 +525,7 @@ impl Table { /// Insert row identified by `ptr` into the pointer map. /// This checks for set semantic violations. /// Deletes the row and returns an error if there were any violations. + /// Returns the row hash computed. /// /// SAFETY: `self.is_row_present(row)` must hold. /// Post-condition: If this method returns `Ok(_)`, the row still exists. @@ -535,8 +533,11 @@ impl Table { &'a mut self, blob_store: &'a mut dyn BlobStore, ptr: RowPointer, - hash: RowHash, - ) -> Result<(), InsertError> { + ) -> Result { + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }; + let hash = row_ref.row_hash(); + // SAFETY: // - `self` trivially has the same `row_layout` as `self`. // - Caller promised that `ptr` is a valid row in `self`. @@ -560,7 +561,7 @@ impl Table { // add it to the `pointer_map`. self.pointer_map.insert(hash, ptr); - Ok(()) + Ok(hash) } /// Finds the [`RowPointer`] to the row in `committed_table` @@ -757,7 +758,7 @@ impl Table { // Insert `row` temporarily so `temp_ptr` and `hash` can be used to find the row. // This must avoid consulting and inserting to the pointer map, // as the row is already present, set-semantically. 
- let (temp_row, _) = self.insert_internal_allow_duplicate(blob_store, row)?; + let (temp_row, _) = self.insert_physically_pv(blob_store, row)?; let temp_ptr = temp_row.pointer(); let hash = temp_row.row_hash(); @@ -1637,6 +1638,41 @@ pub(crate) mod test { prop_assert_eq!(table.inner.pages[PageIndex(0)].num_rows(), 1); prop_assert_eq!(&table.scan_rows(&blob_store).map(|r| r.pointer()).collect::>(), &[ptr]); } + + #[test] + fn insert_bsatn_same_as_pv((ty, val) in generate_typed_row()) { + let mut bs_pv = HashMapBlobStore::default(); + let mut table_pv = table(ty.clone()); + let res_pv = table_pv.insert(&mut bs_pv, &val); + + let mut bs_bsatn = HashMapBlobStore::default(); + let mut table_bsatn = table(ty); + let res_bsatn = insert_bsatn(&mut table_bsatn, &mut bs_bsatn, &val); + + prop_assert_eq!(res_pv, res_bsatn); + prop_assert_eq!(bs_pv, bs_bsatn); + prop_assert_eq!(table_pv, table_bsatn); + } + } + + fn insert_bsatn<'a>( + table: &'a mut Table, + blob_store: &'a mut dyn BlobStore, + val: &ProductValue, + ) -> Result<(RowHash, RowRef<'a>), InsertError> { + let row = &to_vec(&val).unwrap(); + + // Optimistically insert the `row` before checking any constraints + // under the assumption that errors (unique constraint & set semantic violations) are rare. + let (row_ref, blob_bytes) = table.insert_physically_bsatn(blob_store, row)?; + let row_ptr = row_ref.pointer(); + + // Confirm the insertion, checking any constraints, removing the physical row on error. + // SAFETY: We just inserted `ptr`, so it must be present. + let (hash, row_ptr) = unsafe { table.confirm_insertion(blob_store, row_ptr, blob_bytes) }?; + // SAFETY: Per post-condition of `confirm_insertion`, `row_ptr` refers to a valid row. + let row_ref = unsafe { table.inner.get_row_ref_unchecked(blob_store, row_ptr) }; + Ok((hash, row_ref)) } // Compare `scan_rows` against a simpler implementation. 
From cb64434edc5b454e2d1aab19b937cb3165d0e8c8 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 7 Jan 2025 19:56:14 +0100 Subject: [PATCH 8/9] undo removing MutTxDatastore::get_next_sequence_value_mut_tx --- .../core/src/db/datastore/locking_tx_datastore/datastore.rs | 4 ++++ crates/core/src/db/datastore/traits.rs | 1 + 2 files changed, 5 insertions(+) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index e095479ea7a..901dee59ea7 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -478,6 +478,10 @@ impl MutTxDatastore for Locking { tx.index_id_from_name(index_name) } + fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result { + tx.get_next_sequence_value(seq_id) + } + fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result { tx.create_sequence(sequence_schema) } diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 46b907d38ee..70988c2b340 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -439,6 +439,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx { // - index_seek_mut_tx // Sequences + fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result; fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result; fn drop_sequence_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result<()>; fn sequence_id_from_name_mut_tx(&self, tx: &Self::MutTx, sequence_name: &str) -> super::Result>; From 7bca713bfda01b682782fe62875f3165b0a6c87f Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 7 Jan 2025 21:58:36 +0100 Subject: [PATCH 9/9] address phoebe & tyler's reviews --- .../datastore/locking_tx_datastore/mut_tx.rs | 5 
+++++ crates/core/src/db/datastore/traits.rs | 2 +- crates/table/src/table.rs | 18 +++++++++++++----- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 73c91bd67b5..f3a1fa34c01 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -1178,6 +1178,11 @@ impl MutTxId { /// Insert a row, encoded in BSATN, into a table. /// + /// Zero placeholders, i.e., sequence triggers, + /// in auto-inc columns in the new row will be replaced with generated values + /// if and only if `GENERATE` is true. + /// This method is called with `GENERATE` false when updating the `st_sequence` system table. + /// /// Requires: /// - `TableId` must refer to a valid table for the database at `database_address`. /// - `row` must be a valid row for the table at `table_id`. diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 70988c2b340..b973c1e032e 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -484,7 +484,7 @@ pub trait MutTxDatastore: TxDatastore + MutTx { ) -> u32; /// Inserts `row`, encoded in BSATN, into the table identified by `table_id`. /// - /// Returns the list of columns where values were replaced with generated ones + /// Returns the list of columns with sequence-trigger values that were replaced with generated ones /// and a reference to the row as a [`RowRef`]. 
/// /// Generated columns are columns with an auto-inc sequence diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index 4f558a62aff..1ba60f3c8a4 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -97,6 +97,9 @@ pub(crate) struct TableInner { row_layout: RowTypeLayout, /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion, /// if the [`RowTypeLayout`] has a static BSATN length and layout. + /// + /// A [`StaticBsatnValidator`] is also included. + /// It's used to validate BSATN-encoded rows before converting to BFLATN. static_layout: Option<(StaticLayout, StaticBsatnValidator)>, /// The visitor program for `row_layout`. /// @@ -321,6 +324,9 @@ impl Table { /// /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`. /// In this case, A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. + /// + /// When `row` is not valid BSATN at the table's row type, + /// an error is returned and there will be nothing for the caller to revert. pub fn insert_physically_bsatn<'a>( &'a mut self, blob_store: &'a mut dyn BlobStore, @@ -399,7 +405,7 @@ impl Table { // as `row_ty` was derived from the same schema as `seq` is part of. let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) }; // SAFETY: - // - `elem_ty` appears as a column in th row type. + // - `elem_ty` appears as a column in the row type. // - `AlgebraicValue` is compatible with all types. let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) }; val.is_numeric_zero() @@ -415,9 +421,10 @@ impl Table { /// # Safety /// /// - `self.is_row_present(row)` must hold. - /// - `col_id` must be a valid column, with a primiive type, of the row type. + /// - `col_id` must be a valid column, with a primitive integer type, of the row type. 
pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) { let row_ty = self.inner.row_layout.product(); + // SAFETY: Caller promised that `col_id` was a valid column. let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) }; let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else { // SAFETY: Columns with sequences must be primitive types. @@ -445,14 +452,15 @@ impl Table { PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()), PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()), PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()), - PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => { - panic!("`{:?}` is not a sequence integer type", &elem_ty.ty) - } + // SAFETY: Columns with sequences must be integer types. + PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => unsafe { unreachable_unchecked() }, } } /// Performs all the checks necessary after having fully decided on a rows contents. /// + /// This includes inserting the row into any applicable indices and/or the pointer map. + /// /// On `Ok(_)`, statistics of the table are also updated, /// and the `ptr` still points to a valid row, and otherwise not. ///