Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datastore: insert via BSATN instead of via PV #2069

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use spacetimedb::{db::relational_db::RelationalDB, messages::websocket::Compress
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue};

fn create_table_location(db: &RelationalDB) -> Result<TableId, DBError> {
let schema = &[
Expand Down Expand Up @@ -57,11 +57,15 @@ fn eval(c: &mut Criterion) {
.db
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1M rows
let mut scratch = Vec::new();
for entity_id in 0u64..1_000_000 {
let owner = entity_id % 1_000;
let footprint = AlgebraicValue::sum(entity_id as u8 % 4, AlgebraicValue::unit());
let row = product!(entity_id, owner, footprint);
let _ = raw.db.insert(tx, lhs, row)?;

scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
let _ = raw.db.insert(tx, lhs, &scratch)?;
}
Ok(())
});
Expand All @@ -70,14 +74,18 @@ fn eval(c: &mut Criterion) {
.db
.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> {
// 1000 chunks, 1200 rows per chunk = 1.2M rows
let mut scratch = Vec::new();
for chunk_index in 0u64..1_000 {
for i in 0u64..1200 {
let entity_id = chunk_index * 1200 + i;
let x = 0i32;
let z = entity_id as i32;
let dimension = 0u32;
let row = product!(entity_id, chunk_index, x, z, dimension);
let _ = raw.db.insert(tx, rhs, row)?;

scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
let _ = raw.db.insert(tx, rhs, &scratch)?;
}
}
Ok(())
Expand Down
12 changes: 9 additions & 3 deletions crates/bench/src/spacetime_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::{
};
use spacetimedb::db::relational_db::{tests_utils::TestDB, RelationalDB};
use spacetimedb::execution_context::Workload;
use spacetimedb_lib::sats::AlgebraicValue;
use spacetimedb_primitives::{ColId, IndexId, TableId};
use spacetimedb_sats::{bsatn, AlgebraicValue};
use spacetimedb_schema::{
def::{BTreeAlgorithm, IndexAlgorithm},
schema::{IndexSchema, TableSchema},
Expand Down Expand Up @@ -102,8 +102,11 @@ impl BenchDatabase for SpacetimeRaw {

fn insert_bulk<T: BenchTable>(&mut self, table_id: &Self::TableId, rows: Vec<T>) -> ResultBench<()> {
self.db.with_auto_commit(Workload::Internal, |tx| {
let mut scratch = Vec::new();
for row in rows {
self.db.insert(tx, *table_id, row.into_product_value())?;
scratch.clear();
bsatn::to_writer(&mut scratch, &row.into_product_value()).unwrap();
self.db.insert(tx, *table_id, &scratch)?;
}
Ok(())
})
Expand All @@ -119,6 +122,7 @@ impl BenchDatabase for SpacetimeRaw {
.collect::<Vec<_>>();

assert_eq!(rows.len(), row_count as usize, "not enough rows found for update_bulk!");
let mut scratch = Vec::new();
for mut row in rows {
// It would likely be faster to collect a vector of IDs and delete + insert them all at once,
// but this implementation is closer to how `update` works in modules.
Expand All @@ -143,7 +147,9 @@ impl BenchDatabase for SpacetimeRaw {
panic!("column 1 is not a u64!");
}

self.db.insert(tx, *table_id, row)?;
scratch.clear();
bsatn::to_writer(&mut scratch, &row).unwrap();
self.db.insert(tx, *table_id, &scratch)?;
}
Ok(())
})
Expand Down
76 changes: 41 additions & 35 deletions crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,6 @@ impl MutTxDatastore for Locking {
tx.index_id_from_name(index_name)
}

fn get_next_sequence_value_mut_tx(&self, tx: &mut Self::MutTx, seq_id: SequenceId) -> Result<i128> {
tx.get_next_sequence_value(seq_id)
}

fn create_sequence_mut_tx(&self, tx: &mut Self::MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId> {
tx.create_sequence(sequence_schema)
}
Expand Down Expand Up @@ -562,9 +558,9 @@ impl MutTxDatastore for Locking {
&'a self,
tx: &'a mut Self::MutTx,
table_id: TableId,
mut row: ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let (gens, row_ref) = tx.insert(table_id, &mut row)?;
row: &[u8],
) -> Result<(ColList, RowRef<'a>)> {
let (gens, row_ref) = tx.insert::<true>(table_id, row)?;
Ok((gens, row_ref.collapse()))
}

Expand All @@ -589,7 +585,7 @@ impl MutTxDatastore for Locking {
row.program_bytes = program.bytes;

tx.delete(ST_MODULE_ID, ptr)?;
tx.insert(ST_MODULE_ID, &mut row.into()).map(drop)
tx.insert_via_serialize_bsatn(ST_MODULE_ID, &row).map(drop)
}

None => Err(anyhow!(
Expand Down Expand Up @@ -976,6 +972,7 @@ mod tests {
use crate::db::datastore::traits::{IsolationLevel, MutTx};
use crate::db::datastore::Result;
use crate::error::{DBError, IndexError};
use bsatn::to_vec;
use itertools::Itertools;
use pretty_assertions::assert_eq;
use spacetimedb_lib::db::auth::{StAccess, StTableType};
Expand Down Expand Up @@ -1344,6 +1341,18 @@ mod tests {
.collect()
}

fn insert<'a>(
datastore: &'a Locking,
tx: &'a mut MutTxId,
table_id: TableId,
row: &ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let row = to_vec(&row).unwrap();
let (gen_cols, row_ref) = datastore.insert_mut_tx(tx, table_id, &row)?;
let gen_cols = row_ref.project(&gen_cols)?;
Ok((gen_cols, row_ref))
}

#[test]
fn test_bootstrapping_sets_up_tables() -> ResultTest<()> {
let datastore = get_datastore()?;
Expand Down Expand Up @@ -1683,7 +1692,7 @@ mod tests {
fn test_insert_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 18)]);
Ok(())
Expand All @@ -1693,7 +1702,7 @@ mod tests {
fn test_insert_wrong_schema_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = product!(0, "Foo");
assert!(datastore.insert_mut_tx(&mut tx, table_id, row).is_err());
assert!(insert(&datastore, &mut tx, table_id, &row).is_err());
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![]);
Ok(())
Expand All @@ -1703,7 +1712,7 @@ mod tests {
fn test_insert_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
// 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, u32_str_u32(0, "Foo", 18))?;
insert(&datastore, &mut tx, table_id, &u32_str_u32(0, "Foo", 18))?;
datastore.commit_mut_tx(tx)?;
let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
#[rustfmt::skip]
Expand All @@ -1717,7 +1726,7 @@ mod tests {
let row = u32_str_u32(15, "Foo", 18); // 15 is ignored.
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.rollback_mut_tx(tx);
let tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
#[rustfmt::skip]
Expand All @@ -1729,15 +1738,15 @@ mod tests {
fn test_insert_commit_delete_insert() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let created_row = u32_str_u32(1, "Foo", 18);
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row]);
assert_eq!(num_deleted, 1);
assert_eq!(all_rows(&datastore, &tx, table_id).len(), 0);
let created_row = u32_str_u32(1, "Foo", 19);
datastore.insert_mut_tx(&mut tx, table_id, created_row)?;
insert(&datastore, &mut tx, table_id, &created_row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 19)]);
Ok(())
Expand All @@ -1747,7 +1756,7 @@ mod tests {
fn test_insert_delete_insert_delete_insert() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(1, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
for i in 0..2 {
assert_eq!(
all_rows(&datastore, &tx, table_id),
Expand All @@ -1764,7 +1773,7 @@ mod tests {
&[],
"Found rows present after deleting",
);
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
assert_eq!(
all_rows(&datastore, &tx, table_id),
vec![row.clone()],
Expand All @@ -1778,8 +1787,8 @@ mod tests {
fn test_unique_constraint_pre_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
insert(&datastore, &mut tx, table_id, &row)?;
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1798,10 +1807,10 @@ mod tests {
fn test_unique_constraint_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1822,10 +1831,10 @@ mod tests {
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.rollback_mut_tx(tx);
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(2, "Foo", 18)]);
Ok(())
Expand All @@ -1838,7 +1847,7 @@ mod tests {

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;

let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
Expand Down Expand Up @@ -1873,7 +1882,7 @@ mod tests {
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1892,7 +1901,7 @@ mod tests {
fn test_create_index_post_commit() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let index_def = IndexSchema {
Expand Down Expand Up @@ -1927,7 +1936,7 @@ mod tests {
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
let result = insert(&datastore, &mut tx, table_id, &row);
match result {
Err(DBError::Index(IndexError::UniqueConstraintViolation(UniqueConstraintViolation {
constraint_name: _,
Expand All @@ -1946,7 +1955,7 @@ mod tests {
fn test_create_index_post_rollback() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
datastore.commit_mut_tx(tx)?;
let mut tx = datastore.begin_mut_tx(IsolationLevel::Serializable, Workload::ForTests);
let index_def = IndexSchema {
Expand Down Expand Up @@ -1981,7 +1990,7 @@ mod tests {
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
].map(Into::into));
let row = u32_str_u32(0, "Bar", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
insert(&datastore, &mut tx, table_id, &row)?;
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![
u32_str_u32(1, "Foo", 18),
Expand All @@ -1998,7 +2007,7 @@ mod tests {
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
// Because of auto_inc columns, we will get a slightly different
// value than the one we inserted.
let row = datastore.insert_mut_tx(&mut tx, table_id, row)?.1.to_product_value();
let row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value();
datastore.commit_mut_tx(tx)?;

let all_rows_col_0_eq_1 = |tx: &MutTxId| {
Expand All @@ -2023,10 +2032,7 @@ mod tests {
assert_eq!(all_rows_col_0_eq_1(&tx).len(), 0);

// Reinsert the row.
let reinserted_row = datastore
.insert_mut_tx(&mut tx, table_id, row.clone())?
.1
.to_product_value();
let reinserted_row = insert(&datastore, &mut tx, table_id, &row)?.1.to_product_value();
assert_eq!(reinserted_row, row);

// The actual test: we should be able to iterate again, while still in the
Expand All @@ -2044,9 +2050,9 @@ mod tests {
fn test_read_only_tx_shared_lock() -> ResultTest<()> {
let (datastore, mut tx, table_id) = setup_table()?;
let row1 = u32_str_u32(1, "Foo", 18);
datastore.insert_mut_tx(&mut tx, table_id, row1.clone())?;
insert(&datastore, &mut tx, table_id, &row1)?;
let row2 = u32_str_u32(2, "Bar", 20);
datastore.insert_mut_tx(&mut tx, table_id, row2.clone())?;
insert(&datastore, &mut tx, table_id, &row2)?;
datastore.commit_mut_tx(tx)?;

// create multiple read only tx, and use them together.
Expand Down
Loading
Loading