diff --git a/Cargo.lock b/Cargo.lock index df5d0be4d3..e1c6bc08d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4918,6 +4918,7 @@ dependencies = [ "arrow-buffer", "bytes", "divan", + "log", "vortex-error", ] @@ -5115,8 +5116,8 @@ dependencies = [ name = "vortex-flatbuffers" version = "0.21.1" dependencies = [ - "bytes", "flatbuffers", + "vortex-buffer", ] [[package]] diff --git a/vortex-array/src/data/mod.rs b/vortex-array/src/data/mod.rs index 1ee57ed05e..b16518601b 100644 --- a/vortex-array/src/data/mod.rs +++ b/vortex-array/src/data/mod.rs @@ -9,6 +9,7 @@ use viewed::ViewedArrayData; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; use vortex_error::{vortex_err, VortexError, VortexExpect, VortexResult}; +use vortex_flatbuffers::FlatBuffer; use vortex_scalar::Scalar; use crate::array::{ @@ -82,8 +83,7 @@ impl ArrayData { ctx: ContextRef, dtype: DType, len: usize, - // TODO(ngates): use ConstByteBuffer - flatbuffer: ByteBuffer, + flatbuffer: FlatBuffer, flatbuffer_init: F, buffers: Vec, ) -> VortexResult diff --git a/vortex-array/src/data/viewed.rs b/vortex-array/src/data/viewed.rs index 5ece906a9d..a5bc4dd807 100644 --- a/vortex-array/src/data/viewed.rs +++ b/vortex-array/src/data/viewed.rs @@ -6,6 +6,7 @@ use itertools::Itertools; use vortex_buffer::ByteBuffer; use vortex_dtype::{DType, Nullability, PType}; use vortex_error::{vortex_err, VortexExpect as _, VortexResult}; +use vortex_flatbuffers::FlatBuffer; use vortex_scalar::{Scalar, ScalarValue}; use crate::encoding::opaque::OpaqueEncoding; @@ -20,8 +21,7 @@ pub(super) struct ViewedArrayData { pub(super) dtype: DType, pub(super) len: usize, pub(super) metadata: Arc, - // TODO(ngates): use ConstByteBuffer once it is stable - pub(super) flatbuffer: ByteBuffer, + pub(super) flatbuffer: FlatBuffer, pub(super) flatbuffer_loc: usize, pub(super) buffers: Arc<[ByteBuffer]>, pub(super) ctx: ContextRef, diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 1ccf8587f8..bc64075527 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,5 +1,6 @@ #![feature(once_cell_try)] #![feature(trusted_len)] +#![feature(substr_range)] //! Vortex crate containing core logic for encoding and memory representation of [arrays](ArrayData). //! //! At the heart of Vortex are [arrays](ArrayData) and [encodings](crate::encoding::EncodingVTable). @@ -40,6 +41,7 @@ pub mod iter; mod macros; mod metadata; pub mod nbytes; +pub mod parts; pub mod patches; pub mod stats; pub mod stream; diff --git a/vortex-array/src/parts.rs b/vortex-array/src/parts.rs new file mode 100644 index 0000000000..4295a93116 --- /dev/null +++ b/vortex-array/src/parts.rs @@ -0,0 +1,76 @@ +use std::fmt::{Debug, Formatter}; + +use flatbuffers::Follow; +use vortex_buffer::ByteBuffer; +use vortex_dtype::DType; +use vortex_error::{vortex_panic, VortexResult}; +use vortex_flatbuffers::{array as fba, FlatBuffer}; + +use crate::{ArrayData, ContextRef}; + +/// [`ArrayParts`] represents the information from an [`ArrayData`] that makes up the serialized +/// form. For example, it uses stores integer encoding IDs rather than a reference to an encoding +/// vtable, and it doesn't store any [`DType`] information. +/// +/// An [`ArrayParts`] can be fully decoded into an [`ArrayData`] using the `decode` function. +pub struct ArrayParts { + row_count: usize, + // Typed as fb::Array + flatbuffer: FlatBuffer, + flatbuffer_loc: usize, + buffers: Vec, +} + +impl Debug for ArrayParts { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ArrayParts") + .field("row_count", &self.row_count) + .field("flatbuffer", &self.flatbuffer.len()) + .field("flatbuffer_loc", &self.flatbuffer_loc) + .field("buffers", &self.buffers.len()) + .finish() + } +} + +impl ArrayParts { + /// Creates a new [`ArrayParts`] from a flatbuffer view. + /// + /// ## Panics + /// + /// This function will panic if the flatbuffer is not contained within the given [`FlatBuffer`]. + pub fn new( + row_count: usize, + array: fba::Array, + flatbuffer: FlatBuffer, + buffers: Vec, + ) -> Self { + // We ensure that the flatbuffer given to us does indeed match that of the ByteBuffer + if flatbuffer + .as_ref() + .as_slice() + .subslice_range(array._tab.buf()) + != Some(0..flatbuffer.len()) + { + vortex_panic!("Array flatbuffer is not contained within the buffer"); + } + Self { + row_count, + flatbuffer, + flatbuffer_loc: array._tab.loc(), + buffers, + } + } + + /// Decode an [`ArrayParts`] into an [`ArrayData`]. + pub fn decode(self, ctx: ContextRef, dtype: DType) -> VortexResult { + ArrayData::try_new_viewed( + ctx, + dtype, + self.row_count, + self.flatbuffer, + // SAFETY: ArrayComponents guarantees the buffers are valid. + |buf| unsafe { Ok(fba::Array::follow(buf, self.flatbuffer_loc)) }, + self.buffers, + ) + } +} diff --git a/vortex-buffer/Cargo.toml b/vortex-buffer/Cargo.toml index 2f0bf142b1..f2c780b984 100644 --- a/vortex-buffer/Cargo.toml +++ b/vortex-buffer/Cargo.toml @@ -15,10 +15,12 @@ readme = "README.md" [features] arrow = ["dep:arrow-buffer"] +warn-copy = ["dep:log"] [dependencies] arrow-buffer = { workspace = true, optional = true } bytes = { workspace = true } +log = { workspace = true, optional = true } vortex-error = { workspace = true } [lints] diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index 31ec0391f3..b91b01e57d 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -262,10 +262,20 @@ impl Buffer { } /// Return a `Buffer` with the given alignment. Where possible, this will be zero-copy. - pub fn aligned(self, alignment: Alignment) -> Self { + pub fn aligned(mut self, alignment: Alignment) -> Self { if self.as_ptr().align_offset(*alignment) == 0 { + self.alignment = alignment; self } else { + #[cfg(feature = "warn-copy")] + { + let bt = std::backtrace::Backtrace::capture(); + log::warn!( + "Buffer is not aligned to requested alignment {}, copying: {}", + alignment, + bt + ) + } Self::copy_from_aligned(self, alignment) } } @@ -396,7 +406,15 @@ impl From> for Buffer { mod test { use bytes::Buf; - use crate::{buffer, ByteBuffer}; + use crate::{buffer, Alignment, ByteBuffer}; + + #[test] + fn align() { + let buf = buffer![0u8, 1, 2]; + let aligned = buf.aligned(Alignment::new(32)); + assert_eq!(aligned.alignment(), Alignment::new(32)); + assert_eq!(aligned.as_slice(), &[0, 1, 2]); + } #[test] fn slice() { diff --git a/vortex-buffer/src/bytes.rs b/vortex-buffer/src/bytes.rs index 1c68b7cef7..c52d9d2331 100644 --- a/vortex-buffer/src/bytes.rs +++ b/vortex-buffer/src/bytes.rs @@ -1,6 +1,7 @@ use bytes::Buf; +use vortex_error::VortexExpect; -use crate::{Alignment, ByteBuffer}; +use crate::{Alignment, ByteBuffer, ConstBuffer, ConstByteBuffer}; /// An extension to the [`Buf`] trait that provides a function `copy_to_aligned` similar to /// `copy_to_bytes` that allows for zero-copy aligned reads where possible. @@ -16,9 +17,26 @@ pub trait AlignedBuf: Buf { /// TODO(ngates): what should this do the alignment of the current buffer? We have to advance /// it by len.. fn copy_to_aligned(&mut self, len: usize, alignment: Alignment) -> ByteBuffer { + // The default implementation uses copy_to_bytes, and then tries to align. + // When the underlying `copy_to_bytes` is zero-copy, this may perform one copy to align + // the bytes. When the underlying `copy_to_bytes` is not zero-copy, this may perform two + // copies. + // + // The only way to fix this would be to invert the implementation so `copy_to_bytes` + // invokes `copy_to_aligned` with an alignment of 1. But we cannot override this in the + // default trait. + // + // In practice, we tend to only call this function on `ByteBuffer: AlignedBuf`, and + // therefore we have a maximum of one copy, so I'm not too worried about it. + ByteBuffer::from(self.copy_to_bytes(len)).aligned(alignment) + } + + /// See [`AlignedBuf::copy_to_aligned`]. + fn copy_to_const_aligned(&mut self, len: usize) -> ConstByteBuffer { // The default implementation uses copy_to_bytes, and then returns a ByteBuffer with // alignment of 1. This will be zero-copy if the underlying `copy_to_bytes` is zero-copy. - ByteBuffer::from(self.copy_to_bytes(len)).aligned(alignment) + ConstBuffer::try_from(self.copy_to_aligned(len, Alignment::new(A))) + .vortex_expect("we just aligned the buffer") } } diff --git a/vortex-buffer/src/const.rs b/vortex-buffer/src/const.rs index 2b34957dfb..2881da2590 100644 --- a/vortex-buffer/src/const.rs +++ b/vortex-buffer/src/const.rs @@ -9,6 +9,16 @@ use crate::{Alignment, Buffer}; pub struct ConstBuffer(Buffer); impl ConstBuffer { + /// Returns the alignment of the buffer. + pub const fn alignment() -> Alignment { + Alignment::new(A) + } + + /// Align the given buffer (possibly with a copy) and return a new `ConstBuffer`. + pub fn align_from>>(buf: B) -> Self { + Self(buf.into().aligned(Self::alignment())) + } + /// Unwrap the inner buffer. pub fn into_inner(self) -> Buffer { self.0 diff --git a/vortex-buffer/src/lib.rs b/vortex-buffer/src/lib.rs index 36f3773fd9..641bcd4104 100644 --- a/vortex-buffer/src/lib.rs +++ b/vortex-buffer/src/lib.rs @@ -65,3 +65,6 @@ pub type ByteBuffer = Buffer; /// A mutable buffer of u8. pub type ByteBufferMut = BufferMut; + +/// A const-aligned buffer of u8. +pub type ConstByteBuffer = ConstBuffer; diff --git a/vortex-dtype/src/dtype.rs b/vortex-dtype/src/dtype.rs index 774a12450d..a63046b694 100644 --- a/vortex-dtype/src/dtype.rs +++ b/vortex-dtype/src/dtype.rs @@ -2,13 +2,12 @@ use std::fmt::{Debug, Display, Formatter}; use std::hash::Hash; use std::sync::Arc; -use flatbuffers::{root, root_unchecked}; +use flatbuffers::root; use itertools::Itertools; -use vortex_buffer::ByteBuffer; use vortex_error::{ vortex_bail, vortex_err, vortex_panic, VortexExpect, VortexResult, VortexUnwrap, }; -use vortex_flatbuffers::dtype as fbd; +use vortex_flatbuffers::{dtype as fbd, FlatBuffer}; use DType::*; use crate::field::Field; @@ -185,21 +184,21 @@ impl Display for DType { #[derive(Debug, Clone, PartialOrd, PartialEq, Eq)] pub struct ViewedDType { /// Underlying flatbuffer - buffer: ByteBuffer, + flatbuffer: FlatBuffer, /// Location of the dtype data inside the underlying buffer flatbuffer_loc: usize, } impl ViewedDType { /// Create a [`ViewedDType`] from a [`fbd::DType`] and the shared buffer. - pub(crate) fn from_fb(fb_dtype: fbd::DType<'_>, buffer: ByteBuffer) -> Self { + pub(crate) fn from_fb(fb_dtype: fbd::DType<'_>, buffer: FlatBuffer) -> Self { Self::with_location(fb_dtype._tab.loc(), buffer) } /// Create a [`ViewedDType`] from a buffer and a flatbuffer location - pub(crate) fn with_location(location: usize, buffer: ByteBuffer) -> Self { + pub(crate) fn with_location(location: usize, buffer: FlatBuffer) -> Self { Self { - buffer, + flatbuffer: buffer, flatbuffer_loc: location, } } @@ -208,15 +207,15 @@ impl ViewedDType { pub fn flatbuffer(&self) -> fbd::DType<'_> { unsafe { fbd::DType::init_from_table(flatbuffers::Table::new( - self.buffer.as_ref(), + self.flatbuffer.as_ref(), self.flatbuffer_loc, )) } } /// Returns the underlying shared buffer - pub fn buffer(&self) -> &ByteBuffer { - &self.buffer + pub fn buffer(&self) -> &FlatBuffer { + &self.flatbuffer } } @@ -437,7 +436,7 @@ impl StructDType { } /// Creates a new instance from a flatbuffer-defined object and its underlying buffer. - pub fn from_fb(fb_struct: fbd::Struct_<'_>, buffer: ByteBuffer) -> VortexResult { + pub fn from_fb(fb_struct: fbd::Struct_<'_>, buffer: FlatBuffer) -> VortexResult { let names = fb_struct .names() .ok_or_else(|| vortex_err!("failed to parse struct names from flatbuffer"))? @@ -459,7 +458,7 @@ impl StructDType { } /// Create a new [`StructDType`] from flatbuffer bytes. - pub fn from_bytes(buffer: ByteBuffer) -> VortexResult { + pub fn from_bytes(buffer: FlatBuffer) -> VortexResult { let fb_struct = root::(&buffer)? .type__as_struct_() .ok_or_else(|| vortex_err!("failed to parse struct from flatbuffer"))?; @@ -467,15 +466,6 @@ impl StructDType { Self::from_fb(fb_struct, buffer.clone()) } - /// # Safety - /// Parse a StructDType out of a buffer, must be validated by the other otherwise might panic or behave unexpectedly. - pub unsafe fn from_bytes_unchecked(buffer: ByteBuffer) -> Self { - let fb_struct = unsafe { root_unchecked::(&buffer) } - .type__as_struct_() - .vortex_expect("failed to parse struct from flatbuffer"); - Self::from_fb(fb_struct, buffer.clone()).vortex_expect("Failed to build StructDType") - } - /// Get the names of the fields in the struct pub fn names(&self) -> &FieldNames { &self.names diff --git a/vortex-dtype/src/serde/flatbuffers/mod.rs b/vortex-dtype/src/serde/flatbuffers/mod.rs index 7b03abb961..0427208d5e 100644 --- a/vortex-dtype/src/serde/flatbuffers/mod.rs +++ b/vortex-dtype/src/serde/flatbuffers/mod.rs @@ -2,9 +2,8 @@ use std::sync::Arc; use flatbuffers::{FlatBufferBuilder, WIPOffset}; use itertools::Itertools; -use vortex_buffer::ByteBuffer; use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult}; -use vortex_flatbuffers::{dtype as fbd, FlatBufferRoot, WriteFlatBuffer}; +use vortex_flatbuffers::{dtype as fbd, FlatBuffer, FlatBufferRoot, WriteFlatBuffer}; use crate::{ flatbuffers as fb, DType, ExtDType, ExtID, ExtMetadata, PType, StructDType, ViewedDType, @@ -15,7 +14,7 @@ pub use project::*; impl DType { /// Create a new - pub fn try_from_view(fb: fbd::DType, buffer: ByteBuffer) -> VortexResult { + pub fn try_from_view(fb: fbd::DType, buffer: FlatBuffer) -> VortexResult { let vdt = ViewedDType::from_fb(fb, buffer); Self::try_from(vdt) } @@ -264,8 +263,7 @@ mod test { use std::sync::Arc; use flatbuffers::root; - use vortex_buffer::ByteBuffer; - use vortex_flatbuffers::WriteFlatBufferExt; + use vortex_flatbuffers::{FlatBuffer, WriteFlatBufferExt}; use crate::nullability::Nullability; use crate::{flatbuffers as fb, DType, PType, StructDType, ViewedDType}; @@ -273,7 +271,7 @@ mod test { fn roundtrip_dtype(dtype: DType) { let bytes = dtype.write_flatbuffer_bytes(); let root_fb = root::(&bytes).unwrap(); - let view = ViewedDType::from_fb(root_fb, ByteBuffer::from(bytes.clone())); + let view = ViewedDType::from_fb(root_fb, FlatBuffer::from(bytes.clone())); let deserialized = DType::try_from(view).unwrap(); assert_eq!(dtype, deserialized); diff --git a/vortex-dtype/src/serde/flatbuffers/project.rs b/vortex-dtype/src/serde/flatbuffers/project.rs index 43277062b2..a1677f9787 100644 --- a/vortex-dtype/src/serde/flatbuffers/project.rs +++ b/vortex-dtype/src/serde/flatbuffers/project.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use vortex_buffer::ByteBuffer; use vortex_error::{vortex_err, VortexResult}; +use vortex_flatbuffers::FlatBuffer; use crate::field::Field; use crate::{flatbuffers as fb, DType, StructDType}; @@ -28,7 +28,7 @@ pub fn resolve_field<'a, 'b: 'a>(fb: fb::Struct_<'b>, field: &'a Field) -> Vorte pub fn extract_field( fb_dtype: fb::DType<'_>, field: &Field, - buffer: &ByteBuffer, + buffer: &FlatBuffer, ) -> VortexResult { let fb_struct = fb_dtype .type__as_struct_() @@ -42,7 +42,7 @@ pub fn extract_field( pub fn project_and_deserialize( fb_dtype: fb::DType<'_>, projection: &[Field], - buffer: &ByteBuffer, + buffer: &FlatBuffer, ) -> VortexResult { let fb_struct = fb_dtype .type__as_struct_() @@ -66,7 +66,7 @@ pub fn project_and_deserialize( fn read_field( fb_struct: fb::Struct_, idx: usize, - buffer: &ByteBuffer, + buffer: &FlatBuffer, ) -> VortexResult<(Arc, DType)> { let name = fb_struct .names() diff --git a/vortex-file/src/read/builder/initial_read.rs b/vortex-file/src/read/builder/initial_read.rs index ae4413e509..078aeb9cb7 100644 --- a/vortex-file/src/read/builder/initial_read.rs +++ b/vortex-file/src/read/builder/initial_read.rs @@ -4,7 +4,7 @@ use flatbuffers::{root, root_unchecked}; use vortex_buffer::{ByteBuffer, ByteBufferMut, ConstBuffer}; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult, VortexUnwrap}; -use vortex_flatbuffers::{dtype as fbd, footer}; +use vortex_flatbuffers::{dtype as fbd, footer, FlatBuffer}; use vortex_io::VortexReadAt; use crate::{EOF_SIZE, INITIAL_READ_SIZE, MAGIC_BYTES, VERSION}; @@ -51,7 +51,8 @@ impl InitialRead { } pub fn dtype(&self) -> DType { - let dtype_buffer = self.buf.as_ref().slice(self.fb_schema_byte_range()); + let dtype_buffer = + FlatBuffer::align_from(self.buf.as_ref().slice(self.fb_schema_byte_range())); let fb_dtype = unsafe { root_unchecked::(&dtype_buffer) }; DType::try_from_view(fb_dtype, dtype_buffer.clone()) diff --git a/vortex-file/src/read/layouts/flat.rs b/vortex-file/src/read/layouts/flat.rs index f1f3d8bcb8..26b1493da7 100644 --- a/vortex-file/src/read/layouts/flat.rs +++ b/vortex-file/src/read/layouts/flat.rs @@ -65,7 +65,7 @@ impl FlatLayoutReader { let mut reader = BufMessageReader::new(buf); match reader.next().transpose()? { Some(DecoderMessage::Array(array_parts)) => { - array_parts.into_array_data(self.ctx.clone(), self.dtype.as_ref().clone()) + array_parts.decode(self.ctx.clone(), self.dtype.as_ref().clone()) } Some(msg) => vortex_bail!("Expected Array message, got {:?}", msg), None => vortex_bail!("Expected Array message, got EOF"), diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 7e7e710f15..63cc37061c 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -10,7 +10,7 @@ use vortex_array::ContextRef; use vortex_buffer::{ByteBuffer, ByteBufferMut}; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; -use vortex_flatbuffers::{dtype as fbd, footer2 as fb, ReadFlatBuffer}; +use vortex_flatbuffers::{dtype as fbd, footer2 as fb, FlatBuffer, ReadFlatBuffer}; use vortex_io::VortexReadAt; use vortex_layout::segments::SegmentId; use vortex_layout::{LayoutContextRef, LayoutData, LayoutId}; @@ -184,7 +184,8 @@ impl OpenOptions { dtype: Segment, ) -> VortexResult { let offset = usize::try_from(dtype.offset - initial_offset)?; - let sliced_buffer = initial_read.slice(offset..offset + dtype.length); + let sliced_buffer = + FlatBuffer::align_from(initial_read.slice(offset..offset + dtype.length)); let fbd_dtype = root::(&sliced_buffer)?; DType::try_from_view(fbd_dtype, sliced_buffer.clone()) diff --git a/vortex-flatbuffers/Cargo.toml b/vortex-flatbuffers/Cargo.toml index 7e2f4216c6..75fa518de3 100644 --- a/vortex-flatbuffers/Cargo.toml +++ b/vortex-flatbuffers/Cargo.toml @@ -22,8 +22,8 @@ layout = ["array"] file = ["ipc"] [dependencies] -bytes = { workspace = true } flatbuffers = { workspace = true } +vortex-buffer = { workspace = true } [lints] workspace = true diff --git a/vortex-flatbuffers/src/lib.rs b/vortex-flatbuffers/src/lib.rs index 7370fa9cdc..5c38979d0b 100644 --- a/vortex-flatbuffers/src/lib.rs +++ b/vortex-flatbuffers/src/lib.rs @@ -158,8 +158,14 @@ pub mod layout; /// ``` pub mod message; -use bytes::Bytes; use flatbuffers::{root, FlatBufferBuilder, Follow, InvalidFlatbuffer, Verifiable, WIPOffset}; +use vortex_buffer::{ByteBuffer, ConstByteBuffer}; + +/// We define a const-aligned byte buffer for flatbuffers with 8-byte alignment. +/// +/// This is based on the assumption that the maximum primitive type is 8 bytes. +/// See: +pub type FlatBuffer = ConstByteBuffer<8>; pub trait FlatBufferRoot {} @@ -190,17 +196,17 @@ pub trait WriteFlatBuffer { } pub trait WriteFlatBufferExt: WriteFlatBuffer + FlatBufferRoot { - /// Write the flatbuffer into a [`Bytes`]. - fn write_flatbuffer_bytes(&self) -> Bytes; + /// Write the flatbuffer into a [`FlatBuffer`]. + fn write_flatbuffer_bytes(&self) -> FlatBuffer; } impl WriteFlatBufferExt for F { - fn write_flatbuffer_bytes(&self) -> Bytes { + fn write_flatbuffer_bytes(&self) -> FlatBuffer { let mut fbb = FlatBufferBuilder::new(); let root_offset = self.write_flatbuffer(&mut fbb); fbb.finish_minimal(root_offset); let (vec, start) = fbb.collapse(); let end = vec.len(); - Bytes::from(vec).slice(start..end) + FlatBuffer::align_from(ByteBuffer::from(vec).slice(start..end)) } } diff --git a/vortex-io/src/io_buf.rs b/vortex-io/src/io_buf.rs index d243ec60f9..439a45aa0b 100644 --- a/vortex-io/src/io_buf.rs +++ b/vortex-io/src/io_buf.rs @@ -3,6 +3,7 @@ use std::ops::Range; use bytes::Bytes; +use vortex_buffer::ConstByteBuffer; /// Trait for types that can provide a readonly byte buffer interface to I/O frameworks. /// @@ -127,3 +128,17 @@ unsafe impl IoBuf for Bytes { self.as_ref() } } + +unsafe impl IoBuf for ConstByteBuffer { + fn read_ptr(&self) -> *const u8 { + self.as_ptr() + } + + fn bytes_init(&self) -> usize { + self.len() + } + + fn as_slice(&self) -> &[u8] { + self.as_ref() + } +} diff --git a/vortex-ipc/src/iterator.rs b/vortex-ipc/src/iterator.rs index 3c293b40f7..88decc284e 100644 --- a/vortex-ipc/src/iterator.rs +++ b/vortex-ipc/src/iterator.rs @@ -45,7 +45,7 @@ impl Iterator for SyncIPCReader { Ok(msg) => match msg { DecoderMessage::Array(array_parts) => Some( array_parts - .into_array_data(self.ctx.clone(), self.dtype.clone()) + .decode(self.ctx.clone(), self.dtype.clone()) .and_then(|array| { if array.dtype() != self.dtype() { Err(vortex_err!( diff --git a/vortex-ipc/src/messages/decoder.rs b/vortex-ipc/src/messages/decoder.rs index 1146340d74..79112947c7 100644 --- a/vortex-ipc/src/messages/decoder.rs +++ b/vortex-ipc/src/messages/decoder.rs @@ -1,22 +1,22 @@ -use std::fmt::{Debug, Formatter}; +use std::fmt::Debug; use bytes::Buf; -use flatbuffers::{root, root_unchecked, Follow}; +use flatbuffers::{root, root_unchecked}; use itertools::Itertools; -use vortex_array::{flatbuffers as fba, ArrayData, ContextRef}; +use vortex_array::parts::ArrayParts; use vortex_buffer::{AlignedBuf, Alignment, ByteBuffer}; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; -use vortex_flatbuffers::message as fb; use vortex_flatbuffers::message::{MessageHeader, MessageVersion}; +use vortex_flatbuffers::{message as fb, FlatBuffer}; use crate::ALIGNMENT; /// A message decoded from an IPC stream. /// -/// Note that the `Array` variant cannot fully decode into an [`ArrayData`] without a [`ContextRef`] -/// and a [`DType`]. As such, we partially decode into an [`ArrayParts`] and allow the caller to -/// finish the decoding. +/// Note that the `Array` variant cannot fully decode into an [`vortex_array::ArrayData`] without +/// a [`vortex_array::ContextRef`] and a [`DType`]. As such, we partially decode into an +/// [`ArrayParts`] and allow the caller to finish the decoding. #[derive(Debug)] pub enum DecoderMessage { Array(ArrayParts), @@ -24,41 +24,6 @@ pub enum DecoderMessage { DType(DType), } -/// ArrayParts represents a partially decoded Vortex array. -/// It can be completely decoded calling `into_array_data` with a context and dtype. -pub struct ArrayParts { - row_count: usize, - // Typed as fb::Array - // FIXME(ngates): use ConstAlignedArray aliased to FlatBuffer - array_flatbuffer: ByteBuffer, - array_flatbuffer_loc: usize, - buffers: Vec, -} - -impl Debug for ArrayParts { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ArrayComponents") - .field("row_count", &self.row_count) - .field("array_flatbuffer", &self.array_flatbuffer.len()) - .field("buffers", &self.buffers.len()) - .finish() - } -} - -impl ArrayParts { - pub fn into_array_data(self, ctx: ContextRef, dtype: DType) -> VortexResult { - ArrayData::try_new_viewed( - ctx, - dtype, - self.row_count, - self.array_flatbuffer, - // SAFETY: ArrayComponents guarantees the buffers are valid. - |buf| unsafe { Ok(fba::Array::follow(buf, self.array_flatbuffer_loc)) }, - self.buffers, - ) - } -} - #[derive(Default)] enum State { #[default] @@ -69,7 +34,7 @@ enum State { } struct ReadingArray { - header: ByteBuffer, + header: FlatBuffer, buffers_length: usize, } @@ -108,11 +73,6 @@ impl Default for MessageDecoder { } } -/// The alignment required for a flatbuffer message. -/// This is based on the assumption that the maximum primitive type is 8 bytes. -/// See: https://groups.google.com/g/flatbuffers/c/PSgQeWeTx_g -const FB_ALIGNMENT: Alignment = Alignment::new(8); - impl MessageDecoder { /// Attempt to read the next message from the bytes object. /// @@ -135,7 +95,7 @@ impl MessageDecoder { return Ok(PollRead::NeedMore(*msg_length)); } - let msg_bytes = bytes.copy_to_aligned(*msg_length, FB_ALIGNMENT); + let msg_bytes = bytes.copy_to_const_aligned(*msg_length); let msg = root::(msg_bytes.as_ref())?; if msg.version() != MessageVersion::V0 { vortex_bail!("Unsupported message version {:?}", msg.version()); @@ -245,12 +205,12 @@ impl MessageDecoder { let row_count = usize::try_from(array_data_msg.row_count()) .map_err(|_| vortex_err!("row count is too large for usize"))?; - let msg = DecoderMessage::Array(ArrayParts { + let msg = DecoderMessage::Array(ArrayParts::new( row_count, - array_flatbuffer: header.clone(), - array_flatbuffer_loc: array_msg._tab.loc(), + array_msg, + header.clone(), buffers, - }); + )); self.state = Default::default(); return Ok(PollRead::Some(msg)); @@ -264,7 +224,7 @@ impl MessageDecoder { mod test { use bytes::BytesMut; use vortex_array::array::ConstantArray; - use vortex_array::{ArrayDType, IntoArrayData}; + use vortex_array::{ArrayDType, ArrayData, IntoArrayData}; use vortex_buffer::buffer; use vortex_error::vortex_panic; @@ -289,7 +249,7 @@ mod test { // Decode the array parts with the context let actual = array_parts - .into_array_data(Default::default(), expected.dtype().clone()) + .decode(Default::default(), expected.dtype().clone()) .unwrap(); assert_eq!(expected.len(), actual.len()); diff --git a/vortex-ipc/src/stream.rs b/vortex-ipc/src/stream.rs index fefa3e1265..686b732080 100644 --- a/vortex-ipc/src/stream.rs +++ b/vortex-ipc/src/stream.rs @@ -60,7 +60,7 @@ impl Stream for AsyncIPCReader { Some(msg) => match msg { Ok(DecoderMessage::Array(array_parts)) => Poll::Ready(Some( array_parts - .into_array_data(this.ctx.clone(), this.dtype.clone()) + .decode(this.ctx.clone(), this.dtype.clone()) .and_then(|array| { if array.dtype() != this.dtype { Err(vortex_err!( diff --git a/vortex-layout/src/layouts/flat/evaluator.rs b/vortex-layout/src/layouts/flat/evaluator.rs index e498a8a9c8..8c32695041 100644 --- a/vortex-layout/src/layouts/flat/evaluator.rs +++ b/vortex-layout/src/layouts/flat/evaluator.rs @@ -45,7 +45,7 @@ impl Operation for FlatEvaluator { .next() .ok_or_else(|| vortex_err!("Flat message body missing"))?? { - parts.into_array_data(self.reader.ctx(), self.reader.dtype().clone()) + parts.decode(self.reader.ctx(), self.reader.dtype().clone()) } else { vortex_bail!("Flat message is not ArrayParts") }?;