Skip to content

Commit

Permalink
Const-alignment for flatbuffers (#1868)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Jan 9, 2025
1 parent de59c53 commit 22fb26c
Show file tree
Hide file tree
Showing 23 changed files with 210 additions and 109 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vortex-array/src/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use viewed::ViewedArrayData;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexError, VortexExpect, VortexResult};
use vortex_flatbuffers::FlatBuffer;
use vortex_scalar::Scalar;

use crate::array::{
Expand Down Expand Up @@ -82,8 +83,7 @@ impl ArrayData {
ctx: ContextRef,
dtype: DType,
len: usize,
// TODO(ngates): use ConstByteBuffer
flatbuffer: ByteBuffer,
flatbuffer: FlatBuffer,
flatbuffer_init: F,
buffers: Vec<ByteBuffer>,
) -> VortexResult<Self>
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/data/viewed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use itertools::Itertools;
use vortex_buffer::ByteBuffer;
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::{vortex_err, VortexExpect as _, VortexResult};
use vortex_flatbuffers::FlatBuffer;
use vortex_scalar::{Scalar, ScalarValue};

use crate::encoding::opaque::OpaqueEncoding;
Expand All @@ -20,8 +21,7 @@ pub(super) struct ViewedArrayData {
pub(super) dtype: DType,
pub(super) len: usize,
pub(super) metadata: Arc<dyn ArrayMetadata>,
// TODO(ngates): use ConstByteBuffer once it is stable
pub(super) flatbuffer: ByteBuffer,
pub(super) flatbuffer: FlatBuffer,
pub(super) flatbuffer_loc: usize,
pub(super) buffers: Arc<[ByteBuffer]>,
pub(super) ctx: ContextRef,
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(once_cell_try)]
#![feature(trusted_len)]
#![feature(substr_range)]
//! Vortex crate containing core logic for encoding and memory representation of [arrays](ArrayData).
//!
//! At the heart of Vortex are [arrays](ArrayData) and [encodings](crate::encoding::EncodingVTable).
Expand Down Expand Up @@ -40,6 +41,7 @@ pub mod iter;
mod macros;
mod metadata;
pub mod nbytes;
pub mod parts;
pub mod patches;
pub mod stats;
pub mod stream;
Expand Down
76 changes: 76 additions & 0 deletions vortex-array/src/parts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use std::fmt::{Debug, Formatter};

use flatbuffers::Follow;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_error::{vortex_panic, VortexResult};
use vortex_flatbuffers::{array as fba, FlatBuffer};

use crate::{ArrayData, ContextRef};

/// [`ArrayParts`] represents the information from an [`ArrayData`] that makes up the serialized
/// form. For example, it stores integer encoding IDs rather than a reference to an encoding
/// vtable, and it doesn't store any [`DType`] information.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayData`] using [`ArrayParts::decode`],
/// which takes the missing [`DType`] (and a context) as arguments.
pub struct ArrayParts {
// Number of rows; forwarded to `ArrayData::try_new_viewed` when decoding.
row_count: usize,
// The flatbuffer bytes. `ArrayParts::new` checks that these are exactly the bytes backing the
// `fba::Array` view it was given.
flatbuffer: FlatBuffer,
// Table location of that `fba::Array` inside `flatbuffer`; used to re-create the view on decode.
flatbuffer_loc: usize,
// Buffers handed through unchanged to `ArrayData::try_new_viewed` when decoding.
buffers: Vec<ByteBuffer>,
}

impl Debug for ArrayParts {
    /// Writes a compact summary of the parts.
    ///
    /// The flatbuffer is reported by its length and the buffers by their count, so the raw
    /// bytes are never dumped.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self {
            row_count,
            flatbuffer,
            flatbuffer_loc,
            buffers,
        } = self;

        let mut summary = f.debug_struct("ArrayParts");
        summary.field("row_count", row_count);
        summary.field("flatbuffer", &flatbuffer.len());
        summary.field("flatbuffer_loc", flatbuffer_loc);
        summary.field("buffers", &buffers.len());
        summary.finish()
    }
}

impl ArrayParts {
    /// Assembles an [`ArrayParts`] from an [`fba::Array`] view and the [`FlatBuffer`] it views.
    ///
    /// Only the table location of `array` is kept; [`ArrayParts::decode`] later re-creates the
    /// view from `flatbuffer` at that location.
    ///
    /// ## Panics
    ///
    /// Panics unless the bytes backing `array` are exactly the bytes of `flatbuffer`, i.e. they
    /// start at offset zero and span its full length.
    pub fn new(
        row_count: usize,
        array: fba::Array,
        flatbuffer: FlatBuffer,
        buffers: Vec<ByteBuffer>,
    ) -> Self {
        let table = &array._tab;

        // Locate the table's backing bytes inside `flatbuffer`. The stored table location is
        // only meaningful relative to these exact bytes, so anything other than the full range
        // is rejected.
        let backing = flatbuffer
            .as_ref()
            .as_slice()
            .subslice_range(table.buf());
        let whole = 0..flatbuffer.len();
        if backing != Some(whole) {
            vortex_panic!("Array flatbuffer is not contained within the buffer");
        }

        let flatbuffer_loc = table.loc();
        Self {
            row_count,
            flatbuffer,
            flatbuffer_loc,
            buffers,
        }
    }

    /// Decodes these parts into an [`ArrayData`] using the supplied context and [`DType`].
    ///
    /// Any error produced by `ArrayData::try_new_viewed` is returned unchanged.
    pub fn decode(self, ctx: ContextRef, dtype: DType) -> VortexResult<ArrayData> {
        let Self {
            row_count,
            flatbuffer,
            flatbuffer_loc,
            buffers,
        } = self;

        ArrayData::try_new_viewed(
            ctx,
            dtype,
            row_count,
            flatbuffer,
            // SAFETY: `ArrayParts::new` is the only constructor, and it took `flatbuffer_loc`
            // from a table whose backing bytes it checked to be exactly `flatbuffer`.
            // NOTE(review): this relies on `try_new_viewed` passing those same bytes to this
            // closure - confirm against its implementation.
            |bytes| Ok(unsafe { fba::Array::follow(bytes, flatbuffer_loc) }),
            buffers,
        )
    }
}
2 changes: 2 additions & 0 deletions vortex-buffer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ readme = "README.md"

[features]
arrow = ["dep:arrow-buffer"]
warn-copy = ["dep:log"]

[dependencies]
arrow-buffer = { workspace = true, optional = true }
bytes = { workspace = true }
log = { workspace = true, optional = true }
vortex-error = { workspace = true }

[lints]
Expand Down
22 changes: 20 additions & 2 deletions vortex-buffer/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,20 @@ impl<T> Buffer<T> {
}

/// Return a `Buffer<T>` with the given alignment. Where possible, this will be zero-copy.
pub fn aligned(self, alignment: Alignment) -> Self {
pub fn aligned(mut self, alignment: Alignment) -> Self {
if self.as_ptr().align_offset(*alignment) == 0 {
self.alignment = alignment;
self
} else {
#[cfg(feature = "warn-copy")]
{
let bt = std::backtrace::Backtrace::capture();
log::warn!(
"Buffer is not aligned to requested alignment {}, copying: {}",
alignment,
bt
)
}
Self::copy_from_aligned(self, alignment)
}
}
Expand Down Expand Up @@ -396,7 +406,15 @@ impl<T> From<BufferMut<T>> for Buffer<T> {
mod test {
use bytes::Buf;

use crate::{buffer, ByteBuffer};
use crate::{buffer, Alignment, ByteBuffer};

#[test]
fn align() {
    // Re-aligning to 32 bytes must report the requested alignment and keep the contents intact.
    let aligned = buffer![0u8, 1, 2].aligned(Alignment::new(32));
    assert_eq!(aligned.alignment(), Alignment::new(32));
    assert_eq!(aligned.as_slice(), &[0, 1, 2]);
}

#[test]
fn slice() {
Expand Down
22 changes: 20 additions & 2 deletions vortex-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Buf;
use vortex_error::VortexExpect;

use crate::{Alignment, ByteBuffer};
use crate::{Alignment, ByteBuffer, ConstBuffer, ConstByteBuffer};

/// An extension to the [`Buf`] trait that provides a function `copy_to_aligned` similar to
/// `copy_to_bytes` that allows for zero-copy aligned reads where possible.
Expand All @@ -16,9 +17,26 @@ pub trait AlignedBuf: Buf {
/// TODO(ngates): what should this do to the alignment of the current buffer? We have to advance
/// it by len.
fn copy_to_aligned(&mut self, len: usize, alignment: Alignment) -> ByteBuffer {
    // Default strategy: take the bytes via `copy_to_bytes`, then align the result.
    //
    // Copy accounting:
    //   * zero-copy `copy_to_bytes`     -> at most one copy (the alignment step);
    //   * non-zero-copy `copy_to_bytes` -> up to two copies.
    //
    // Avoiding the second copy would mean inverting the relationship so that `copy_to_bytes`
    // calls `copy_to_aligned` with an alignment of 1, but that cannot be overridden from the
    // default trait implementation.
    //
    // In practice this tends to be called only on `ByteBuffer: AlignedBuf`, where at most one
    // copy happens, so the extra copy is accepted.
    let unaligned = ByteBuffer::from(self.copy_to_bytes(len));
    unaligned.aligned(alignment)
}

/// See [`AlignedBuf::copy_to_aligned`].
fn copy_to_const_aligned<const A: usize>(&mut self, len: usize) -> ConstByteBuffer<A> {
// The default implementation uses copy_to_bytes and then aligns the result. This will be
// zero-copy only if the underlying `copy_to_bytes` is zero-copy and the bytes it returns are
// already suitably aligned.
ByteBuffer::from(self.copy_to_bytes(len)).aligned(alignment)
ConstBuffer::try_from(self.copy_to_aligned(len, Alignment::new(A)))
.vortex_expect("we just aligned the buffer")
}
}

Expand Down
10 changes: 10 additions & 0 deletions vortex-buffer/src/const.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ use crate::{Alignment, Buffer};
pub struct ConstBuffer<T, const A: usize>(Buffer<T>);

impl<T, const A: usize> ConstBuffer<T, A> {
/// Returns the alignment of this buffer type: the const parameter `A` wrapped in an
/// [`Alignment`].
///
/// This is an associated function, so the value depends only on the type and not on any
/// particular buffer instance.
pub const fn alignment() -> Alignment {
Alignment::new(A)
}

/// Converts `buf` into a [`Buffer`], aligns it to `A` (copying only if it is not already
/// aligned), and wraps the result in a new `ConstBuffer`.
pub fn align_from<B: Into<Buffer<T>>>(buf: B) -> Self {
    let buffer: Buffer<T> = buf.into();
    Self(buffer.aligned(Self::alignment()))
}

/// Unwrap the inner buffer.
pub fn into_inner(self) -> Buffer<T> {
self.0
Expand Down
3 changes: 3 additions & 0 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ pub type ByteBuffer = Buffer<u8>;

/// A mutable buffer of u8.
pub type ByteBufferMut = BufferMut<u8>;

/// A const-aligned buffer of u8: an alias for [`ConstBuffer`] with `u8` elements, whose
/// alignment `A` is a const generic parameter.
pub type ConstByteBuffer<const A: usize> = ConstBuffer<u8, A>;
32 changes: 11 additions & 21 deletions vortex-dtype/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ use std::fmt::{Debug, Display, Formatter};
use std::hash::Hash;
use std::sync::Arc;

use flatbuffers::{root, root_unchecked};
use flatbuffers::root;
use itertools::Itertools;
use vortex_buffer::ByteBuffer;
use vortex_error::{
vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult, VortexUnwrap,
};
use vortex_flatbuffers::dtype as fbd;
use vortex_flatbuffers::{dtype as fbd, FlatBuffer};
use DType::*;

use crate::field::Field;
Expand Down Expand Up @@ -185,21 +184,21 @@ impl Display for DType {
#[derive(Debug, Clone, PartialOrd, PartialEq, Eq)]
pub struct ViewedDType {
/// Underlying flatbuffer
buffer: ByteBuffer,
flatbuffer: FlatBuffer,
/// Location of the dtype data inside the underlying buffer
flatbuffer_loc: usize,
}

impl ViewedDType {
/// Create a [`ViewedDType`] from a [`fbd::DType`] and the shared buffer.
pub(crate) fn from_fb(fb_dtype: fbd::DType<'_>, buffer: ByteBuffer) -> Self {
pub(crate) fn from_fb(fb_dtype: fbd::DType<'_>, buffer: FlatBuffer) -> Self {
Self::with_location(fb_dtype._tab.loc(), buffer)
}

/// Create a [`ViewedDType`] from a buffer and a flatbuffer location
pub(crate) fn with_location(location: usize, buffer: ByteBuffer) -> Self {
pub(crate) fn with_location(location: usize, buffer: FlatBuffer) -> Self {
Self {
buffer,
flatbuffer: buffer,
flatbuffer_loc: location,
}
}
Expand All @@ -208,15 +207,15 @@ impl ViewedDType {
pub fn flatbuffer(&self) -> fbd::DType<'_> {
unsafe {
fbd::DType::init_from_table(flatbuffers::Table::new(
self.buffer.as_ref(),
self.flatbuffer.as_ref(),
self.flatbuffer_loc,
))
}
}

/// Returns the underlying shared buffer
pub fn buffer(&self) -> &ByteBuffer {
&self.buffer
pub fn buffer(&self) -> &FlatBuffer {
&self.flatbuffer
}
}

Expand Down Expand Up @@ -437,7 +436,7 @@ impl StructDType {
}

/// Creates a new instance from a flatbuffer-defined object and its underlying buffer.
pub fn from_fb(fb_struct: fbd::Struct_<'_>, buffer: ByteBuffer) -> VortexResult<Self> {
pub fn from_fb(fb_struct: fbd::Struct_<'_>, buffer: FlatBuffer) -> VortexResult<Self> {
let names = fb_struct
.names()
.ok_or_else(|| vortex_err!("failed to parse struct names from flatbuffer"))?
Expand All @@ -459,23 +458,14 @@ impl StructDType {
}

/// Create a new [`StructDType`] from flatbuffer bytes.
pub fn from_bytes(buffer: ByteBuffer) -> VortexResult<Self> {
pub fn from_bytes(buffer: FlatBuffer) -> VortexResult<Self> {
let fb_struct = root::<fbd::DType>(&buffer)?
.type__as_struct_()
.ok_or_else(|| vortex_err!("failed to parse struct from flatbuffer"))?;

Self::from_fb(fb_struct, buffer.clone())
}

/// # Safety
/// Parse a StructDType out of a buffer, must be validated by the other otherwise might panic or behave unexpectedly.
pub unsafe fn from_bytes_unchecked(buffer: ByteBuffer) -> Self {
let fb_struct = unsafe { root_unchecked::<fbd::DType>(&buffer) }
.type__as_struct_()
.vortex_expect("failed to parse struct from flatbuffer");
Self::from_fb(fb_struct, buffer.clone()).vortex_expect("Failed to build StructDType")
}

/// Get the names of the fields in the struct
pub fn names(&self) -> &FieldNames {
&self.names
Expand Down
10 changes: 4 additions & 6 deletions vortex-dtype/src/serde/flatbuffers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@ use std::sync::Arc;

use flatbuffers::{FlatBufferBuilder, WIPOffset};
use itertools::Itertools;
use vortex_buffer::ByteBuffer;
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};
use vortex_flatbuffers::{dtype as fbd, FlatBufferRoot, WriteFlatBuffer};
use vortex_flatbuffers::{dtype as fbd, FlatBuffer, FlatBufferRoot, WriteFlatBuffer};

use crate::{
flatbuffers as fb, DType, ExtDType, ExtID, ExtMetadata, PType, StructDType, ViewedDType,
Expand All @@ -15,7 +14,7 @@ pub use project::*;

impl DType {
/// Create a new [`DType`] from a flatbuffer view and its underlying buffer.
pub fn try_from_view(fb: fbd::DType, buffer: ByteBuffer) -> VortexResult<Self> {
pub fn try_from_view(fb: fbd::DType, buffer: FlatBuffer) -> VortexResult<Self> {
let vdt = ViewedDType::from_fb(fb, buffer);
Self::try_from(vdt)
}
Expand Down Expand Up @@ -264,16 +263,15 @@ mod test {
use std::sync::Arc;

use flatbuffers::root;
use vortex_buffer::ByteBuffer;
use vortex_flatbuffers::WriteFlatBufferExt;
use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt};

use crate::nullability::Nullability;
use crate::{flatbuffers as fb, DType, PType, StructDType, ViewedDType};

fn roundtrip_dtype(dtype: DType) {
let bytes = dtype.write_flatbuffer_bytes();
let root_fb = root::<fb::DType>(&bytes).unwrap();
let view = ViewedDType::from_fb(root_fb, ByteBuffer::from(bytes.clone()));
let view = ViewedDType::from_fb(root_fb, FlatBuffer::from(bytes.clone()));

let deserialized = DType::try_from(view).unwrap();
assert_eq!(dtype, deserialized);
Expand Down
Loading

0 comments on commit 22fb26c

Please sign in to comment.