diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index a24a6864478..01e09598189 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -10,7 +10,7 @@ use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compress use spacetimedb_bench::database::BenchDatabase as _; use spacetimedb_bench::spacetime_raw::SpacetimeRaw; use spacetimedb_primitives::{col_list, TableId}; -use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue}; fn create_table_location(db: &RelationalDB) -> Result { let schema = &[ @@ -57,11 +57,15 @@ fn eval(c: &mut Criterion) { .db .with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { // 1M rows + let mut scratch = Vec::new(); for entity_id in 0u64..1_000_000 { let owner = entity_id % 1_000; let footprint = AlgebraicValue::sum(entity_id as u8 % 4, AlgebraicValue::unit()); let row = product!(entity_id, owner, footprint); - let _ = raw.db.insert(tx, lhs, row)?; + + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + let _ = raw.db.insert(tx, lhs, &scratch)?; } Ok(()) }); @@ -70,6 +74,7 @@ fn eval(c: &mut Criterion) { .db .with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { // 1000 chunks, 1200 rows per chunk = 1.2M rows + let mut scratch = Vec::new(); for chunk_index in 0u64..1_000 { for i in 0u64..1200 { let entity_id = chunk_index * 1200 + i; @@ -77,7 +82,10 @@ fn eval(c: &mut Criterion) { let z = entity_id as i32; let dimension = 0u32; let row = product!(entity_id, chunk_index, x, z, dimension); - let _ = raw.db.insert(tx, rhs, row)?; + + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + let _ = raw.db.insert(tx, rhs, &scratch)?; } } Ok(()) diff --git a/crates/bench/src/spacetime_raw.rs b/crates/bench/src/spacetime_raw.rs index 2b63dbf2a2e..3de9bab3f09 100644 --- a/crates/bench/src/spacetime_raw.rs +++ b/crates/bench/src/spacetime_raw.rs @@ -5,8 +5,8 @@ use crate::{ }; use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB}; use spacetimedb::execution_context::Workload; -use spacetimedb_lib::sats::AlgebraicValue; use spacetimedb_primitives::{ColId, IndexId, TableId}; +use spacetimedb_sats::{bsatn, AlgebraicValue}; use spacetimedb_schema::{ def::{BTreeAlgorithm, IndexAlgorithm}, schema::{IndexSchema, TableSchema}, @@ -102,8 +102,11 @@ impl BenchDatabase for SpacetimeRaw { fn insert_bulk(&mut self, table_id: &Self::TableId, rows: Vec) -> ResultBench<()> { self.db.with_auto_commit(Workload::Internal, |tx| { + let mut scratch = Vec::new(); for row in rows { - self.db.insert(tx, *table_id, row.into_product_value())?; + scratch.clear(); + bsatn::to_writer(&mut scratch, &row.into_product_value()).unwrap(); + self.db.insert(tx, *table_id, &scratch)?; } Ok(()) }) @@ -119,6 +122,7 @@ impl BenchDatabase for SpacetimeRaw { .collect::>(); assert_eq!(rows.len(), row_count as usize, "not enough rows found for update_bulk!"); + let mut scratch = Vec::new(); for mut row in rows { // It would likely be faster to collect a vector of IDs and delete + insert them all at once, // but this implementation is closer to how `update` works in modules. @@ -143,7 +147,9 @@ impl BenchDatabase for SpacetimeRaw { panic!("column 1 is not a u64!"); } - self.db.insert(tx, *table_id, row)?; + scratch.clear(); + bsatn::to_writer(&mut scratch, &row).unwrap(); + self.db.insert(tx, *table_id, &scratch)?; } Ok(()) }) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs index 8cbe633288a..720713d126a 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs @@ -468,10 +468,6 @@ impl MutTxDatastore for Locking { tx.index_id_from_name(index_name) } - fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result { - tx.get_next_sequence_value(seq_id) - } - fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result { tx.create_sequence(sequence_schema) } @@ -562,9 +558,9 @@ impl MutTxDatastore for Locking { &'a self, tx: &'a mut Self::MutTx, table_id: TableId, - mut row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>)> { - let (gens, row_ref) = tx.insert(table_id, &mut row)?; + row: &[u8], + ) -> Result<(ColList, RowRef<'a>)> { + let (gens, row_ref) = tx.insert::(table_id, row)?; Ok((gens, row_ref.collapse())) } @@ -589,7 +585,7 @@ impl MutTxDatastore for Locking { row.program_bytes = program.bytes; tx.delete(ST_MODULE_ID, ptr)?; - tx.insert(ST_MODULE_ID, &mut row.into()).map(drop) + tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop) } None => Err(anyhow!( @@ -976,6 +972,7 @@ mod tests { use crate::db::datastore::traits::{IsolationLevel, MutTx}; use crate::db::datastore::Result; use crate::error::{DBError, IndexError}; + use bsatn::to_vec; use itertools::Itertools; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -1344,6 +1341,18 @@ mod tests { .collect() } + fn insert<'a>( + datastore: &'a Locking, + tx: &'a mut MutTxId, + table_id: TableId, + row: &ProductValue, + ) -> Result<(AlgebraicValue, RowRef<'a>)> { + let row = to_vec(&row).unwrap(); + let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?; + let gen_cols = row_ref.project(&gen_cols)?; + Ok((gen_cols, row_ref)) + } + #[test] fn test_bootstrapping_sets_up_tables() -> ResultTest<()> { let datastore = get_datastore()?; @@ -1683,7 +1692,7 @@ mod tests { fn test_insert_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 18)]); Ok(()) @@ -1693,7 +1702,7 @@ mod tests { fn test_insert_wrong_schema_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = product!(0, "Foo"); - assert!(datastore.insert_mut_tx(&mut tx, table_id, row).is_err()); + assert!(insert(&datastore, &mut tx, table_id, &row).is_err()); #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![]); Ok(()) @@ -1703,7 +1712,7 @@ mod tests { fn test_insert_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, u32_str_u32(0, "Foo", 18))?; + insert(&datastore, &mut tx, table_id, &u32_str_u32(0, "Foo", 18))?; datastore.commit_mut_tx(tx)?; let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); #[rustfmt::skip] @@ -1717,7 +1726,7 @@ mod tests { let row = u32_str_u32(15, "Foo", 18); // 15 is ignored. datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.rollback_mut_tx(tx); let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); #[rustfmt::skip] @@ -1729,7 +1738,7 @@ mod tests { fn test_insert_commit_delete_insert() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let created_row = u32_str_u32(1, "Foo", 18); @@ -1737,7 +1746,7 @@ mod tests { assert_eq!(num_deleted, 1); assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0); let created_row = u32_str_u32(1, "Foo", 19); - datastore.insert_mut_tx(&mut tx, table_id, created_row)?; + insert(&datastore, &mut tx, table_id, &created_row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 19)]); Ok(()) @@ -1747,7 +1756,7 @@ mod tests { fn test_insert_delete_insert_delete_insert() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(1, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; for i in 0..2 { assert_eq!( all_rows(&datastore, &tx, table_id), @@ -1764,7 +1773,7 @@ mod tests { &[], "Found rows present after deleting", ); - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; assert_eq!( all_rows(&datastore, &tx, table_id), vec![row.clone()], @@ -1778,8 +1787,8 @@ mod tests { fn test_unique_constraint_pre_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + insert(&datastore, &mut tx, table_id, &row)?; + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1798,10 +1807,10 @@ mod tests { fn test_unique_constraint_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1822,10 +1831,10 @@ mod tests { datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row.clone())?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.rollback_mut_tx(tx); let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(2, "Foo", 18)]); Ok(()) @@ -1838,7 +1847,7 @@ mod tests { let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); @@ -1873,7 +1882,7 @@ mod tests { IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1892,7 +1901,7 @@ mod tests { fn test_create_index_post_commit() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let index_def = IndexSchema { @@ -1927,7 +1936,7 @@ mod tests { IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. - let result = datastore.insert_mut_tx(&mut tx, table_id, row); + let result = insert(&datastore, &mut tx, table_id, &row); match result { Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -1946,7 +1955,7 @@ mod tests { fn test_create_index_post_rollback() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; datastore.commit_mut_tx(tx)?; let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let index_def = IndexSchema { @@ -1981,7 +1990,7 @@ mod tests { IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", }, ].map(Into::into)); let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored. - datastore.insert_mut_tx(&mut tx, table_id, row)?; + insert(&datastore, &mut tx, table_id, &row)?; #[rustfmt::skip] assert_eq!(all_rows(&datastore, &tx, table_id), vec![ u32_str_u32(1, "Foo", 18), @@ -1998,7 +2007,7 @@ mod tests { let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored. // Because of auto_inc columns, we will get a slightly different // value than the one we inserted. - let row = datastore.insert_mut_tx(&mut tx, table_id, row)?.1.to_product_value(); + let row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value(); datastore.commit_mut_tx(tx)?; let all_rows_col_0_eq_1 = |tx: &MutTxId| { @@ -2023,10 +2032,7 @@ mod tests { assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0); // Reinsert the row. - let reinserted_row = datastore - .insert_mut_tx(&mut tx, table_id, row.clone())? - .1 - .to_product_value(); + let reinserted_row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value(); assert_eq!(reinserted_row, row); // The actual test: we should be able to iterate again, while still in the @@ -2044,9 +2050,9 @@ mod tests { fn test_read_only_tx_shared_lock() -> ResultTest<()> { let (datastore, mut tx, table_id) = setup_table()?; let row1 = u32_str_u32(1, "Foo", 18); - datastore.insert_mut_tx(&mut tx, table_id, row1.clone())?; + insert(&datastore, &mut tx, table_id, &row1)?; let row2 = u32_str_u32(2, "Bar", 20); - datastore.insert_mut_tx(&mut tx, table_id, row2.clone())?; + insert(&datastore, &mut tx, table_id, &row2)?; datastore.commit_mut_tx(tx)?; // create multiple read only tx, and use them together. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 83636f4eebe..09118d2feee 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -7,7 +7,9 @@ use super::{ tx_state::{DeleteTable, IndexIdMap, TxState}, SharedMutexGuard, SharedWriteGuard, }; -use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; +use crate::db::datastore::system_tables::{ + with_sys_table_buf, StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID, +}; use crate::db::datastore::{ system_tables::{ StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, StIndexRow, @@ -25,15 +27,12 @@ use core::ops::RangeBounds; use core::{iter, ops::Bound}; use smallvec::SmallVec; use spacetimedb_lib::db::raw_def::v9::RawSql; -use spacetimedb_lib::{ - bsatn::Deserializer, - db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}, - de::DeserializeSeed, -}; +use spacetimedb_lib::db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP}; use spacetimedb_primitives::{ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId}; use spacetimedb_sats::{ - bsatn::{self, DecodeError}, - de::WithBound, + bsatn::{self, to_writer, DecodeError, Deserializer}, + de::{DeserializeSeed, WithBound}, + ser::Serialize, AlgebraicType, AlgebraicValue, ProductType, ProductValue, WithTypespace, }; use spacetimedb_schema::{ @@ -113,7 +112,7 @@ impl MutTxId { table_primary_key: table_schema.primary_key.map(Into::into), }; let table_id = self - .insert(ST_TABLE_ID, &mut row.into())? + .insert_via_serialize_bsatn(ST_TABLE_ID, &row)? .1 .collapse() .read_col(StTableFields::TableId)?; @@ -130,7 +129,7 @@ impl MutTxId { col_name: col.col_name.clone(), col_type: col.col_type.clone().into(), }; - self.insert(ST_COLUMN_ID, &mut row.into())?; + self.insert_via_serialize_bsatn(ST_COLUMN_ID, &row)?; } let mut schema_internal = table_schema.clone(); @@ -152,20 +151,13 @@ impl MutTxId { reducer_name: schedule.reducer_name, at_column: schedule.at_column, }; - let (generated, ..) = self.insert(ST_SCHEDULED_ID, &mut row.into())?; - let id = generated.as_u32(); - - if let Some(&id) = id { - let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; - table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id.into()); - } else { - return Err(anyhow::anyhow!( - "Failed to generate a schedule ID for table: {}, generated: {:#?}", - table_schema.table_name, - generated - ) - .into()); - } + let id = self + .insert_via_serialize_bsatn(ST_SCHEDULED_ID, &row)? + .1 + .collapse() + .read_col::(StScheduledFields::ScheduleId)?; + let (table, ..) = self.get_or_create_insert_table_mut(table_id)?; + table.with_mut_schema(|s| s.schedule.as_mut().unwrap().schedule_id = id); } // Insert constraints into `st_constraints` @@ -283,7 +275,7 @@ impl MutTxId { // Delete the row, run updates, and insert again. self.delete(ST_TABLE_ID, ptr)?; updater(&mut row); - self.insert(ST_TABLE_ID, &mut row.into())?; + self.insert_via_serialize_bsatn(ST_TABLE_ID, &row)?; Ok(()) } @@ -382,7 +374,7 @@ impl MutTxId { index_algorithm: index.index_algorithm.clone().into(), }; let index_id = self - .insert(ST_INDEX_ID, &mut row.into())? + .insert_via_serialize_bsatn(ST_INDEX_ID, &row)? .1 .collapse() .read_col(StIndexFields::IndexId)?; @@ -676,11 +668,15 @@ impl MutTxId { (range_start, range_end) } + fn get_sequence_mut(&mut self, seq_id: SequenceId) -> Result<&mut Sequence> { + self.sequence_state_lock + .get_sequence_mut(seq_id) + .ok_or_else(|| SequenceError::NotFound(seq_id).into()) + } + pub fn get_next_sequence_value(&mut self, seq_id: SequenceId) -> Result { { - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; + let sequence = self.get_sequence_mut(seq_id)?; // If there are allocated sequence values, return the new value. // `gen_next_value` internally checks that the new allocation is acceptable, @@ -700,30 +696,27 @@ impl MutTxId { let seq_row = { let mut seq_row = StSequenceRow::try_from(old_seq_row_ref)?; - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; + let sequence = self.get_sequence_mut(seq_id)?; seq_row.allocated = sequence.nth_value(SEQUENCE_ALLOCATION_STEP as usize); sequence.set_allocation(seq_row.allocated); seq_row }; self.delete(ST_SEQUENCE_ID, old_seq_row_ptr)?; - // `insert_row_internal` rather than `insert` because: + // `insert::` rather than `GENERATE = true` because: // - We have already checked unique constraints during `create_sequence`. // - Similarly, we have already applied autoinc sequences. // - We do not want to apply autoinc sequences again, // since the system table sequence `seq_st_table_table_id_primary_key_auto` // has ID 0, and would otherwise trigger autoinc. - self.insert_row_internal(ST_SEQUENCE_ID, &ProductValue::from(seq_row))?; - - let Some(sequence) = self.sequence_state_lock.get_sequence_mut(seq_id) else { - return Err(SequenceError::NotFound(seq_id).into()); - }; - if let Some(value) = sequence.gen_next_value() { - return Ok(value); - } - Err(SequenceError::UnableToAllocate(seq_id).into()) + with_sys_table_buf(|buf| { + to_writer(buf, &seq_row).unwrap(); + self.insert::(ST_SEQUENCE_ID, buf) + })?; + + self.get_sequence_mut(seq_id)? + .gen_next_value() + .ok_or_else(|| SequenceError::UnableToAllocate(seq_id).into()) } /// Create a sequence. @@ -765,7 +758,7 @@ impl MutTxId { min_value: seq.min_value, max_value: seq.max_value, }; - let row = self.insert(ST_SEQUENCE_ID, &mut sequence_row.clone().into())?; + let row = self.insert_via_serialize_bsatn(ST_SEQUENCE_ID, &sequence_row)?; let seq_id = row.1.collapse().read_col(StSequenceFields::SequenceId)?; sequence_row.sequence_id = seq_id; @@ -854,7 +847,7 @@ impl MutTxId { constraint_data: constraint.data.clone().into(), }; - let constraint_row = self.insert(ST_CONSTRAINT_ID, &mut ProductValue::from(constraint_row))?; + let constraint_row = self.insert_via_serialize_bsatn(ST_CONSTRAINT_ID, &constraint_row)?; let constraint_id = constraint_row.1.collapse().read_col(StConstraintFields::ConstraintId)?; let existed = matches!(constraint_row.1, RowRefInsertion::Existed(_)); // TODO: Can we return early here? @@ -948,7 +941,7 @@ impl MutTxId { sql: row_level_security_schema.sql, }; - let row = self.insert(ST_ROW_LEVEL_SECURITY_ID, &mut ProductValue::from(row))?; + let row = self.insert_via_serialize_bsatn(ST_ROW_LEVEL_SECURITY_ID, &row)?; let row_level_security_sql = row.1.collapse().read_col(StRowLevelSecurityFields::Sql)?; let existed = matches!(row.1, RowRefInsertion::Existed(_)); @@ -1109,7 +1102,7 @@ impl MutTxId { /// Either a row just inserted to a table or a row that already existed in some table. #[derive(Clone, Copy)] -pub(super) enum RowRefInsertion<'a> { +pub(crate) enum RowRefInsertion<'a> { /// The row was just inserted. Inserted(RowRef<'a>), /// The row already existed. @@ -1167,65 +1160,98 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> { } impl MutTxId { - /// Insert a row into a table. + pub(crate) fn insert_via_serialize_bsatn<'a, T: Serialize>( + &'a mut self, + table_id: TableId, + row: &T, + ) -> Result<(ColList, RowRefInsertion<'a>)> { + with_sys_table_buf(|buf| { + to_writer(buf, row).unwrap(); + self.insert::(table_id, buf) + }) + } + + /// Insert a row, encoded in BSATN, into a table. /// /// Requires: /// - `TableId` must refer to a valid table for the database at `database_address`. /// - `row` must be a valid row for the table at `table_id`. /// /// Returns: - /// - a product value with a projection of the row containing only the generated column values. + /// - a list of columns which have been replaced with generated values. /// - a ref to the inserted row. - pub(super) fn insert<'a>( - &'a mut self, + pub(super) fn insert( + &mut self, table_id: TableId, - row: &mut ProductValue, - ) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> { - let generated = self.write_sequence_values(table_id, row)?; - let row_ref = self.insert_row_internal(table_id, row)?; - Ok((generated, row_ref)) - } - - /// Generate and write sequence values to `row` - /// and return a projection of `row` with only the generated column values. - fn write_sequence_values(&mut self, table_id: TableId, row: &mut ProductValue) -> Result { - // TODO: Executing schema_for_table for every row insert is expensive. - // However we ask for the schema in the [Table] struct instead. - let schema = self.schema_for_table(table_id)?; - - // Collect all the columns with sequences that need generation. - let (cols_to_update, seqs_to_use): (ColList, SmallVec<[_; 1]>) = schema - .sequences - .iter() - .filter(|seq| row.elements[seq.col_pos.idx()].is_numeric_zero()) - .map(|seq| (seq.col_pos, seq.sequence_id)) - .unzip(); - - // Update every column in the row that needs it. - // We assume here that column with a sequence is of a sequence-compatible type. - for (col_id, sequence_id) in cols_to_update.iter().zip(seqs_to_use) { - let seq_val = self.get_next_sequence_value(sequence_id)?; - let col_typ = &schema.columns()[col_id.idx()].col_type; - let gen_val = AlgebraicValue::from_sequence_value(col_typ, seq_val); - row.elements[col_id.idx()] = gen_val; - } - - Ok(row.project(&cols_to_update)?) - } - - pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result> { - let commit_table = self.committed_state_write_lock.get_table(table_id); - + row: &[u8], + ) -> Result<(ColList, RowRefInsertion<'_>)> { // Get the insert table, so we can write the row into it. - let (tx_table, tx_blob_store, _, delete_table) = self + let (tx_table, tx_blob_store, ..) = self .tx_state - .get_table_and_blob_store_or_maybe_create_from(table_id, commit_table) + .get_table_and_blob_store_or_maybe_create_from( + table_id, + self.committed_state_write_lock.get_table(table_id), + ) .ok_or(TableError::IdNotFoundState(table_id))?; - match tx_table.insert(tx_blob_store, row) { - Ok((tx_row_hash, tx_row_ref)) => { + // 1. Insert the physical row. + // 2. Detect, generate, write sequence values. + // 3. Confirm that the insertion respects constraints and update statistics. + let ((tx_table, tx_blob_store, delete_table), gen_cols, res) = match tx_table + .insert_physically_bsatn(tx_blob_store, row) + { + Ok((tx_row_ref, blob_bytes)) if GENERATE => { + // Collect all the columns with sequences that need generation. let tx_row_ptr = tx_row_ref.pointer(); - if let Some(commit_table) = commit_table { + let (cols_to_gen, seqs_to_use) = unsafe { tx_table.sequence_triggers_for(tx_blob_store, tx_row_ptr) }; + + // Generate a value for every column in the row that needs it. + let mut seq_vals: SmallVec<[i128; 1]> = <_>::default(); + for sequence_id in seqs_to_use { + seq_vals.push(self.get_next_sequence_value(sequence_id)?); + } + + // Write the generated values to the physical row at `tx_row_ptr`. + // We assume here that column with a sequence is of a sequence-compatible type. + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. + let (tx_table, tx_blob_store, delete_table) = + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }; + for (col_id, seq_val) in cols_to_gen.iter().zip(seq_vals) { + // SAFETY: + // - `self.is_row_present(row)` holds as we haven't deleted the row. + // - `col_id` is a valid column, and has a sequence, so it must have a primitive type. + unsafe { tx_table.write_gen_val_to_col(col_id, tx_row_ptr, seq_val) }; + } + + let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + ((tx_table, tx_blob_store, delete_table), cols_to_gen, res) + } + Ok((tx_row_ref, blob_bytes)) => { + let tx_row_ptr = tx_row_ref.pointer(); + let res = tx_table.confirm_insertion(tx_blob_store, tx_row_ptr, blob_bytes); + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. + ( + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }, + ColList::empty(), + res, + ) + } + Err(e) => { + // SAFETY: By virtue of `get_table_and_blob_store_or_maybe_create_from` above succeeding, + // we can assume we have an insert and delete table. + ( + unsafe { self.tx_state.assume_present_get_mut_table(table_id) }, + ColList::empty(), + Err(e), + ) + } + }; + + match res { + Ok((tx_row_hash, tx_row_ptr)) => { + if let Some(commit_table) = self.committed_state_write_lock.get_table(table_id) { // The `tx_row_ref` was not previously present in insert tables, // but may still be a set-semantic conflict // or may violate a unique constraint with a row in the committed state. @@ -1274,10 +1300,11 @@ impl MutTxId { // No new row was inserted, but return `committed_ptr`. let blob_store = &self.committed_state_write_lock.blob_store; - return Ok(RowRefInsertion::Existed( + let rri = RowRefInsertion::Existed( // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`. unsafe { commit_table.get_row_ref_unchecked(blob_store, commit_ptr) }, - )); + ); + return Ok((gen_cols, rri)); } // Pacify the borrow checker. @@ -1298,16 +1325,20 @@ impl MutTxId { } } - Ok(RowRefInsertion::Inserted(unsafe { + let rri = RowRefInsertion::Inserted(unsafe { // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls. tx_table.get_row_ref_unchecked(tx_blob_store, tx_row_ptr) - })) + }); + Ok((gen_cols, rri)) } // `row` previously present in insert tables; do nothing but return `ptr`. - Err(InsertError::Duplicate(ptr)) => Ok(RowRefInsertion::Existed( - // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. - unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, - )), + Err(InsertError::Duplicate(ptr)) => { + let rri = RowRefInsertion::Existed( + // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it. + unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) }, + ); + Ok((gen_cols, rri)) + } // Index error: unbox and return `TableError::IndexError` // rather than `TableError::Insert(InsertError::IndexError)`. diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs index 5b04d76eed0..645a765f374 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs @@ -179,6 +179,26 @@ impl TxState { Some((tbl, blob_store, idx_map, delete_tables.entry(table_id).or_default())) } + /// Assumes that the insert and delete tables exist for `table_id` and fetches them. + /// + /// # Safety + /// + /// The insert and delete tables must exist. + pub unsafe fn assume_present_get_mut_table( + &mut self, + table_id: TableId, + ) -> (&mut Table, &mut dyn BlobStore, &mut DeleteTable) { + // Write the generated values to the physical row at `tx_row_ptr`. + let tx_blob_store: &mut dyn BlobStore = &mut self.blob_store; + let tx_table = self.insert_tables.get_mut(&table_id); + // SAFETY: we successfully got a `tx_table` before and haven't removed it since. + let tx_table = unsafe { tx_table.unwrap_unchecked() }; + let delete_table = self.delete_tables.get_mut(&table_id); + // SAFETY: we successfully got a `delete_table` before and haven't removed it since. + let delete_table = unsafe { delete_table.unwrap_unchecked() }; + (tx_table, tx_blob_store, delete_table) + } + /// Returns the table and index associated with the given `table_id` and `col_list`, if any. pub(super) fn get_table_and_index_type(&self, table_id: TableId, col_list: &ColList) -> Option<&AlgebraicType> { let table = self.insert_tables.get(&table_id)?; diff --git a/crates/core/src/db/datastore/system_tables.rs b/crates/core/src/db/datastore/system_tables.rs index 6213286f51c..02ec48e957e 100644 --- a/crates/core/src/db/datastore/system_tables.rs +++ b/crates/core/src/db/datastore/system_tables.rs @@ -989,8 +989,7 @@ impl StVarTable { { db.delete(tx, ST_VAR_ID, [row_ref.pointer()]); } - let row = value_serialize(&StVarRow { name, value }); - db.insert(tx, ST_VAR_ID, row.into_product().expect("should be product"))?; + tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?; Ok(()) } @@ -1305,9 +1304,17 @@ thread_local! { static READ_BUF: RefCell> = const { RefCell::new(Vec::new()) }; } +/// Provides access to a buffer to which bytes can be written. +pub(crate) fn with_sys_table_buf(run: impl FnOnce(&mut Vec) -> R) -> R { + READ_BUF.with_borrow_mut(|buf| { + buf.clear(); + run(buf) + }) +} + /// Read a value from a system table via BSatn. fn read_via_bsatn(row: RowRef<'_>) -> Result { - READ_BUF.with_borrow_mut(|buf| Ok(row.read_via_bsatn::(buf)?)) + with_sys_table_buf(|buf| Ok(row.read_via_bsatn::(buf)?)) } /// Convert a value to a product value. diff --git a/crates/core/src/db/datastore/traits.rs b/crates/core/src/db/datastore/traits.rs index 0d7a0631c4b..9d4621b4a8b 100644 --- a/crates/core/src/db/datastore/traits.rs +++ b/crates/core/src/db/datastore/traits.rs @@ -428,7 +428,6 @@ pub trait MutTxDatastore: TxDatastore + MutTx { // - index_seek_mut_tx // Sequences - fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result; fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result; fn drop_sequence_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result<()>; fn sequence_id_from_name_mut_tx(&self, tx: &Self::MutTx, sequence_name: &str) -> super::Result>; @@ -471,23 +470,19 @@ pub trait MutTxDatastore: TxDatastore + MutTx { table_id: TableId, relation: impl IntoIterator, ) -> u32; - /// Inserts `row` into the table identified by `table_id`. + /// Inserts `row`, encoded in BSATN, into the table identified by `table_id`. /// - /// Returns the generated column values (the [`AlgebraicValue`]) + /// Returns the list of columns where values were replaced with generated ones /// and a reference to the row as a [`RowRef`]. - /// The generated column values are only those that were generated. - /// Those are columns with an auto-inc sequence + /// + /// Generated columns are columns with an auto-inc sequence /// and where the column was `0` in `row`. - /// In case of zero or multiple such column, - /// an `AlgebraicValue::Product` is returned. - /// Otherwise, in case of a single column, - /// as an optimization, the value is not wrapped. fn insert_mut_tx<'a>( &'a self, tx: &'a mut Self::MutTx, table_id: TableId, - row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>)>; + row: &[u8], + ) -> Result<(ColList, RowRef<'a>)>; /// Obtain the [`Metadata`] for this datastore. /// diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 9fa1ef92bae..950bceeb4ac 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -377,7 +377,7 @@ impl RelationalDB { program_bytes: program.bytes, module_version: ONLY_MODULE_VERSION.into(), }; - self.insert(tx, ST_MODULE_ID, row.into()).map(drop) + tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop) } /// Obtain the [`Metadata`] of this database. @@ -1123,25 +1123,11 @@ impl RelationalDB { &'a self, tx: &'a mut MutTx, table_id: TableId, - row: ProductValue, - ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { + row: &[u8], + ) -> Result<(ColList, RowRef<'a>), DBError> { self.inner.insert_mut_tx(tx, table_id, row) } - pub fn insert_bytes_as_row<'a>( - &'a self, - tx: &'a mut MutTx, - table_id: TableId, - row_bytes: &[u8], - ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { - // Decode the `row_bytes` as a `ProductValue` according to the schema. - let ty = self.inner.row_type_for_table_mut_tx(tx, table_id)?; - let row = ProductValue::decode(&ty, &mut &row_bytes[..])?; - - // Insert the row. - self.insert(tx, table_id, row) - } - pub fn delete(&self, tx: &mut MutTx, table_id: TableId, row_ids: impl IntoIterator) -> u32 { self.inner.delete_mut_tx(tx, table_id, row_ids) } @@ -1165,11 +1151,6 @@ impl RelationalDB { Ok(()) } - /// Generated the next value for the [SequenceId] - pub fn next_sequence(&self, tx: &mut MutTx, seq_id: SequenceId) -> Result { - self.inner.get_next_sequence_value_mut_tx(tx, seq_id) - } - pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { self.inner.create_sequence_mut_tx(tx, sequence_schema) } @@ -1318,6 +1299,7 @@ pub mod tests_utils { use super::*; use core::ops::Deref; use durability::EmptyHistory; + use spacetimedb_lib::{bsatn::to_vec, ser::Serialize}; use spacetimedb_paths::FromPathUnchecked; use tempfile::TempDir; @@ -1535,6 +1517,17 @@ pub mod tests_utils { &self.db } } + + pub fn insert<'a, T: Serialize>( + db: &'a RelationalDB, + tx: &'a mut MutTx, + table_id: TableId, + row: &T, + ) -> Result<(AlgebraicValue, RowRef<'a>), DBError> { + let (gen_cols, row_ref) = db.insert(tx, table_id, &to_vec(row).unwrap())?; + let gen_cols = row_ref.project(&gen_cols).unwrap(); + Ok((gen_cols, row_ref)) + } } #[cfg(test)] @@ -1549,7 +1542,7 @@ mod tests { system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, }; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::error::IndexError; use crate::execution_context::ReducerContext; use anyhow::bail; @@ -1722,7 +1715,7 @@ mod tests { fn insert_three_i32s(stdb: &RelationalDB, tx: &mut MutTx, table_id: TableId) -> ResultTest<()> { for v in [-1, 0, 1] { - stdb.insert(tx, table_id, product![v])?; + insert(stdb, tx, table_id, &product![v])?; } Ok(()) } @@ -1836,8 +1829,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![0i64])?; - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 2]); Ok(()) @@ -1854,8 +1847,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![5i64])?; - stdb.insert(&mut tx, table_id, product![6i64])?; + insert(&stdb, &mut tx, table_id, &product![5i64])?; + insert(&stdb, &mut tx, table_id, &product![6i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![5, 6]); Ok(()) @@ -1879,7 +1872,7 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]); stdb.commit_tx(tx)?; @@ -1887,7 +1880,7 @@ mod tests { let stdb = stdb.reopen()?; let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - stdb.insert(&mut tx, table_id, product![0i64]).unwrap(); + insert(&stdb, &mut tx, table_id, &product![0i64]).unwrap(); // Check the second row start after `SEQUENCE_PREALLOCATION_AMOUNT` assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 4098]); @@ -1908,8 +1901,8 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; - stdb.insert(&mut tx, table_id, product![1i64, 1i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64, 1i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64, 1i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1]); Ok(()) @@ -1922,8 +1915,8 @@ mod tests { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let schema = my_table(AlgebraicType::I64); let table_id = stdb.create_table(&mut tx, schema)?; - stdb.insert(&mut tx, table_id, product![1i64])?; - stdb.insert(&mut tx, table_id, product![2i64])?; + insert(&stdb, &mut tx, table_id, &product![1i64])?; + insert(&stdb, &mut tx, table_id, &product![2i64])?; stdb.commit_tx(tx)?; let stdb = stdb.reopen()?; @@ -1979,9 +1972,8 @@ mod tests { "Index not created" ); - stdb.insert(&mut tx, table_id, product![1i64, 0i64]) - .expect("stdb.insert failed"); - match stdb.insert(&mut tx, table_id, product![1i64, 1i64]) { + insert(&stdb, &mut tx, table_id, &product![1i64, 0i64]).expect("stdb.insert failed"); + match insert(&stdb, &mut tx, table_id, &product![1i64, 1i64]) { Ok(_) => panic!("Allow to insert duplicate row"), Err(DBError::Index(IndexError::UniqueConstraintViolation { .. })) => {} Err(err) => panic!("Expected error `UniqueConstraintViolation`, got {err}"), @@ -2011,8 +2003,8 @@ mod tests { let sequence = stdb.sequence_id_from_name(&tx, "MyTable_my_col_seq")?; assert!(sequence.is_some(), "Sequence not created"); - stdb.insert(&mut tx, table_id, product![0i64])?; - stdb.insert(&mut tx, table_id, product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; + insert(&stdb, &mut tx, table_id, &product![0i64])?; assert_eq!(collect_from_sorted(&stdb, &tx, table_id, 0i64)?, vec![1, 2]); Ok(()) @@ -2139,9 +2131,9 @@ mod tests { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let table_id = stdb.create_table(&mut tx, schema)?; - stdb.insert(&mut tx, table_id, product![0u64, 0u64, 1u64])?; - stdb.insert(&mut tx, table_id, product![0u64, 1u64, 2u64])?; - stdb.insert(&mut tx, table_id, product![1u64, 2u64, 2u64])?; + insert(&stdb, &mut tx, table_id, &product![0u64, 0u64, 1u64])?; + insert(&stdb, &mut tx, table_id, &product![0u64, 1u64, 2u64])?; + insert(&stdb, &mut tx, table_id, &product![1u64, 2u64, 2u64])?; let cols = col_list![0, 1]; let value = product![0u64, 1u64].into(); @@ -2176,8 +2168,7 @@ mod tests { // Insert a row and commit it, so the row is in the committed_state. let mut insert_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); - stdb.insert(&mut insert_tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("Insert insert_tx failed"); + insert(&stdb, &mut insert_tx, table_id, &product!(AlgebraicValue::I32(0))).expect("Insert insert_tx failed"); stdb.commit_tx(insert_tx).expect("Commit insert_tx failed"); let mut delete_insert_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); @@ -2190,8 +2181,13 @@ mod tests { // Insert the row again, so that depending on the datastore internals, // it may now be only in the committed_state, // or in all three of the committed_state, delete_tables and insert_tables. - stdb.insert(&mut delete_insert_tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("Insert delete_insert_tx failed"); + insert( + &stdb, + &mut delete_insert_tx, + table_id, + &product!(AlgebraicValue::I32(0)), + ) + .expect("Insert delete_insert_tx failed"); // Iterate over the table and assert that we see the committed-deleted-inserted row only once. assert_eq!( @@ -2255,8 +2251,7 @@ mod tests { let table_id = { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx)); let table_id = stdb.create_table(&mut tx, schema).expect("failed to create table"); - stdb.insert(&mut tx, table_id, product!(AlgebraicValue::I32(0))) - .expect("failed to insert row"); + insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(0))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); table_id @@ -2266,8 +2261,7 @@ mod tests { // created by a mutable SQL transaction { let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql); - stdb.insert(&mut tx, table_id, product!(AlgebraicValue::I32(-42))) - .expect("failed to insert row"); + insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(-42))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); } diff --git a/crates/core/src/estimation.rs b/crates/core/src/estimation.rs index fad6b1b4ac2..9edb2e1a514 100644 --- a/crates/core/src/estimation.rs +++ b/crates/core/src/estimation.rs @@ -66,6 +66,7 @@ fn index_row_est(tx: &Tx, table_id: TableId, cols: &ColList) -> u64 { #[cfg(test)] mod tests { + use crate::db::relational_db::tests_utils::insert; use crate::execution_context::Workload; use crate::{ db::relational_db::{tests_utils::TestDB, RelationalDB}, @@ -103,8 +104,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> Result<(), DBError> { for i in 0..NUM_T_ROWS { - db.insert(tx, table_id, product![i % NDV_T, i]) - .expect("failed to insert into table"); + insert(db, tx, table_id, &product![i % NDV_T, i]).expect("failed to insert into table"); } Ok(()) }) @@ -120,7 +120,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> Result<(), DBError> { for i in 0..NUM_S_ROWS { - db.insert(tx, rhs, product![i, i]).expect("failed to insert into table"); + insert(db, tx, rhs, &product![i, i]).expect("failed to insert into table"); } Ok(()) }) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 39938b199d7..d3d40c9525a 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -7,7 +7,11 @@ use core::mem; use parking_lot::{Mutex, MutexGuard}; use smallvec::SmallVec; use spacetimedb_primitives::{ColId, IndexId, TableId}; -use spacetimedb_sats::{bsatn::ToBsatn, AlgebraicValue, ProductValue}; +use spacetimedb_sats::{ + bsatn::{self, ToBsatn}, + buffer::{CountWriter, TeeWriter}, + ProductValue, +}; use spacetimedb_table::table::UniqueConstraintViolation; use std::ops::DerefMut; use std::sync::Arc; @@ -113,13 +117,22 @@ impl InstanceEnv { ); } - pub fn insert(&self, table_id: TableId, buffer: &[u8]) -> Result { + pub fn insert(&self, table_id: TableId, buffer: &mut [u8]) -> Result { let stdb = &*self.replica_ctx.relational_db; let tx = &mut *self.get_tx()?; - let (gen_cols, row_ptr) = stdb - .insert_bytes_as_row(tx, table_id, buffer) - .map(|(gc, rr)| (gc, rr.pointer())) + let (row_len, row_ptr) = stdb + .insert(tx, table_id, buffer) + .map(|(gen_cols, row_ref)| { + // Write back the generated column values to `buffer` + // and the encoded length to `row_len`. + let counter = CountWriter::default(); + let mut writer = TeeWriter::new(counter, buffer); + bsatn::to_writer(&mut writer, &gen_cols).unwrap(); + let row_len = writer.w1.finish(); + + (row_len, row_ref.pointer()) + }) .inspect_err(|e| match e { crate::error::DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation { constraint_name: _, @@ -148,7 +161,7 @@ impl InstanceEnv { .map_err(NodesError::ScheduleError)?; } - Ok(gen_cols) + Ok(row_len) } #[tracing::instrument(skip_all)] diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1a7258d159f..83758e86d5e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -640,7 +640,7 @@ impl ModuleHost { }; if connected { - db.insert(mut_tx, ST_CLIENT_ID, row.into()).map(|_| ()) + mut_tx.insert_via_serialize_bsatn(ST_CLIENT_ID, &row).map(|_| ()) } else { let row = db .iter_by_col_eq_mut( diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 5ee0d84cb5d..77f4bd7bf98 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -561,13 +561,11 @@ impl WasmModuleInstance { } fn insert_st_client(&self, tx: &mut MutTxId, identity: Identity, address: Address) -> Result<(), DBError> { - let db = &*self.replica_context().relational_db; let row = &StClientRow { identity: identity.into(), address: address.into(), }; - - db.insert(tx, ST_CLIENT_ID, row.into()).map(|_| ()) + tx.insert_via_serialize_bsatn(ST_CLIENT_ID, row).map(|_| ()) } } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 48980578027..696edfcfc79 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -13,8 +13,6 @@ use crate::host::wasm_common::{ use crate::host::AbiCall; use anyhow::Context as _; use spacetimedb_primitives::{errno, ColId}; -use spacetimedb_sats::bsatn; -use spacetimedb_sats::buffer::{CountWriter, TeeWriter}; use wasmtime::{AsContext, Caller, StoreContextMut}; use super::{Mem, MemView, NullableMemOp, WasmError, WasmPointee, WasmPtr}; @@ -677,16 +675,8 @@ impl WasmInstanceEnv { // Get a mutable view to the `row`. let row = mem.deref_slice_mut(row_ptr, row_len)?; - // Insert the row into the DB. - // This will return back the generated column values. - let gen_cols = env.instance_env.insert(table_id.into(), row)?; - - // Write back the generated column values to `row` - // and the encoded length to `row_len`. - let counter = CountWriter::default(); - let mut writer = TeeWriter::new(counter, row); - bsatn::to_writer(&mut writer, &gen_cols).unwrap(); - let row_len = writer.w1.finish(); + // Insert the row into the DB and write back the generated column values. + let row_len = env.instance_env.insert(table_id.into(), row)?; u32::try_from(row_len).unwrap().write_to(mem, row_len_ptr)?; Ok(()) }) diff --git a/crates/core/src/sql/compiler.rs b/crates/core/src/sql/compiler.rs index c0c62315bd4..00cf1e1cec0 100644 --- a/crates/core/src/sql/compiler.rs +++ b/crates/core/src/sql/compiler.rs @@ -230,7 +230,7 @@ fn compile_statement(db: &RelationalDB, statement: SqlAst) -> Result(()) })?; diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 5012d2fb700..41962fffa1f 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -213,7 +213,7 @@ pub fn translate_col(tx: &Tx, field: FieldName) -> Option> { pub(crate) mod tests { use super::*; use crate::db::datastore::system_tables::{StTableFields, ST_TABLE_ID, ST_TABLE_NAME}; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::vm::tests::create_table_with_rows; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -619,7 +619,7 @@ pub(crate) mod tests { ]; let table_id = db.create_table_for_test_multi_column("test", schema, col_list![0, 1])?; db.with_auto_commit(Workload::ForTests, |tx| { - db.insert(tx, table_id, product![1, 1, 1, 1]).map(drop) + insert(&db, tx, table_id, &product![1, 1, 1, 1]).map(drop) })?; let result = run_for_testing(&db, "select * from test where b = 1 and a = 1")?; @@ -665,7 +665,7 @@ pub(crate) mod tests { db.with_auto_commit(Workload::ForTests, |tx| { for i in 0..1000i32 { - db.insert(tx, table_id, product!(i)).unwrap(); + insert(&db, tx, table_id, &product!(i)).unwrap(); } Ok::<(), DBError>(()) }) @@ -697,7 +697,9 @@ pub(crate) mod tests { let schema = &[("a", AlgebraicType::U8), ("b", AlgebraicType::U8)]; let table_id = db.create_table_for_test_multi_column("test", schema, col_list![0, 1])?; let row = product![4u8, 8u8]; - db.with_auto_commit(Workload::ForTests, |tx| db.insert(tx, table_id, row.clone()).map(drop))?; + db.with_auto_commit(Workload::ForTests, |tx| { + insert(&db, tx, table_id, &row.clone()).map(drop) + })?; let result = run_for_testing(&db, "select * from test where a >= 3 and a <= 5 and b >= 3 and b <= 5")?; @@ -714,7 +716,7 @@ pub(crate) mod tests { let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; db.with_auto_commit(Workload::ForTests, |tx| -> Result<_, DBError> { for i in 0..5u8 { - db.insert(tx, table_id, product!(i))?; + insert(&db, tx, table_id, &product!(i))?; } Ok(()) })?; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 52b5ccbea59..a694a103e7b 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -237,7 +237,7 @@ pub struct WriteConflict; mod tests { use super::{AssertTxFn, ModuleSubscriptions}; use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender}; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::execution_context::Workload; @@ -275,7 +275,7 @@ mod tests { // Create table with one row let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; db.with_auto_commit(Workload::ForTests, |tx| { - db.insert(tx, table_id, product!(1_u8)).map(drop) + insert(&db, tx, table_id, &product!(1_u8)).map(drop) })?; let (send, mut recv) = mpsc::unbounded_channel(); @@ -303,7 +303,7 @@ mod tests { let write_handle = runtime.spawn(async move { let _ = recv.recv().await; db2.with_auto_commit(Workload::ForTests, |tx| { - db2.insert(tx, table_id, product!(2_u8)).map(drop) + insert(&db2, tx, table_id, &product!(2_u8)).map(drop) }) }); diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index db75d337228..798b638aa6c 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -91,7 +91,7 @@ pub fn classify(expr: &QueryExpr) -> Option { mod tests { use super::*; use crate::db::datastore::traits::IsolationLevel; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::MutTx; use crate::execution_context::Workload; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate}; @@ -155,7 +155,7 @@ mod tests { } fn insert_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> ResultTest<()> { - db.insert(tx, table_id, row)?; + insert(db, tx, table_id, &row)?; Ok(()) } @@ -336,7 +336,7 @@ mod tests { let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests); let mut deletes = Vec::new(); for i in 0u64..9u64 { - db.insert(&mut tx, table_id, product!(i, i))?; + insert(&db, &mut tx, table_id, &product!(i, i))?; deletes.push(product!(i + 10, i)) } @@ -629,7 +629,7 @@ mod tests { let lhs_id = db.create_table_for_test("lhs", &[("id", I32), ("x", I32)], &[0.into()])?; db.with_auto_commit(Workload::ForTests, |tx| { for i in 0..5 { - db.insert(tx, lhs_id, product!(i, i + 5))?; + insert(db, tx, lhs_id, &product!(i, i + 5))?; } Ok(lhs_id) }) @@ -641,7 +641,7 @@ mod tests { let rhs_id = db.create_table_for_test("rhs", &[("rid", I32), ("id", I32), ("y", I32)], &[1.into()])?; db.with_auto_commit(Workload::ForTests, |tx| { for i in 10..20 { - db.insert(tx, rhs_id, product!(i, i - 10, i - 8))?; + insert(db, tx, rhs_id, &product!(i, i - 10, i - 8))?; } Ok(rhs_id) }) diff --git a/crates/core/src/util/slow.rs b/crates/core/src/util/slow.rs index 3850d6f874b..b9d56a58ca2 100644 --- a/crates/core/src/util/slow.rs +++ b/crates/core/src/util/slow.rs @@ -56,7 +56,7 @@ mod tests { use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::ProductValue; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::db::relational_db::RelationalDB; use crate::execution_context::Workload; use spacetimedb_sats::{product, AlgebraicType}; @@ -86,7 +86,7 @@ mod tests { db.with_auto_commit(Workload::ForTests, |tx| -> ResultTest<_> { for i in 0..100_000 { - db.insert(tx, table_id, product![i, i * 2])?; + insert(&db, tx, table_id, &product![i, i * 2])?; } Ok(()) })?; diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 5195a5493c8..b28c1ffdaa0 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -447,11 +447,14 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { Ok(Code::Table(MemTable::new(head, table_access, rows))) } - fn _execute_insert(&mut self, table: &DbTable, rows: Vec) -> Result { + // TODO(centril): investigate taking bsatn as input instead. + fn _execute_insert(&mut self, table: &DbTable, inserts: Vec) -> Result { let tx = self.tx.unwrap_mut(); - let inserts = rows.clone(); // TODO code shouldn't be hot, let's remove later - for row in rows { - self.db.insert(tx, table.table_id, row)?; + let mut scratch = Vec::new(); + for row in &inserts { + row.encode(&mut scratch); + self.db.insert(tx, table.table_id, &scratch)?; + scratch.clear(); } Ok(Code::Pass(Some(Update { table_id: table.table_id, @@ -582,7 +585,7 @@ pub(crate) mod tests { StSequenceRow, StTableFields, StTableRow, ST_COLUMN_ID, ST_COLUMN_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_RESERVED_SEQUENCE_RANGE, ST_SEQUENCE_ID, ST_SEQUENCE_NAME, ST_TABLE_ID, ST_TABLE_NAME, }; - use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::tests_utils::{insert, TestDB}; use crate::execution_context::Workload; use pretty_assertions::assert_eq; use spacetimedb_lib::db::auth::{StAccess, StTableType}; @@ -634,7 +637,7 @@ pub(crate) mod tests { let schema = db.schema_for_table_mut(tx, table_id)?; for row in rows { - db.insert(tx, table_id, row.clone())?; + insert(db, tx, table_id, &row)?; } Ok(schema) diff --git a/crates/sats/src/algebraic_value.rs b/crates/sats/src/algebraic_value.rs index 3277c6092a7..863a3483e34 100644 --- a/crates/sats/src/algebraic_value.rs +++ b/crates/sats/src/algebraic_value.rs @@ -266,28 +266,6 @@ impl AlgebraicValue { _ => false, } } - - /// Converts `sequence_value` to an appropriate `AlgebraicValue` based on `ty`. - /// Truncates the `sequence_value` to fit `ty`. - /// - /// Panics if `ty` is not an integer type. - pub fn from_sequence_value(ty: &AlgebraicType, sequence_value: i128) -> Self { - match *ty { - AlgebraicType::I8 => (sequence_value as i8).into(), - AlgebraicType::U8 => (sequence_value as u8).into(), - AlgebraicType::I16 => (sequence_value as i16).into(), - AlgebraicType::U16 => (sequence_value as u16).into(), - AlgebraicType::I32 => (sequence_value as i32).into(), - AlgebraicType::U32 => (sequence_value as u32).into(), - AlgebraicType::I64 => (sequence_value as i64).into(), - AlgebraicType::U64 => (sequence_value as u64).into(), - AlgebraicType::I128 => sequence_value.into(), - AlgebraicType::U128 => (sequence_value as u128).into(), - AlgebraicType::I256 => sequence_value.into(), - AlgebraicType::U256 => (sequence_value as u128).into(), - _ => panic!("`{ty:?}` is not a sequence integer type"), - } - } } impl> From> for AlgebraicValue { diff --git a/crates/table/src/layout.rs b/crates/table/src/layout.rs index f1ee56083f5..9c9b02b0d77 100644 --- a/crates/table/src/layout.rs +++ b/crates/table/src/layout.rs @@ -714,9 +714,6 @@ impl<'de> DeserializeSeed<'de> for &AlgebraicTypeLayout { AlgebraicTypeLayout::Primitive(PrimitiveType::F64) => f64::deserialize(de).map(Into::into), AlgebraicTypeLayout::VarLen(VarLenType::Array(ty)) => WithTypespace::empty(&**ty).deserialize(de), AlgebraicTypeLayout::VarLen(VarLenType::String) => >::deserialize(de).map(Into::into), - /* - AlgebraicType::Sum(sum) => self.with(sum).deserialize(de).map(Into::into), - */ } } } diff --git a/crates/table/src/page.rs b/crates/table/src/page.rs index ab6a0d46420..faf93a54e4a 100644 --- a/crates/table/src/page.rs +++ b/crates/table/src/page.rs @@ -1154,6 +1154,21 @@ impl Page { (fixed, var) } + /// Returns a mutable view of the row from `start` lasting `fixed_row_size` number of bytes. + /// + /// This method is safe, but callers should take care that `start` and `fixed_row_size` + /// are correct for this page, and that `start` is aligned. + /// Callers should further ensure that mutations to the row leave the row bytes + /// in an expected state, i.e. initialized where required by the row type, + /// and with `VarLenRef`s that point to valid granules and with correct lengths. + /// + /// This call will clear the unmodified hash + /// as it is expected that the caller will alter the the page. + pub fn get_fixed_row_data_mut(&mut self, start: PageOffset, fixed_row_size: Size) -> &mut Bytes { + self.header.unmodified_hash = None; + &mut self.row_data[start.range(fixed_row_size)] + } + /// Return the total required var-len granules to store `objects`. pub fn total_granules_required_for_objects(objects: &[impl AsRef<[u8]>]) -> usize { objects @@ -1279,6 +1294,8 @@ impl Page { } /// Allocates a space for a fixed size row of `fixed_row_size` in the freelist, if possible. + /// + /// This call will clear the unmodified hash. #[inline] fn alloc_fixed_len_from_freelist(&mut self, fixed_row_size: Size) -> Option { let header = &mut self.header.fixed; @@ -1293,6 +1310,8 @@ impl Page { } /// Allocates a space for a fixed size row of `fixed_row_size` in the freelist, if possible. + /// + /// This call will clear the unmodified hash. #[inline] fn alloc_fixed_len_from_gap(&mut self, fixed_row_size: Size) -> Option { if gap_enough_size_for_row(self.header.var.first, self.header.fixed.last, fixed_row_size) { @@ -1376,6 +1395,8 @@ impl Page { /// Free a row, marking its fixed-len and var-len storage granules as available for re-use. /// + /// This call will clear the unmodified hash. + /// /// # Safety /// /// - `fixed_row` must point to a valid row in this page. diff --git a/crates/table/src/pages.rs b/crates/table/src/pages.rs index c434f91eafc..7dc74e01c47 100644 --- a/crates/table/src/pages.rs +++ b/crates/table/src/pages.rs @@ -64,7 +64,6 @@ impl Pages { /// /// Used in benchmarks. Internal operators will prefer directly indexing into `self.pages`, /// as that allows split borrows. - #[doc(hidden)] // Used in benchmarks. pub fn get_page_mut(&mut self, page: PageIndex) -> &mut Page { &mut self.pages[page.idx()] } diff --git a/crates/table/src/read_column.rs b/crates/table/src/read_column.rs index f1ad4825cc1..1295d01e621 100644 --- a/crates/table/src/read_column.rs +++ b/crates/table/src/read_column.rs @@ -332,6 +332,7 @@ impl_read_column_via_from! { u32 => spacetimedb_primitives::IndexId; u32 => spacetimedb_primitives::ConstraintId; u32 => spacetimedb_primitives::SequenceId; + u32 => spacetimedb_primitives::ScheduleId; u128 => Packed; i128 => Packed; u256 => Box; diff --git a/crates/table/src/static_bsatn_validator.rs b/crates/table/src/static_bsatn_validator.rs index 22cfcfbe040..2a1078ba1cf 100644 --- a/crates/table/src/static_bsatn_validator.rs +++ b/crates/table/src/static_bsatn_validator.rs @@ -286,6 +286,7 @@ impl Insn { impl MemoryUsage for Insn {} +#[derive(Clone)] pub struct StaticBsatnValidator { /// The list of instructions that make up this program. insns: Arc<[Insn]>, diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index bbdd4c9a032..6d756babe78 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1,6 +1,12 @@ +use crate::{ + bflatn_to::write_row_to_pages_bsatn, + layout::AlgebraicTypeLayout, + static_bsatn_validator::{static_bsatn_validator, validate_bsatn, StaticBsatnValidator}, +}; + use super::{ bflatn_from::serialize_row_from_page, - bflatn_to::write_row_to_pages, + bflatn_to::{write_row_to_pages, Error}, blob_store::BlobStore, btree_index::{BTreeIndex, BTreeIndexRangeIter}, eq::eq_row_in_page, @@ -18,22 +24,27 @@ use super::{ var_len::VarLenMembers, MemoryUsage, }; -use core::hash::{Hash, Hasher}; use core::ops::RangeBounds; use core::{fmt, ptr}; +use core::{ + hash::{Hash, Hasher}, + hint::unreachable_unchecked, +}; use derive_more::{Add, AddAssign, From, Sub}; +use smallvec::SmallVec; use spacetimedb_data_structures::map::{DefaultHashBuilder, HashCollectionExt, HashMap}; use spacetimedb_lib::{bsatn::DecodeError, de::DeserializeOwned}; -use spacetimedb_primitives::{ColId, ColList, IndexId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, SequenceId}; use spacetimedb_sats::{ algebraic_value::ser::ValueSerializer, bsatn::{self, ser::BsatnError, ToBsatn}, + i256, product_value::InvalidFieldError, satn::Satn, ser::{Serialize, Serializer}, - AlgebraicValue, ProductType, ProductValue, + u256, AlgebraicValue, ProductType, ProductValue, }; -use spacetimedb_schema::schema::TableSchema; +use spacetimedb_schema::{schema::TableSchema, type_for_generate::PrimitiveType}; use std::sync::Arc; use thiserror::Error; @@ -43,6 +54,9 @@ pub struct BlobNumBytes(usize); impl MemoryUsage for BlobNumBytes {} +pub type SeqIdList = SmallVec<[SequenceId; 4]>; +static_assert_size!(SeqIdList, 24); + /// A database table containing the row schema, the rows, and indices. /// /// The table stores the rows into a page manager @@ -81,7 +95,7 @@ pub(crate) struct TableInner { row_layout: RowTypeLayout, /// A [`StaticLayout`] for fast BFLATN <-> BSATN conversion, /// if the [`RowTypeLayout`] has a static BSATN length and layout. - static_layout: Option, + static_layout: Option<(StaticLayout, StaticBsatnValidator)>, /// The visitor program for `row_layout`. /// /// Must be in the `TableInner` so that [`RowRef::blob_store_bytes`] can use it. @@ -125,7 +139,7 @@ impl TableInner { } } -static_assert_size!(Table, 256); +static_assert_size!(Table, 272); impl MemoryUsage for Table { fn heap_usage(&self) -> usize { @@ -191,7 +205,7 @@ impl Table { /// Creates a new empty table with the given `schema` and `squashed_offset`. pub fn new(schema: Arc, squashed_offset: SquashedOffset) -> Self { let row_layout: RowTypeLayout = schema.get_row_type().clone().into(); - let static_layout = StaticLayout::for_row_type(&row_layout); + let static_layout = StaticLayout::for_row_type(&row_layout).map(|sl| (sl, static_bsatn_validator(&row_layout))); let visitor_prog = row_type_visitor(&row_layout); Self::new_with_indexes_capacity(schema, row_layout, static_layout, visitor_prog, squashed_offset, 0) } @@ -239,9 +253,7 @@ impl Table { // Insert row into indices and check unique constraints. // SAFETY: We just inserted `ptr`, so it must be present. - unsafe { - self.insert_into_indices(blob_store, ptr)?; - } + unsafe { self.insert_into_indices(blob_store, ptr) }?; // SAFETY: We just inserted `ptr`, // and `insert_into_indices` didn't remove it, @@ -266,8 +278,7 @@ impl Table { // SAFETY: `ptr` was derived from `row_ref` without interleaving calls, so it must be valid. unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; - self.row_count += 1; - self.blob_store_bytes += blob_bytes; + self.update_statistics_added_row(blob_bytes); Ok((hash, ptr)) } @@ -300,6 +311,179 @@ impl Table { Ok((row_ref, blob_bytes)) } + /// Physically insert a `row`, encoded in BSATN, into this table, + /// storing its large var-len members in the `blob_store`. + /// + /// On success, returns the hash of the newly-inserted row, + /// and a `RowRef` referring to the row. + /// + /// This does not check for set semantic or unique constraints. + /// + /// This is also useful when we need to insert a row temporarily to get back a `RowPointer`. + /// In this case, A call to this method should be followed by a call to [`delete_internal_skip_pointer_map`]. + pub fn insert_physically_bsatn<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + row: &[u8], + ) -> Result<(RowRef<'a>, BlobNumBytes), InsertError> { + // Got a static layout? => Use fast-path insertion. + let (ptr, blob_bytes) = if let Some((static_layout, static_validator)) = self.inner.static_layout.as_ref() { + // Before inserting, validate the row, ensuring type safety. + // SAFETY: The `static_validator` was derived from the same row layout as the static layout. + unsafe { validate_bsatn(static_validator, static_layout, row) }.map_err(Error::Decode)?; + + let fixed_row_size = self.inner.row_layout.size(); + let squashed_offset = self.squashed_offset; + let res = self + .inner + .pages + .with_page_to_insert_row(fixed_row_size, 0, |page| { + // SAFETY: We've used the right `row_size` and we trust that others have too. + // `RowTypeLayout` also ensures that we satisfy the minimum row size. + let fixed_offset = unsafe { page.alloc_fixed_len(fixed_row_size) }.map_err(Error::PageError)?; + let (mut fixed, _) = page.split_fixed_var_mut(); + let fixed_buf = fixed.get_row_mut(fixed_offset, fixed_row_size); + // SAFETY: + // - We've validated that `row` is of sufficient length. + // - The `fixed_buf` is exactly the right `fixed_row_size`. + unsafe { static_layout.deserialize_row_into(fixed_buf, row) }; + Ok(fixed_offset) + }) + .map_err(Error::PagesError)?; + match res { + (page, Ok(offset)) => (RowPointer::new(false, page, offset, squashed_offset), 0.into()), + (_, Err(e)) => return Err(e), + } + } else { + // SAFETY: `self.pages` is known to be specialized for `self.row_layout`, + // as `self.pages` was constructed from `self.row_layout` in `Table::new`. + unsafe { + write_row_to_pages_bsatn( + &mut self.inner.pages, + &self.inner.visitor_prog, + blob_store, + &self.inner.row_layout, + row, + self.squashed_offset, + ) + }? + }; + + // SAFETY: We just inserted `ptr`, so it must be present. + let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) }; + + Ok((row_ref, blob_bytes)) + } + + /// Returns all the columns with sequences that need generation for this `row`. + /// + /// # Safety + /// + /// `self.is_row_present(row)` must hold. + pub unsafe fn sequence_triggers_for<'a>( + &'a self, + blob_store: &'a dyn BlobStore, + row: RowPointer, + ) -> (ColList, SeqIdList) { + let sequences = &*self.get_schema().sequences; + let row_ty = self.row_layout().product(); + + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, row) }; + + sequences + .iter() + // Find all the sequences that are triggered by this row. + .filter(|seq| { + // SAFETY: `seq.col_pos` is in-bounds of `row_ty.elements` + // as `row_ty` was derived from the same schema as `seq` is part of. + let elem_ty = unsafe { &row_ty.elements.get_unchecked(seq.col_pos.idx()) }; + // SAFETY: + // - `elem_ty` appears as a column in th row type. + // - `AlgebraicValue` is compatible with all types. + let val = unsafe { AlgebraicValue::unchecked_read_column(row_ref, elem_ty) }; + val.is_numeric_zero() + }) + .map(|seq| (seq.col_pos, seq.sequence_id)) + .unzip() + } + + /// Writes `seq_val` to the column at `col_id` in the row identified by `ptr`. + /// + /// Truncates the `seq_val` to fit the type of the column. + /// + /// # Safety + /// + /// - `self.is_row_present(row)` must hold. + /// - `col_id` must be a valid column, with a primiive type, of the row type. + pub unsafe fn write_gen_val_to_col(&mut self, col_id: ColId, ptr: RowPointer, seq_val: i128) { + let row_ty = self.inner.row_layout.product(); + let elem_ty = unsafe { row_ty.elements.get_unchecked(col_id.idx()) }; + let AlgebraicTypeLayout::Primitive(col_typ) = elem_ty.ty else { + // SAFETY: Columns with sequences must be primitive types. + unsafe { unreachable_unchecked() } + }; + + let fixed_row_size = self.inner.row_layout.size(); + let fixed_buf = self.inner.pages[ptr.page_index()].get_fixed_row_data_mut(ptr.page_offset(), fixed_row_size); + + fn write(dst: &mut [u8], offset: u16, bytes: [u8; N]) { + let offset = offset as usize; + dst[offset..offset + N].copy_from_slice(&bytes); + } + + match col_typ { + PrimitiveType::I8 => write(fixed_buf, elem_ty.offset, (seq_val as i8).to_le_bytes()), + PrimitiveType::U8 => write(fixed_buf, elem_ty.offset, (seq_val as u8).to_le_bytes()), + PrimitiveType::I16 => write(fixed_buf, elem_ty.offset, (seq_val as i16).to_le_bytes()), + PrimitiveType::U16 => write(fixed_buf, elem_ty.offset, (seq_val as u16).to_le_bytes()), + PrimitiveType::I32 => write(fixed_buf, elem_ty.offset, (seq_val as i32).to_le_bytes()), + PrimitiveType::U32 => write(fixed_buf, elem_ty.offset, (seq_val as u32).to_le_bytes()), + PrimitiveType::I64 => write(fixed_buf, elem_ty.offset, (seq_val as i64).to_le_bytes()), + PrimitiveType::U64 => write(fixed_buf, elem_ty.offset, (seq_val as u64).to_le_bytes()), + PrimitiveType::I128 => write(fixed_buf, elem_ty.offset, seq_val.to_le_bytes()), + PrimitiveType::U128 => write(fixed_buf, elem_ty.offset, (seq_val as u128).to_le_bytes()), + PrimitiveType::I256 => write(fixed_buf, elem_ty.offset, (i256::from(seq_val)).to_le_bytes()), + PrimitiveType::U256 => write(fixed_buf, elem_ty.offset, (u256::from(seq_val as u128)).to_le_bytes()), + PrimitiveType::Bool | PrimitiveType::F32 | PrimitiveType::F64 => { + panic!("`{:?}` is not a sequence integer type", &elem_ty.ty) + } + } + } + + /// Performs all the checks necessary after having fully decided on a rows contents. + /// + /// On `Ok(_)`, statistics of the table are also updated, + /// and the `ptr` still points to a valid row, and otherwise not. + /// + /// # Safety + /// + /// `self.is_row_present(row)` must hold. + pub fn confirm_insertion<'a>( + &'a mut self, + blob_store: &'a mut dyn BlobStore, + ptr: RowPointer, + blob_bytes: BlobNumBytes, + ) -> Result<(RowHash, RowPointer), InsertError> { + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + let row_ref = unsafe { self.get_row_ref_unchecked(blob_store, ptr) }; + let hash = row_ref.row_hash(); + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + unsafe { self.insert_into_pointer_map(blob_store, ptr, hash) }?; + // SAFETY: Caller promised that `self.is_row_present(row)` holds. + unsafe { self.insert_into_indices(blob_store, ptr) }?; + + self.update_statistics_added_row(blob_bytes); + Ok((hash, ptr)) + } + + /// We've added a row, update the statistics to record this. + #[inline] + fn update_statistics_added_row(&mut self, blob_bytes: BlobNumBytes) { + self.row_count += 1; + self.blob_store_bytes += blob_bytes; + } + /// Insert row identified by `ptr` into indices. /// This also checks unique constraints. /// Deletes the row if there were any violations. @@ -415,7 +599,7 @@ impl Table { committed_offset, tx_offset, &committed_table.inner.row_layout, - committed_table.inner.static_layout.as_ref(), + committed_table.static_layout(), ) } }) @@ -675,7 +859,7 @@ impl Table { pub fn clone_structure(&self, squashed_offset: SquashedOffset) -> Self { let schema = self.schema.clone(); let layout = self.row_layout().clone(); - let sbl = self.static_layout().cloned(); + let sbl = self.inner.static_layout.clone(); let visitor = self.inner.visitor_prog.clone(); let mut new = Table::new_with_indexes_capacity(schema, layout, sbl, visitor, squashed_offset, self.indexes.len()); @@ -841,21 +1025,16 @@ impl<'a> RowRef<'a> { RowHash(RowHash::hasher_builder().hash_one(self)) } - /// The length of this row when BSATN-encoded. - /// - /// Only available for rows whose types have a static BSATN layout. - /// Returns `None` for rows of other types, e.g. rows containing strings. - pub fn bsatn_length(&self) -> Option { - self.table.static_layout.as_ref().map(|s| s.bsatn_length as usize) + /// Returns the static layout for this row reference, if any. + pub fn static_layout(&self) -> Option<&StaticLayout> { + self.table.static_layout.as_ref().map(|(s, _)| s) } /// Encode the row referred to by `self` into a `Vec` using BSATN and then deserialize it. - /// The passed buffer is allowed to be in an arbitrary state before and after this operation. pub fn read_via_bsatn(&self, scratch: &mut Vec) -> Result where T: DeserializeOwned, { - scratch.clear(); self.to_bsatn_extend(scratch)?; Ok(bsatn::from_slice::(scratch)?) } @@ -903,7 +1082,7 @@ impl ToBsatn for RowRef<'_> { /// This method will use a [`StaticLayout`] if one is available, /// and may therefore be faster than calling [`bsatn::to_vec`]. fn to_bsatn_vec(&self) -> Result, BsatnError> { - if let Some(static_layout) = &self.table.static_layout { + if let Some(static_layout) = self.static_layout() { // Use fast path, by first fetching the row data and then using the static layout. let row = self.get_row_data(); // SAFETY: @@ -921,7 +1100,7 @@ impl ToBsatn for RowRef<'_> { /// This method will use a [`StaticLayout`] if one is available, /// and may therefore be faster than calling [`bsatn::to_writer`]. fn to_bsatn_extend(&self, buf: &mut Vec) -> Result<(), BsatnError> { - if let Some(static_layout) = &self.table.static_layout { + if let Some(static_layout) = self.static_layout() { // Use fast path, by first fetching the row data and then using the static layout. let row = self.get_row_data(); // SAFETY: @@ -938,7 +1117,7 @@ impl ToBsatn for RowRef<'_> { } fn static_bsatn_size(&self) -> Option { - self.table.static_layout.as_ref().map(|sbl| sbl.bsatn_length) + self.static_layout().map(|sl| sl.bsatn_length) } } @@ -957,7 +1136,7 @@ impl PartialEq for RowRef<'_> { } let (page_a, offset_a) = self.page_and_offset(); let (page_b, offset_b) = other.page_and_offset(); - let static_layout = self.table.static_layout.as_ref(); + let static_layout = self.static_layout(); // SAFETY: `offset_a/b` are valid rows in `page_a/b` typed at `a_ty` // and `static_bsatn_layout` is derived from `a_ty`. unsafe { eq_row_in_page(page_a, page_b, offset_a, offset_b, a_ty, static_layout) } @@ -1133,7 +1312,7 @@ impl Table { fn new_with_indexes_capacity( schema: Arc, row_layout: RowTypeLayout, - static_layout: Option, + static_layout: Option<(StaticLayout, StaticBsatnValidator)>, visitor_prog: VarLenVisitorProgram, squashed_offset: SquashedOffset, indexes_capacity: usize, @@ -1215,7 +1394,7 @@ impl Table { /// Returns the [`StaticLayout`] for this table, pub(crate) fn static_layout(&self) -> Option<&StaticLayout> { - self.inner.static_layout.as_ref() + self.inner.static_layout.as_ref().map(|(s, _)| s) } /// Rebuild the [`PointerMap`] by iterating over all the rows in `self` and inserting them.