Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Layout.fixed: bool for a fast path in eq_row_in_page & row_type_visitor #2025

Merged
merged 2 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/table/benches/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use spacetimedb_sats::algebraic_value::ser::ValueSerializer;
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ArrayValue, ProductType, ProductValue};
use spacetimedb_table::bflatn_from::serialize_row_from_page;
use spacetimedb_table::bflatn_to::write_row_to_page;
use spacetimedb_table::bflatn_to_bsatn_fast_path::StaticBsatnLayout;
use spacetimedb_table::blob_store::NullBlobStore;
use spacetimedb_table::eq::eq_row_in_page;
use spacetimedb_table::indexes::{Byte, Bytes, PageOffset, RowHash};
Expand Down Expand Up @@ -758,6 +759,7 @@ fn eq_in_page_same(c: &mut Criterion) {
let mut group = c.benchmark_group("eq_in_page");
for (name, ty, value, _null_visitor, _aligned_offsets_visitor) in product_value_test_cases() {
let (ty, mut page, visitor) = ty_page_visitor(ty);
let static_bsatn_layout = StaticBsatnLayout::for_row_type(&ty);

let (offset_0, _) = unsafe { write_row_to_page(&mut page, &mut NullBlobStore, &visitor, &ty, &value) }.unwrap();
let (offset_1, _) = unsafe { write_row_to_page(&mut page, &mut NullBlobStore, &visitor, &ty, &value) }.unwrap();
Expand All @@ -771,6 +773,7 @@ fn eq_in_page_same(c: &mut Criterion) {
black_box(offset_0),
black_box(offset_1),
black_box(&ty),
black_box(static_bsatn_layout.as_ref()),
)
})
});
Expand Down
18 changes: 15 additions & 3 deletions crates/table/benches/page_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ fn as_bytes<T>(t: &T) -> &Bytes {
unsafe trait Row {
fn row_type() -> ProductType;

fn row_type_for_schema() -> ProductType {
Centril marked this conversation as resolved.
Show resolved Hide resolved
let mut ty = Self::row_type();
// Ensure that we have names for every column,
// so that its accepted when used in a `ModuleDef` as a row type.
for (idx, elem) in ty.elements.iter_mut().enumerate() {
if elem.name.is_none() {
elem.name = Some(format!("col_{idx}").into());
}
}
ty
}

fn var_len_visitor() -> VarLenVisitorProgram {
row_type_visitor(&Self::row_type().into())
}
Expand Down Expand Up @@ -459,7 +471,7 @@ fn schema_from_ty(ty: ProductType, name: &str) -> TableSchema {

fn make_table(c: &mut Criterion) {
fn bench_make_table<R: Row>(group: Group<'_, '_>, name: &str) {
let ty = R::row_type();
let ty = R::row_type_for_schema();
let schema = schema_from_ty(ty.clone(), name);
group.bench_function(name, |b| {
b.iter_custom(|num_iters| {
Expand All @@ -484,7 +496,7 @@ fn make_table(c: &mut Criterion) {
}

fn make_table_for_row_type<R: Row>(name: &str) -> Table {
let ty = R::row_type();
let ty = R::row_type_for_schema();
let schema = schema_from_ty(ty.clone(), name);
Table::new(schema.into(), SquashedOffset::COMMITTED_STATE)
}
Expand Down Expand Up @@ -664,7 +676,7 @@ trait IndexedRow: Row + Sized {
let name = Self::table_name();
let mut builder = RawModuleDefV9Builder::new();
builder
.build_table_with_new_type(name.clone(), Self::row_type(), true)
.build_table_with_new_type(name.clone(), Self::row_type_for_schema(), true)
.with_index(
RawIndexAlgorithm::BTree {
columns: Self::indexed_columns(),
Expand Down
67 changes: 59 additions & 8 deletions crates/table/src/bflatn_to_bsatn_fast_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use super::{
},
util::range_move,
};
use core::mem::MaybeUninit;
use core::ptr;
use core::{mem::MaybeUninit, ops::Range};

/// A precomputed BSATN layout for a type whose encoded length is a known constant,
/// enabling fast BFLATN -> BSATN conversion.
#[derive(PartialEq, Eq, Debug, Clone)]
pub(crate) struct StaticBsatnLayout {
pub struct StaticBsatnLayout {
/// The length of the encoded BSATN representation of a row of this type,
/// in bytes.
///
Expand Down Expand Up @@ -132,12 +132,35 @@ impl StaticBsatnLayout {
unsafe { buf.set_len(start + len) }
}

/// Compares `row_a` for equality against `row_b`.
///
/// # Safety
///
/// - `row` must store a valid, initialized instance of the BFLATN row type
/// for which `self` was computed.
/// As a consequence of this, for every `field` in `self.fields`,
/// `row[field.bflatn_offset .. field.bflatn_offset + field.length]` will be initialized.
pub(crate) unsafe fn eq(&self, row_a: &Bytes, row_b: &Bytes) -> bool {
// No need to check the lengths.
// We assume they are of the same length.
self.fields.iter().all(|field| {
// SAFETY: The consequence of what the caller promised is that
// `row_(a/b).len() >= field.bflatn_offset + field.length >= field.bflatn_offset`.
unsafe { field.eq(row_a, row_b) }
})
}

/// Construct a `StaticBsatnLayout` for converting BFLATN rows of `row_type` into BSATN.
///
/// Returns `None` if `row_type` contains a column which does not have a constant length in BSATN,
/// either a [`VarLenType`]
/// or a [`SumTypeLayout`] whose variants do not have the same "live" unpadded length.
pub(crate) fn for_row_type(row_type: &RowTypeLayout) -> Option<Self> {
pub fn for_row_type(row_type: &RowTypeLayout) -> Option<Self> {
if !row_type.layout().fixed {
// Don't bother computing the static layout if there are variable components.
return None;
}

let mut builder = LayoutBuilder::new_builder();
builder.visit_product(row_type.product())?;
Some(builder.build())
Expand Down Expand Up @@ -168,20 +191,48 @@ struct MemcpyField {
impl MemoryUsage for MemcpyField {}

impl MemcpyField {
/// Returns the range for this field in a BFLATN byte array.
fn bflatn_range(&self) -> Range<usize> {
range_move(0..self.length as usize, self.bflatn_offset as usize)
}

/// Returns the range for this field in a BSATN byte array.
fn bsatn_range(&self) -> Range<usize> {
range_move(0..self.length as usize, self.bsatn_offset as usize)
}

/// Compares `row_a` and `row_b` for equality in this field.
///
/// # Safety
///
/// - `row_a.len() >= self.bflatn_offset + self.length`
/// - `row_b.len() >= self.bflatn_offset + self.length`
unsafe fn eq(&self, row_a: &Bytes, row_b: &Bytes) -> bool {
let range = self.bflatn_range();
let range2 = range.clone();
// SAFETY: The `range` is in bounds as
// `row_a.len() >= self.bflatn_offset + self.length >= self.bflatn_offset`.
let row_a_field = unsafe { row_a.get_unchecked(range) };
// SAFETY: The `range` is in bounds as
// `row_b.len() >= self.bflatn_offset + self.length >= self.bflatn_offset`.
let row_b_field = unsafe { row_b.get_unchecked(range2) };
row_a_field == row_b_field
}

/// Copies the bytes at `row[self.bflatn_offset .. self.bflatn_offset + self.length]`
/// into `buf[self.bsatn_offset + self.length]`.
/// into `buf[self.bsatn_offset .. self.bsatn_offset + self.length]`.
///
/// # Safety
///
/// - `buf` must be exactly `self.bsatn_offset + self.length` long.
/// - `row` must be exactly `self.bflatn_offset + self.length` long.
/// - `buf.len() >= self.bsatn_offset + self.length`.
/// - `row.len() >= self.bflatn_offset + self.length`
unsafe fn copy(&self, buf: &mut [MaybeUninit<Byte>], row: &Bytes) {
let len = self.length as usize;
// SAFETY: forward caller requirement #1.
let to = unsafe { buf.get_unchecked_mut(range_move(0..len, self.bsatn_offset as usize)) };
let to = unsafe { buf.get_unchecked_mut(self.bsatn_range()) };
let dst = to.as_mut_ptr().cast();
// SAFETY: forward caller requirement #2.
let from = unsafe { row.get_unchecked(range_move(0..len, self.bflatn_offset as usize)) };
let from = unsafe { row.get_unchecked(self.bflatn_range()) };
let src = from.as_ptr();

// SAFETY:
Expand Down
94 changes: 78 additions & 16 deletions crates/table/src/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use super::{
bflatn_from::read_tag,
bflatn_to_bsatn_fast_path::StaticBsatnLayout,
indexes::{Bytes, PageOffset},
layout::{align_to, AlgebraicTypeLayout, HasLayout, ProductTypeLayout, RowTypeLayout},
page::Page,
Expand All @@ -24,29 +25,44 @@ use super::{
/// 1. `fixed_offset_a/b` are valid offsets for rows typed at `ty` in `page_a/b`.
/// 2. for any `vlr_a/b: VarLenRef` in the fixed parts of row `a` and `b`,
/// `vlr_a/b.first_offset` must either be `NULL` or point to a valid granule in `page_a/b`.
/// 3. the `static_bsatn_layout` must be derived from `ty`.
pub unsafe fn eq_row_in_page(
page_a: &Page,
page_b: &Page,
fixed_offset_a: PageOffset,
fixed_offset_b: PageOffset,
ty: &RowTypeLayout,
static_bsatn_layout: Option<&StaticBsatnLayout>,
) -> bool {
// Context for the whole comparison.
let mut ctx = EqCtx {
// Contexts for rows `a` and `b`.
a: BytesPage::new(page_a, fixed_offset_a, ty),
b: BytesPage::new(page_b, fixed_offset_b, ty),
curr_offset: 0,
};
// Test for equality!
// SAFETY:
// 1. Per requirement 1., rows `a/b` are valid at type `ty` and properly aligned for `ty`.
// Their fixed parts are defined as:
// `value_a/b = ctx.a/b.bytes[range_move(0..fixed_row_size, fixed_offset_a/b)]`
// as needed.
// 2. for any `vlr_a/b: VarLenRef` stored in `value_a/b`,
// `vlr_a/b.first_offset` must either be `NULL` or point to a valid granule in `page_a/b`.
unsafe { eq_product(&mut ctx, ty.product()) }
// Contexts for rows `a` and `b`.
let a = BytesPage::new(page_a, fixed_offset_a, ty);
let b = BytesPage::new(page_b, fixed_offset_b, ty);

// If there are only fixed parts in the layout,
// there are no pointers to anywhere,
// So it is sound to simply check for byte-wise equality while ignoring padding.
match static_bsatn_layout {
None => {
// Context for the whole comparison.
let mut ctx = EqCtx { a, b, curr_offset: 0 };

// Test for equality!
// SAFETY:
// 1. Per requirement 1., rows `a/b` are valid at type `ty` and properly aligned for `ty`.
// Their fixed parts are defined as:
// `value_a/b = ctx.a/b.bytes[range_move(0..fixed_row_size, fixed_offset_a/b)]`
// as needed.
// 2. for any `vlr_a/b: VarLenRef` stored in `value_a/b`,
// `vlr_a/b.first_offset` must either be `NULL` or point to a valid granule in `page_a/b`.
unsafe { eq_product(&mut ctx, ty.product()) }
}
Some(static_bsatn_layout) => {
// SAFETY: caller promised that `a/b` are valid BFLATN representations matching `ty`
// and as `static_bsatn_layout` was promised to be derived from `ty`,
// so too are `a/b` valid for `static_bsatn_layout`.
unsafe { static_bsatn_layout.eq(a.bytes, b.bytes) }
}
}
}

/// A view into the fixed part of a row combined with the page it belongs to.
Expand Down Expand Up @@ -210,3 +226,49 @@ fn eq_byte_array(ctx: &mut EqCtx<'_, '_>, len: usize) -> bool {
ctx.curr_offset += len;
data_a == data_b
}

#[cfg(test)]
mod test {
use crate::blob_store::NullBlobStore;
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType};

#[test]
fn sum_with_variant_with_distinct_layout() {
// This is a type where the layout of the sum variants differ,
// with the latter having some padding bytes due to alignment.
let ty = ProductType::from([AlgebraicType::sum([
AlgebraicType::U64, // xxxxxxxx
AlgebraicType::product([AlgebraicType::U8, AlgebraicType::U32]), // xpppxxxx
])]);

let bs = &mut NullBlobStore;
let mut table_a = crate::table::test::table(ty.clone());
let mut table_b = crate::table::test::table(ty);

// Insert u64::MAX with tag 0 and then delete it.
let a0 = product![AlgebraicValue::sum(0, u64::MAX.into())];
let (_, a0_rr) = table_a.insert(bs, &a0).unwrap();
let a0_ptr = a0_rr.pointer();
assert!(table_a.delete(bs, a0_ptr, |_| {}).is_some());

// Insert u64::ALTERNATING_BIT_PATTERN with tag 0 and then delete it.
let b0 = 0b01010101_01010101_01010101_01010101_01010101_01010101_01010101_01010101u64;
let b0 = product![AlgebraicValue::sum(0, b0.into())];
let (_, b0_rr) = table_b.insert(bs, &b0).unwrap();
let b0_ptr = b0_rr.pointer();
assert!(table_b.delete(bs, b0_ptr, |_| {}).is_some());

// Insert two identical rows `a1` and `b2` into the tables.
// They should occupy the spaces of the previous rows.
let v1 = product![AlgebraicValue::sum(1, product![0u8, 0u32].into())];
let (_, a1_rr) = table_a.insert(bs, &v1).unwrap();
let bs = &mut NullBlobStore;
let (_, b1_rr) = table_b.insert(bs, &v1).unwrap();
assert_eq!(a0_ptr, a1_rr.pointer());
assert_eq!(b0_ptr, b1_rr.pointer());

// Check that the rows are considered equal
// and that padding does not mess this up.
assert_eq!(a1_rr, b1_rr);
}
}
Loading
Loading