Skip to content

Commit

Permalink
datastore: insert via bsatn instead of PV
Browse files Browse the repository at this point in the history
  • Loading branch information
Centril committed Dec 18, 2024
1 parent fd676c3 commit 5df3305
Show file tree
Hide file tree
Showing 26 changed files with 557 additions and 308 deletions.
14 changes: 11 additions & 3 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compress
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue};

fn create_table_location(db: &RelationalDB) -> Result<TableId, DBError> {
let schema = &[
Expand Down Expand Up @@ -57,11 +57,15 @@ fn eval(c: &mut Criterion) {
.db
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1M rows
let mut scratch = Vec::new();
for entity_id in 0u64..1_000_000 {
let owner = entity_id % 1_000;
let footprint = AlgebraicValue::sum(entity_id as u8 % 4, AlgebraicValue::unit());
let row = product!(entity_id, owner, footprint);
let _ = raw.db.insert(tx, lhs, row)?;

scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
let _ = raw.db.insert(tx, lhs, &scratch)?;
}
Ok(())
});
Expand All @@ -70,14 +74,18 @@ fn eval(c: &mut Criterion) {
.db
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1000 chunks, 1200 rows per chunk = 1.2M rows
let mut scratch = Vec::new();
for chunk_index in 0u64..1_000 {
for i in 0u64..1200 {
let entity_id = chunk_index * 1200 + i;
let x = 0i32;
let z = entity_id as i32;
let dimension = 0u32;
let row = product!(entity_id, chunk_index, x, z, dimension);
let _ = raw.db.insert(tx, rhs, row)?;

scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
let _ = raw.db.insert(tx, rhs, &scratch)?;
}
}
Ok(())
Expand Down
12 changes: 9 additions & 3 deletions crates/bench/src/spacetime_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
};
use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB};
use spacetimedb::execution_context::Workload;
use spacetimedb_lib::sats::AlgebraicValue;
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_sats::{bsatn, AlgebraicValue};
use spacetimedb_schema::{
def::{BTreeAlgorithm, IndexAlgorithm},
schema::{IndexSchema, TableSchema},
Expand Down Expand Up @@ -102,8 +102,11 @@ impl BenchDatabase for SpacetimeRaw {

fn insert_bulk<T: BenchTable>(&mut self, table_id: &Self::TableId, rows: Vec<T>) -> ResultBench<()> {
self.db.with_auto_commit(Workload::Internal, |tx| {
let mut scratch = Vec::new();
for row in rows {
self.db.insert(tx, *table_id, row.into_product_value())?;
scratch.clear();
bsatn::to_writer(&mut scratch, &row.into_product_value()).unwrap();
self.db.insert(tx, *table_id, &scratch)?;
}
Ok(())
})
Expand All @@ -119,6 +122,7 @@ impl BenchDatabase for SpacetimeRaw {
.collect::<Vec<_>>();

assert_eq!(rows.len(), row_count as usize, "not enough rows found for update_bulk!");
let mut scratch = Vec::new();
for mut row in rows {
// It would likely be faster to collect a vector of IDs and delete + insert them all at once,
// but this implementation is closer to how `update` works in modules.
Expand All @@ -143,7 +147,9 @@ impl BenchDatabase for SpacetimeRaw {
panic!("column 1 is not a u64!");
}

self.db.insert(tx, *table_id, row)?;
scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
self.db.insert(tx, *table_id, &scratch)?;
}
Ok(())
})
Expand Down
76 changes: 41 additions & 35 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,6 @@ impl MutTxDatastore for Locking {
tx.index_id_from_name(index_name)
}

fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result<i128> {
tx.get_next_sequence_value(seq_id)
}

fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId> {
tx.create_sequence(sequence_schema)
}
Expand Down Expand Up @@ -562,9 +558,9 @@ impl MutTxDatastore for Locking {
&'a self,
tx: &'a mut Self::MutTx,
table_id: TableId,
mut row: ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let (gens, row_ref) = tx.insert(table_id, &mut row)?;
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
let (gens, row_ref) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse()))
}

Expand All @@ -589,7 +585,7 @@ impl MutTxDatastore for Locking {
row.program_bytes = program.bytes;

tx.delete(ST_MODULE_ID, ptr)?;
tx.insert(ST_MODULE_ID, &mut row.into()).map(drop)
tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop)
}

None => Err(anyhow!(
Expand Down Expand Up @@ -976,6 +972,7 @@ mod tests {
use crate::db::datastore::traits::{IsolationLevel, MutTx};
use crate::db::datastore::Result;
use crate::error::{DBError, IndexError};
use bsatn::to_vec;
use itertools::Itertools;
use pretty_assertions::assert_eq;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
Expand Down Expand Up @@ -1344,6 +1341,18 @@ mod tests {
.collect()
}

fn insert<'a>(
datastore: &'a Locking,
tx: &'a mut MutTxId,
table_id: TableId,
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}

#[test]
fn test_bootstrapping_sets_up_tables() -> ResultTest<()> {
let datastore = get_datastore()?;
Expand Down Expand Up @@ -1683,7 +1692,7 @@ mod tests {
fn test_insert_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 18)]);
Ok(())
Expand All @@ -1693,7 +1702,7 @@ mod tests {
fn test_insert_wrong_schema_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = product!(0, "Foo");
assert!(datastore.insert_mut_tx(&mut tx, table_id, row).is_err());
assert!(insert(&datastore, &mut tx, table_id, &row).is_err());
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![]);
Ok(())
Expand All @@ -1703,7 +1712,7 @@ mod tests {
fn test_insert_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
// 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, u32_str_u32(0, "Foo", 18))?;
insert(&datastore, &mut tx, table_id, &u32_str_u32(0, "Foo", 18))?;
datastore.commit_mut_tx(tx)?;
let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
#[rustfmt::skip]
Expand All @@ -1717,7 +1726,7 @@ mod tests {
let row = u32_str_u32(15, "Foo", 18); // 15 is ignored.
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.rollback_mut_tx(tx);
let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
#[rustfmt::skip]
Expand All @@ -1729,15 +1738,15 @@ mod tests {
fn test_insert_commit_delete_insert() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let created_row = u32_str_u32(1, "Foo", 18);
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row]);
assert_eq!(num_deleted, 1);
assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
let created_row = u32_str_u32(1, "Foo", 19);
datastore.insert_mut_tx(&mut tx, table_id, created_row)?;
insert(&datastore, &mut tx, table_id, &created_row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 19)]);
Ok(())
Expand All @@ -1747,7 +1756,7 @@ mod tests {
fn test_insert_delete_insert_delete_insert() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(1, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
for i in 0..2 {
assert_eq!(
all_rows(&datastore, &tx, table_id),
Expand All @@ -1764,7 +1773,7 @@ mod tests {
&[],
"Found rows present after deleting",
);
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
assert_eq!(
all_rows(&datastore, &tx, table_id),
vec![row.clone()],
Expand All @@ -1778,8 +1787,8 @@ mod tests {
fn test_unique_constraint_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
insert(&datastore, &mut tx, table_id, &row)?;
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1798,10 +1807,10 @@ mod tests {
fn test_unique_constraint_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1822,10 +1831,10 @@ mod tests {
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.rollback_mut_tx(tx);
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(2, "Foo", 18)]);
Ok(())
Expand All @@ -1838,7 +1847,7 @@ mod tests {

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
Expand Down Expand Up @@ -1873,7 +1882,7 @@ mod tests {
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1892,7 +1901,7 @@ mod tests {
fn test_create_index_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let index_def = IndexSchema {
Expand Down Expand Up @@ -1927,7 +1936,7 @@ mod tests {
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1946,7 +1955,7 @@ mod tests {
fn test_create_index_post_rollback() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let index_def = IndexSchema {
Expand Down Expand Up @@ -1981,7 +1990,7 @@ mod tests {
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![
u32_str_u32(1, "Foo", 18),
Expand All @@ -1998,7 +2007,7 @@ mod tests {
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
// Because of auto_inc columns, we will get a slightly different
// value than the one we inserted.
let row = datastore.insert_mut_tx(&mut tx, table_id, row)?.1.to_product_value();
let row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value();
datastore.commit_mut_tx(tx)?;

let all_rows_col_0_eq_1 = |tx: &MutTxId| {
Expand All @@ -2023,10 +2032,7 @@ mod tests {
assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0);

// Reinsert the row.
let reinserted_row = datastore
.insert_mut_tx(&mut tx, table_id, row.clone())?
.1
.to_product_value();
let reinserted_row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value();
assert_eq!(reinserted_row, row);

// The actual test: we should be able to iterate again, while still in the
Expand All @@ -2044,9 +2050,9 @@ mod tests {
fn test_read_only_tx_shared_lock() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row1 = u32_str_u32(1, "Foo", 18);
datastore.insert_mut_tx(&mut tx, table_id, row1.clone())?;
insert(&datastore, &mut tx, table_id, &row1)?;
let row2 = u32_str_u32(2, "Bar", 20);
datastore.insert_mut_tx(&mut tx, table_id, row2.clone())?;
insert(&datastore, &mut tx, table_id, &row2)?;
datastore.commit_mut_tx(tx)?;

// create multiple read only tx, and use them together.
Expand Down
Loading

0 comments on commit 5df3305

Please sign in to comment.