diff --git a/vortex-array/src/parts.rs b/vortex-array/src/parts.rs index 4295a93116..7153d2267e 100644 --- a/vortex-array/src/parts.rs +++ b/vortex-array/src/parts.rs @@ -1,11 +1,15 @@ use std::fmt::{Debug, Formatter}; -use flatbuffers::Follow; +use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset}; +use itertools::Itertools; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; -use vortex_error::{vortex_panic, VortexResult}; -use vortex_flatbuffers::{array as fba, FlatBuffer}; +use vortex_error::{vortex_panic, VortexExpect, VortexResult}; +use vortex_flatbuffers::{ + array as fba, FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt, +}; +use crate::stats::ArrayStatistics; use crate::{ArrayData, ContextRef}; /// [`ArrayParts`] represents the information from an [`ArrayData`] that makes up the serialized @@ -14,6 +18,7 @@ use crate::{ArrayData, ContextRef}; /// /// An [`ArrayParts`] can be fully decoded into an [`ArrayData`] using the `decode` function. pub struct ArrayParts { + // TODO(ngates): I think we should remove this. It's not required in the serialized form. row_count: usize, // Typed as fb::Array flatbuffer: FlatBuffer, @@ -74,3 +79,99 @@ impl ArrayParts { ) } } + +/// Convert an [`ArrayData`] into [`ArrayParts`]. +impl From for ArrayParts { + fn from(array: ArrayData) -> Self { + let flatbuffer = ArrayPartsFlatBuffer { + array: &array, + buffer_idx: 0, + } + .write_flatbuffer_bytes(); + let mut buffers: Vec = vec![]; + for child in array.depth_first_traversal() { + for buffer in child.byte_buffers() { + buffers.push(buffer); + } + } + Self { + row_count: array.len(), + flatbuffer, + flatbuffer_loc: 0, + buffers, + } + } +} + +/// A utility struct for creating an [`fba::Array`] flatbuffer. +pub struct ArrayPartsFlatBuffer<'a> { + array: &'a ArrayData, + buffer_idx: u16, +} + +impl<'a> ArrayPartsFlatBuffer<'a> { + pub fn new(array: &'a ArrayData) -> Self { + Self { + array, + buffer_idx: 0, + } + } +} + +impl FlatBufferRoot for ArrayPartsFlatBuffer<'_> {} + +impl WriteFlatBuffer for ArrayPartsFlatBuffer<'_> { + type Target<'t> = fba::Array<'t>; + + fn write_flatbuffer<'fb>( + &self, + fbb: &mut FlatBufferBuilder<'fb>, + ) -> WIPOffset> { + let encoding = self.array.encoding().id().code(); + let metadata = self + .array + .metadata_bytes() + .vortex_expect("IPCArray is missing metadata during serialization"); + let metadata = Some(fbb.create_vector(metadata.as_ref())); + + // Assign buffer indices for all child arrays. + let nbuffers = u16::try_from(self.array.nbuffers()) + .vortex_expect("Array can have at most u16::MAX buffers"); + let child_buffer_idx = self.buffer_idx + nbuffers; + + let children = self + .array + .children() + .iter() + .scan(child_buffer_idx, |buffer_idx, child| { + // Update the number of buffers required. 
+ let msg = ArrayPartsFlatBuffer { + array: child, + buffer_idx: *buffer_idx, + } + .write_flatbuffer(fbb); + *buffer_idx = u16::try_from(child.cumulative_nbuffers()) + .ok() + .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx)) + .vortex_expect("Too many buffers (u16) for ArrayData"); + Some(msg) + }) + .collect_vec(); + let children = Some(fbb.create_vector(&children)); + + let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx))); + + let stats = Some(self.array.statistics().write_flatbuffer(fbb)); + + fba::Array::create( + fbb, + &fba::ArrayArgs { + encoding, + metadata, + children, + buffers, + stats, + }, + ) + } +} diff --git a/vortex-buffer/src/alignment.rs b/vortex-buffer/src/alignment.rs index 7a1293762a..578ff9311f 100644 --- a/vortex-buffer/src/alignment.rs +++ b/vortex-buffer/src/alignment.rs @@ -58,6 +58,22 @@ impl Alignment { // of trailing zeros in the binary representation of the alignment is greater or equal. self.0.trailing_zeros() >= other.0.trailing_zeros() } + + /// Returns the log2 of the alignment. + pub fn exponent(&self) -> u8 { + u8::try_from(self.0.trailing_zeros()) + .vortex_expect("alignment fits into u16, so exponent fits in u7") + } + + /// Create from the log2 exponent of the alignment. + /// + /// ## Panics + /// + /// Panics if `alignment` is not a power of 2, or is greater than `u16::MAX`. + #[inline] + pub const fn from_exponent(exponent: u8) -> Self { + Self::new(1 << exponent) + } } impl Display for Alignment { @@ -120,6 +136,13 @@ mod test { Alignment::new(3); } + #[test] + fn alignment_exponent() { + let alignment = Alignment::new(1024); + assert_eq!(alignment.exponent(), 10); + assert_eq!(Alignment::from_exponent(10), alignment); + } + #[test] fn is_aligned_to() { assert!(Alignment::new(1).is_aligned_to(Alignment::new(1))); diff --git a/vortex-file/src/v2/footer/file_layout.rs b/vortex-file/src/v2/footer/file_layout.rs index e92a71cf1e..d16939ddcd 100644 --- a/vortex-file/src/v2/footer/file_layout.rs +++ b/vortex-file/src/v2/footer/file_layout.rs @@ -22,14 +22,7 @@ impl WriteFlatBuffer for FileLayout { fbb: &mut flatbuffers::FlatBufferBuilder<'fb>, ) -> flatbuffers::WIPOffset> { let root_layout = self.root_layout.write_flatbuffer(fbb); - - let segments = self - .segments - .iter() - .map(|segment| segment.write_flatbuffer(fbb)) - .collect::>(); - let segments = fbb.create_vector(&segments); - + let segments = fbb.create_vector_from_iter(self.segments.iter().map(fb::Segment::from)); fb::FileLayout::create( fbb, &fb::FileLayoutArgs { diff --git a/vortex-file/src/v2/footer/postscript.rs b/vortex-file/src/v2/footer/postscript.rs index c989a20024..3276d70b96 100644 --- a/vortex-file/src/v2/footer/postscript.rs +++ b/vortex-file/src/v2/footer/postscript.rs @@ -19,13 +19,13 @@ impl WriteFlatBuffer for Postscript { &self, fbb: &mut flatbuffers::FlatBufferBuilder<'fb>, ) -> flatbuffers::WIPOffset> { - let dtype = self.dtype.write_flatbuffer(fbb); - let file_layout = self.file_layout.write_flatbuffer(fbb); + let dtype = fb::Segment::from(&self.dtype); + let file_layout = fb::Segment::from(&self.file_layout); fb::Postscript::create( fbb, &fb::PostscriptArgs { - dtype: Some(dtype), - file_layout: Some(file_layout), + dtype: Some(&dtype), + file_layout: Some(&file_layout), }, ) } @@ -39,12 +39,12 @@ impl ReadFlatBuffer for Postscript { fb: & as Follow<'buf>>::Inner, ) -> Result { Ok(Self { - dtype: Segment::read_flatbuffer( - &fb.dtype() + dtype: Segment::try_from( + fb.dtype() .ok_or_else(|| vortex_err!("Postscript missing 
dtype segment"))?, )?, - file_layout: Segment::read_flatbuffer( - &fb.file_layout() + file_layout: Segment::try_from( + fb.file_layout() .ok_or_else(|| vortex_err!("Postscript missing file_layout segment"))?, )?, }) diff --git a/vortex-file/src/v2/footer/segment.rs b/vortex-file/src/v2/footer/segment.rs index 2c609e6434..f988123f50 100644 --- a/vortex-file/src/v2/footer/segment.rs +++ b/vortex-file/src/v2/footer/segment.rs @@ -1,42 +1,29 @@ -use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset}; -use vortex_error::{vortex_err, VortexError}; -use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer}; +use vortex_buffer::Alignment; +use vortex_error::VortexError; +use vortex_flatbuffers::footer2 as fb; /// The location of a segment within a Vortex file. #[derive(Clone, Debug)] pub(crate) struct Segment { pub(crate) offset: u64, - pub(crate) length: usize, + pub(crate) length: u32, + pub(crate) alignment: Alignment, } -impl WriteFlatBuffer for Segment { - type Target<'a> = fb::Segment<'a>; - - fn write_flatbuffer<'fb>( - &self, - fbb: &mut FlatBufferBuilder<'fb>, - ) -> WIPOffset> { - fb::Segment::create( - fbb, - &fb::SegmentArgs { - offset: self.offset, - length: self.length as u64, - }, - ) +impl From<&Segment> for fb::Segment { + fn from(value: &Segment) -> Self { + fb::Segment::new(value.offset, value.length, value.alignment.exponent(), 0, 0) } } -impl ReadFlatBuffer for Segment { - type Source<'a> = fb::Segment<'a>; +impl TryFrom<&fb::Segment> for Segment { type Error = VortexError; - fn read_flatbuffer<'buf>( - fb: & as Follow<'buf>>::Inner, - ) -> Result { + fn try_from(value: &fb::Segment) -> Result { Ok(Self { - offset: fb.offset(), - length: usize::try_from(fb.length()) - .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?, + offset: value.offset(), + length: value.length(), + alignment: Alignment::from_exponent(value.alignment_exponent()), }) } } diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs index 84f9968c3c..345f7c8870 100644 --- a/vortex-file/src/v2/open/mod.rs +++ b/vortex-file/src/v2/open/mod.rs @@ -183,7 +183,7 @@ impl OpenOptions { ) -> VortexResult { let offset = usize::try_from(dtype.offset - initial_offset)?; let sliced_buffer = - FlatBuffer::align_from(initial_read.slice(offset..offset + dtype.length)); + FlatBuffer::align_from(initial_read.slice(offset..offset + (dtype.length as usize))); let fbd_dtype = root::(&sliced_buffer)?; DType::try_from_view(fbd_dtype, sliced_buffer.clone()) @@ -198,7 +198,7 @@ impl OpenOptions { dtype: DType, ) -> VortexResult { let offset = usize::try_from(segment.offset - initial_offset)?; - let bytes = initial_read.slice(offset..offset + segment.length); + let bytes = initial_read.slice(offset..offset + (segment.length as usize)); let fb = root::(&bytes)?; let fb_root_layout = fb @@ -226,10 +226,7 @@ impl OpenOptions { let fb_segments = fb .segments() .ok_or_else(|| vortex_err!("FileLayout missing segments"))?; - let segments = fb_segments - .iter() - .map(|s| Segment::read_flatbuffer(&s)) - .try_collect()?; + let segments = fb_segments.iter().map(Segment::try_from).try_collect()?; Ok(FileLayout { root_layout, @@ -253,9 +250,9 @@ impl OpenOptions { let segment_id = SegmentId::from(u32::try_from(idx)?); let offset = usize::try_from(segment.offset - initial_offset)?; - let bytes = initial_read.slice(offset..offset + segment.length); + let buffer = initial_read.slice(offset..offset + (segment.length as usize)); - segments.set(segment_id, bytes.into_inner())?; + 
segments.set(segment_id, buffer)?; } Ok(()) } diff --git a/vortex-file/src/v2/segments/cache.rs b/vortex-file/src/v2/segments/cache.rs index 0700515fe4..0fd0ab0b68 100644 --- a/vortex-file/src/v2/segments/cache.rs +++ b/vortex-file/src/v2/segments/cache.rs @@ -3,11 +3,12 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; -use bytes::Bytes; use futures::channel::oneshot; use futures_util::future::try_join_all; +use futures_util::TryFutureExt; use itertools::Itertools; use vortex_array::aliases::hash_map::HashMap; +use vortex_buffer::ByteBuffer; use vortex_error::{vortex_err, VortexResult}; use vortex_io::VortexReadAt; use vortex_layout::segments::{AsyncSegmentReader, SegmentId}; @@ -17,7 +18,7 @@ use crate::v2::footer::Segment; pub(crate) struct SegmentCache { read: R, segments: Arc<[Segment]>, - inflight: RwLock>>>, + inflight: RwLock>>>, } impl SegmentCache { @@ -29,7 +30,7 @@ impl SegmentCache { } } - pub fn set(&mut self, _segment_id: SegmentId, _bytes: Bytes) -> VortexResult<()> { + pub fn set(&mut self, _segment_id: SegmentId, _bytes: ByteBuffer) -> VortexResult<()> { // Do nothing for now Ok(()) } @@ -55,6 +56,7 @@ impl SegmentCache { let segment = &self.segments[**id as usize]; self.read .read_byte_range(segment.offset, segment.length as u64) + .map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment)) })) .await?; @@ -77,7 +79,7 @@ impl SegmentCache { #[async_trait] impl AsyncSegmentReader for SegmentCache { - async fn get(&self, id: SegmentId) -> VortexResult { + async fn get(&self, id: SegmentId) -> VortexResult { let (send, recv) = oneshot::channel(); self.inflight .write() diff --git a/vortex-file/src/v2/segments/writer.rs b/vortex-file/src/v2/segments/writer.rs index e636add432..4a708a82f8 100644 --- a/vortex-file/src/v2/segments/writer.rs +++ b/vortex-file/src/v2/segments/writer.rs @@ -1,4 +1,4 @@ -use bytes::Bytes; +use vortex_buffer::ByteBuffer; use vortex_error::{vortex_err, VortexResult}; use vortex_io::VortexWrite; use vortex_layout::segments::{SegmentId, SegmentWriter}; @@ -8,12 +8,12 @@ use crate::v2::footer::Segment; /// A segment writer that holds buffers in memory until they are flushed by a writer. #[derive(Default)] pub(crate) struct BufferedSegmentWriter { - segments: Vec>, + segments: Vec, next_id: SegmentId, } impl SegmentWriter for BufferedSegmentWriter { - fn put(&mut self, data: Vec) -> SegmentId { + fn put(&mut self, data: ByteBuffer) -> SegmentId { self.segments.push(data); let id = self.next_id; self.next_id = SegmentId::from(*self.next_id + 1); @@ -28,14 +28,16 @@ impl BufferedSegmentWriter { write: &mut futures_util::io::Cursor, segments: &mut Vec, ) -> VortexResult<()> { - for segment in self.segments.drain(..) { + for buffer in self.segments.drain(..) 
{ let offset = write.position(); - for buffer in segment { - write.write_all(buffer).await?; - } - let length = usize::try_from(write.position() - offset) - .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?; - segments.push(Segment { offset, length }); + let alignment = buffer.alignment(); + write.write_all(buffer.into_inner()).await?; + segments.push(Segment { + offset, + length: u32::try_from(write.position() - offset) + .map_err(|_| vortex_err!("segment length exceeds maximum u32"))?, + alignment, + }); } Ok(()) } diff --git a/vortex-file/src/v2/writer.rs b/vortex-file/src/v2/writer.rs index 2a01fb7b9e..0284ecac29 100644 --- a/vortex-file/src/v2/writer.rs +++ b/vortex-file/src/v2/writer.rs @@ -4,7 +4,7 @@ use futures_util::StreamExt; use vortex_array::iter::ArrayIterator; use vortex_array::stream::ArrayStream; use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult}; -use vortex_flatbuffers::{FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt}; +use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt}; use vortex_io::VortexWrite; use vortex_layout::strategies::LayoutStrategy; @@ -120,8 +120,9 @@ impl WriteOptions { write.write_all(flatbuffer.write_flatbuffer_bytes()).await?; Ok(Segment { offset: layout_offset, - length: usize::try_from(write.position() - layout_offset) - .map_err(|_| vortex_err!("segment length exceeds maximum usize"))?, + length: u32::try_from(write.position() - layout_offset) + .map_err(|_| vortex_err!("segment length exceeds maximum u32"))?, + alignment: FlatBuffer::alignment(), }) } } diff --git a/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs b/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs index 0c2d1b553e..408d87e4e3 100644 --- a/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs +++ b/vortex-flatbuffers/flatbuffers/vortex-file/footer2.fbs @@ -1,10 +1,19 @@ include "vortex-layout/layout.fbs"; /// A `Segment` acts as the locator for a buffer within the file. -table Segment { +// TODO(ngates): I think this should actually be more of a German String style struct? I don't know how +// many buffers we have that are <128 bits. +struct Segment { offset: uint64; - length: uint64; - // NOTE(ngates): add compression and encryption information + length: uint32; + alignment_exponent: uint8; + // These two fields are reserved for future use, I imagine they will be pointers + // into a compression scheme and encryption scheme tables that live in the FileLayout. + // This struct would have been padded to 128 bits anyway, so we're not losing anything. + // We could also restrict alignment exponent to e.g. 5 bits, and then use 3 bits to indicate a pointer into the + // compression scheme table. + _compression: uint8; + _encryption: uint16; } /// The `FileLayout` stores the root `Layout` as well as location information for each referenced segment. diff --git a/vortex-flatbuffers/src/generated/footer2.rs b/vortex-flatbuffers/src/generated/footer2.rs index 49b374b0e5..301f16e5e1 100644 --- a/vortex-flatbuffers/src/generated/footer2.rs +++ b/vortex-flatbuffers/src/generated/footer2.rs @@ -10,121 +10,231 @@ use core::cmp::Ordering; extern crate flatbuffers; use self::flatbuffers::{EndianScalar, Follow}; -pub enum SegmentOffset {} -#[derive(Copy, Clone, PartialEq)] - /// A `Segment` acts as the locator for a buffer within the file. 
-pub struct Segment<'a> { - pub _tab: flatbuffers::Table<'a>, -} - -impl<'a> flatbuffers::Follow<'a> for Segment<'a> { - type Inner = Segment<'a>; +// struct Segment, aligned to 8 +#[repr(transparent)] +#[derive(Clone, Copy, PartialEq)] +pub struct Segment(pub [u8; 16]); +impl Default for Segment { + fn default() -> Self { + Self([0; 16]) + } +} +impl core::fmt::Debug for Segment { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("Segment") + .field("offset", &self.offset()) + .field("length", &self.length()) + .field("alignment_exponent", &self.alignment_exponent()) + .field("_compression", &self._compression()) + .field("_encryption", &self._encryption()) + .finish() + } +} + +impl flatbuffers::SimpleToVerifyInSlice for Segment {} +impl<'a> flatbuffers::Follow<'a> for Segment { + type Inner = &'a Segment; #[inline] unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { _tab: flatbuffers::Table::new(buf, loc) } + <&'a Segment>::follow(buf, loc) } } - -impl<'a> Segment<'a> { - pub const VT_OFFSET: flatbuffers::VOffsetT = 4; - pub const VT_LENGTH: flatbuffers::VOffsetT = 6; - - #[inline] - pub unsafe fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Segment { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr, A: flatbuffers::Allocator + 'bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr, A>, - args: &'args SegmentArgs - ) -> flatbuffers::WIPOffset> { - let mut builder = SegmentBuilder::new(_fbb); - builder.add_length(args.length); - builder.add_offset(args.offset); - builder.finish() - } - - - #[inline] - pub fn offset(&self) -> u64 { - // Safety: - // Created from valid Table for this object - // which contains a valid value in this slot - unsafe { self._tab.get::(Segment::VT_OFFSET, Some(0)).unwrap()} - } +impl<'a> flatbuffers::Follow<'a> for &'a Segment { + type Inner = &'a Segment; #[inline] - pub fn length(&self) -> u64 { - // Safety: - // Created from valid Table for this object - // which contains a valid value in this slot - unsafe { self._tab.get::(Segment::VT_LENGTH, Some(0)).unwrap()} + unsafe fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + flatbuffers::follow_cast_ref::(buf, loc) } } +impl<'b> flatbuffers::Push for Segment { + type Output = Segment; + #[inline] + unsafe fn push(&self, dst: &mut [u8], _written_len: usize) { + let src = ::core::slice::from_raw_parts(self as *const Segment as *const u8, ::size()); + dst.copy_from_slice(src); + } + #[inline] + fn alignment() -> flatbuffers::PushAlignment { + flatbuffers::PushAlignment::new(8) + } +} -impl flatbuffers::Verifiable for Segment<'_> { +impl<'a> flatbuffers::Verifiable for Segment { #[inline] fn run_verifier( v: &mut flatbuffers::Verifier, pos: usize ) -> Result<(), flatbuffers::InvalidFlatbuffer> { use self::flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::("offset", Self::VT_OFFSET, false)? - .visit_field::("length", Self::VT_LENGTH, false)? 
- .finish(); - Ok(()) + v.in_buffer::(pos) } } -pub struct SegmentArgs { - pub offset: u64, - pub length: u64, -} -impl<'a> Default for SegmentArgs { - #[inline] - fn default() -> Self { - SegmentArgs { - offset: 0, - length: 0, - } + +impl<'a> Segment { + #[allow(clippy::too_many_arguments)] + pub fn new( + offset: u64, + length: u32, + alignment_exponent: u8, + _compression: u8, + _encryption: u16, + ) -> Self { + let mut s = Self([0; 16]); + s.set_offset(offset); + s.set_length(length); + s.set_alignment_exponent(alignment_exponent); + s.set__compression(_compression); + s.set__encryption(_encryption); + s } -} -pub struct SegmentBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a, A>, - start_: flatbuffers::WIPOffset, -} -impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> SegmentBuilder<'a, 'b, A> { - #[inline] - pub fn add_offset(&mut self, offset: u64) { - self.fbb_.push_slot::(Segment::VT_OFFSET, offset, 0); + pub fn offset(&self) -> u64 { + let mut mem = core::mem::MaybeUninit::<::Scalar>::uninit(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + EndianScalar::from_little_endian(unsafe { + core::ptr::copy_nonoverlapping( + self.0[0..].as_ptr(), + mem.as_mut_ptr() as *mut u8, + core::mem::size_of::<::Scalar>(), + ); + mem.assume_init() + }) + } + + pub fn set_offset(&mut self, x: u64) { + let x_le = x.to_little_endian(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + unsafe { + core::ptr::copy_nonoverlapping( + &x_le as *const _ as *const u8, + self.0[0..].as_mut_ptr(), + core::mem::size_of::<::Scalar>(), + ); + } } - #[inline] - pub fn add_length(&mut self, length: u64) { - self.fbb_.push_slot::(Segment::VT_LENGTH, length, 0); + + pub fn length(&self) -> u32 { + let mut mem = core::mem::MaybeUninit::<::Scalar>::uninit(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + EndianScalar::from_little_endian(unsafe { + core::ptr::copy_nonoverlapping( + self.0[8..].as_ptr(), + mem.as_mut_ptr() as *mut u8, + core::mem::size_of::<::Scalar>(), + ); + mem.assume_init() + }) + } + + pub fn set_length(&mut self, x: u32) { + let x_le = x.to_little_endian(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + unsafe { + core::ptr::copy_nonoverlapping( + &x_le as *const _ as *const u8, + self.0[8..].as_mut_ptr(), + core::mem::size_of::<::Scalar>(), + ); + } } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> SegmentBuilder<'a, 'b, A> { - let start = _fbb.start_table(); - SegmentBuilder { - fbb_: _fbb, - start_: start, + + pub fn alignment_exponent(&self) -> u8 { + let mut mem = core::mem::MaybeUninit::<::Scalar>::uninit(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + EndianScalar::from_little_endian(unsafe { + core::ptr::copy_nonoverlapping( + self.0[12..].as_ptr(), + mem.as_mut_ptr() as *mut u8, + core::mem::size_of::<::Scalar>(), + ); + mem.assume_init() + }) + } + + pub fn set_alignment_exponent(&mut self, x: u8) { + let x_le = x.to_little_endian(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + unsafe { + core::ptr::copy_nonoverlapping( + &x_le as *const _ as *const u8, + self.0[12..].as_mut_ptr(), + core::mem::size_of::<::Scalar>(), + ); } } - 
#[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) + + pub fn _compression(&self) -> u8 { + let mut mem = core::mem::MaybeUninit::<::Scalar>::uninit(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + EndianScalar::from_little_endian(unsafe { + core::ptr::copy_nonoverlapping( + self.0[13..].as_ptr(), + mem.as_mut_ptr() as *mut u8, + core::mem::size_of::<::Scalar>(), + ); + mem.assume_init() + }) + } + + pub fn set__compression(&mut self, x: u8) { + let x_le = x.to_little_endian(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + unsafe { + core::ptr::copy_nonoverlapping( + &x_le as *const _ as *const u8, + self.0[13..].as_mut_ptr(), + core::mem::size_of::<::Scalar>(), + ); + } } -} -impl core::fmt::Debug for Segment<'_> { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { - let mut ds = f.debug_struct("Segment"); - ds.field("offset", &self.offset()); - ds.field("length", &self.length()); - ds.finish() + pub fn _encryption(&self) -> u16 { + let mut mem = core::mem::MaybeUninit::<::Scalar>::uninit(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + EndianScalar::from_little_endian(unsafe { + core::ptr::copy_nonoverlapping( + self.0[14..].as_ptr(), + mem.as_mut_ptr() as *mut u8, + core::mem::size_of::<::Scalar>(), + ); + mem.assume_init() + }) + } + + pub fn set__encryption(&mut self, x: u16) { + let x_le = x.to_little_endian(); + // Safety: + // Created from a valid Table for this object + // Which contains a valid value in this slot + unsafe { + core::ptr::copy_nonoverlapping( + &x_le as *const _ as *const u8, + self.0[14..].as_mut_ptr(), + core::mem::size_of::<::Scalar>(), + ); + } } + } + pub enum FileLayoutOffset {} #[derive(Copy, Clone, PartialEq)] @@ -169,11 +279,11 @@ impl<'a> FileLayout<'a> { unsafe { self._tab.get::>(FileLayout::VT_ROOT_LAYOUT, None)} } #[inline] - pub fn segments(&self) -> Option>>> { + pub fn segments(&self) -> Option> { // Safety: // Created from valid Table for this object // which contains a valid value in this slot - unsafe { self._tab.get::>>>(FileLayout::VT_SEGMENTS, None)} + unsafe { self._tab.get::>>(FileLayout::VT_SEGMENTS, None)} } } @@ -185,14 +295,14 @@ impl flatbuffers::Verifiable for FileLayout<'_> { use self::flatbuffers::Verifiable; v.visit_table(pos)? .visit_field::>("root_layout", Self::VT_ROOT_LAYOUT, false)? - .visit_field::>>>("segments", Self::VT_SEGMENTS, false)? + .visit_field::>>("segments", Self::VT_SEGMENTS, false)? 
.finish(); Ok(()) } } pub struct FileLayoutArgs<'a> { pub root_layout: Option>>, - pub segments: Option>>>>, + pub segments: Option>>, } impl<'a> Default for FileLayoutArgs<'a> { #[inline] @@ -214,7 +324,7 @@ impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> FileLayoutBuilder<'a, 'b, A> { self.fbb_.push_slot_always::>(FileLayout::VT_ROOT_LAYOUT, root_layout); } #[inline] - pub fn add_segments(&mut self, segments: flatbuffers::WIPOffset>>>) { + pub fn add_segments(&mut self, segments: flatbuffers::WIPOffset>) { self.fbb_.push_slot_always::>(FileLayout::VT_SEGMENTS, segments); } #[inline] @@ -288,18 +398,18 @@ impl<'a> Postscript<'a> { #[inline] - pub fn dtype(&self) -> Option> { + pub fn dtype(&self) -> Option<&'a Segment> { // Safety: // Created from valid Table for this object // which contains a valid value in this slot - unsafe { self._tab.get::>(Postscript::VT_DTYPE, None)} + unsafe { self._tab.get::(Postscript::VT_DTYPE, None)} } #[inline] - pub fn file_layout(&self) -> Option> { + pub fn file_layout(&self) -> Option<&'a Segment> { // Safety: // Created from valid Table for this object // which contains a valid value in this slot - unsafe { self._tab.get::>(Postscript::VT_FILE_LAYOUT, None)} + unsafe { self._tab.get::(Postscript::VT_FILE_LAYOUT, None)} } } @@ -310,15 +420,15 @@ impl flatbuffers::Verifiable for Postscript<'_> { ) -> Result<(), flatbuffers::InvalidFlatbuffer> { use self::flatbuffers::Verifiable; v.visit_table(pos)? - .visit_field::>("dtype", Self::VT_DTYPE, false)? - .visit_field::>("file_layout", Self::VT_FILE_LAYOUT, false)? + .visit_field::("dtype", Self::VT_DTYPE, false)? + .visit_field::("file_layout", Self::VT_FILE_LAYOUT, false)? .finish(); Ok(()) } } pub struct PostscriptArgs<'a> { - pub dtype: Option>>, - pub file_layout: Option>>, + pub dtype: Option<&'a Segment>, + pub file_layout: Option<&'a Segment>, } impl<'a> Default for PostscriptArgs<'a> { #[inline] @@ -336,12 +446,12 @@ pub struct PostscriptBuilder<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> { } impl<'a: 'b, 'b, A: flatbuffers::Allocator + 'a> PostscriptBuilder<'a, 'b, A> { #[inline] - pub fn add_dtype(&mut self, dtype: flatbuffers::WIPOffset>) { - self.fbb_.push_slot_always::>(Postscript::VT_DTYPE, dtype); + pub fn add_dtype(&mut self, dtype: &Segment) { + self.fbb_.push_slot_always::<&Segment>(Postscript::VT_DTYPE, dtype); } #[inline] - pub fn add_file_layout(&mut self, file_layout: flatbuffers::WIPOffset>) { - self.fbb_.push_slot_always::>(Postscript::VT_FILE_LAYOUT, file_layout); + pub fn add_file_layout(&mut self, file_layout: &Segment) { + self.fbb_.push_slot_always::<&Segment>(Postscript::VT_FILE_LAYOUT, file_layout); } #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a, A>) -> PostscriptBuilder<'a, 'b, A> { diff --git a/vortex-ipc/src/messages/encoder.rs b/vortex-ipc/src/messages/encoder.rs index 0b67239ae6..fac3328be1 100644 --- a/vortex-ipc/src/messages/encoder.rs +++ b/vortex-ipc/src/messages/encoder.rs @@ -1,12 +1,11 @@ use bytes::Bytes; -use flatbuffers::{FlatBufferBuilder, WIPOffset}; -use itertools::Itertools; -use vortex_array::stats::ArrayStatistics; +use flatbuffers::FlatBufferBuilder; +use vortex_array::parts::ArrayPartsFlatBuffer; use vortex_array::{flatbuffers as fba, ArrayData}; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; use vortex_error::{vortex_panic, VortexExpect}; -use vortex_flatbuffers::{message as fb, FlatBufferRoot, WriteFlatBuffer}; +use vortex_flatbuffers::{message as fb, WriteFlatBuffer}; use crate::ALIGNMENT; @@ -78,11 +77,7 @@ impl 
MessageEncoder { let header = match message { EncoderMessage::Array(array) => { let row_count = array.len(); - let array_def = ArrayWriter { - array, - buffer_idx: 0, - } - .write_flatbuffer(&mut fbb); + let fb_array = ArrayPartsFlatBuffer::new(array).write_flatbuffer(&mut fbb); let mut fb_buffers = vec![]; for child in array.depth_first_traversal() { @@ -110,7 +105,7 @@ impl MessageEncoder { fba::ArrayData::create( &mut fbb, &fba::ArrayDataArgs { - array: Some(array_def), + array: Some(fb_array), row_count: row_count as u64, buffers: Some(fb_buffers), }, @@ -172,69 +167,3 @@ impl MessageEncoder { buffers } } - -struct ArrayWriter<'a> { - array: &'a ArrayData, - buffer_idx: u16, -} - -impl FlatBufferRoot for ArrayWriter<'_> {} - -impl WriteFlatBuffer for ArrayWriter<'_> { - type Target<'t> = fba::Array<'t>; - - fn write_flatbuffer<'fb>( - &self, - fbb: &mut FlatBufferBuilder<'fb>, - ) -> WIPOffset> { - let encoding = self.array.encoding().id().code(); - let metadata = self - .array - .metadata_bytes() - .vortex_expect("IPCArray is missing metadata during serialization"); - let metadata = Some(fbb.create_vector(metadata.as_ref())); - - // Assign buffer indices for all child arrays. - // The second tuple element holds the buffer_index for this Array subtree. If this array - // has a buffer, that is its buffer index. If it does not, that buffer index belongs - // to one of the children. - let nbuffers = u16::try_from(self.array.nbuffers()) - .vortex_expect("Array can have at most u16::MAX buffers"); - let child_buffer_idx = self.buffer_idx + nbuffers; - - let children = self - .array - .children() - .iter() - .scan(child_buffer_idx, |buffer_idx, child| { - // Update the number of buffers required. - let msg = ArrayWriter { - array: child, - buffer_idx: *buffer_idx, - } - .write_flatbuffer(fbb); - *buffer_idx = u16::try_from(child.cumulative_nbuffers()) - .ok() - .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx)) - .vortex_expect("Too many buffers (u16) for ArrayData"); - Some(msg) - }) - .collect_vec(); - let children = Some(fbb.create_vector(&children)); - - let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx))); - - let stats = Some(self.array.statistics().write_flatbuffer(fbb)); - - fba::Array::create( - fbb, - &fba::ArrayArgs { - encoding, - metadata, - children, - buffers, - stats, - }, - ) - } -} diff --git a/vortex-layout/src/data.rs b/vortex-layout/src/data.rs index 1d8be62394..1d5b07021e 100644 --- a/vortex-layout/src/data.rs +++ b/vortex-layout/src/data.rs @@ -217,6 +217,17 @@ impl LayoutData { } } + /// Returns the number of segments in the layout. + pub fn nsegments(&self) -> usize { + match &self.0 { + Inner::Owned(owned) => owned.segments.as_ref().map_or(0, |segments| segments.len()), + Inner::Viewed(viewed) => viewed + .flatbuffer() + .segments() + .map_or(0, |segments| segments.len()), + } + } + /// Fetch the i'th segment id of the layout. pub fn segment_id(&self, i: usize) -> Option { match &self.0 { @@ -232,6 +243,11 @@ impl LayoutData { } } + /// Iterate the segment IDs of the layout. 
+ pub fn segments(&self) -> impl Iterator + '_ { + (0..self.nsegments()).map(move |i| self.segment_id(i).vortex_expect("segment bounds")) + } + /// Returns the layout metadata pub fn metadata(&self) -> Option { match &self.0 { diff --git a/vortex-layout/src/layouts/flat/evaluator.rs b/vortex-layout/src/layouts/flat/evaluator.rs index 0107995cd9..1c4156217c 100644 --- a/vortex-layout/src/layouts/flat/evaluator.rs +++ b/vortex-layout/src/layouts/flat/evaluator.rs @@ -1,33 +1,50 @@ use async_trait::async_trait; +use flatbuffers::root; +use futures::future::try_join_all; use vortex_array::compute::filter; +use vortex_array::parts::ArrayParts; use vortex_array::ArrayData; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_error::{vortex_err, VortexExpect, VortexResult}; use vortex_expr::ExprRef; -use vortex_ipc::messages::{BufMessageReader, DecoderMessage}; +use vortex_flatbuffers::{array as fba, FlatBuffer}; use vortex_scan::{AsyncEvaluator, RowMask}; use crate::layouts::flat::reader::FlatReader; use crate::reader::LayoutScanExt; +use crate::LayoutReader; #[async_trait(?Send)] impl AsyncEvaluator for FlatReader { async fn evaluate(self: &Self, row_mask: RowMask, expr: ExprRef) -> VortexResult { - // Grab the byte buffer for the segment. - let bytes = self.segments().get(self.segment_id()).await?; - - // Decode the ArrayParts from the message bytes. - // TODO(ngates): ArrayParts should probably live in vortex-array, and not required - // IPC message framing to read or write. - let mut msg_reader = BufMessageReader::new(bytes); - let array = if let DecoderMessage::Array(parts) = msg_reader - .next() - .ok_or_else(|| vortex_err!("Flat message body missing"))?? - { - parts.decode(self.ctx(), self.dtype().clone()) - } else { - vortex_bail!("Flat message is not ArrayParts") - }?; + // Fetch all the array buffers. + let mut buffers = try_join_all( + self.layout() + .segments() + .map(|segment_id| self.segments().get(segment_id)), + ) + .await?; + // Pop the array flatbuffer. + let flatbuffer = FlatBuffer::try_from( + buffers + .pop() + .ok_or_else(|| vortex_err!("Flat message missing"))?, + )?; + + let row_count = usize::try_from(self.layout().row_count()) + .vortex_expect("FlatLayout row count does not fit within usize"); + + let array_parts = ArrayParts::new( + row_count, + root::(flatbuffer.as_ref()).vortex_expect("Invalid fba::Array flatbuffer"), + flatbuffer.clone(), + buffers, + ); + + // Decode into an ArrayData. + let array = array_parts.decode(self.ctx(), self.dtype().clone())?; + + // And finally apply the expression // TODO(ngates): what's the best order to apply the filter mask / expression? let array = expr.evaluate(&array)?; filter(&array, row_mask.into_filter_mask()?) diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 4468e24c65..2524f3c078 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -1,23 +1,18 @@ use std::sync::Arc; use vortex_array::ContextRef; -use vortex_error::{vortex_err, vortex_panic, VortexResult}; +use vortex_error::{vortex_panic, VortexResult}; use vortex_scan::AsyncEvaluator; use crate::layouts::flat::FlatLayout; use crate::reader::LayoutReader; -use crate::segments::{AsyncSegmentReader, SegmentId}; +use crate::segments::AsyncSegmentReader; use crate::{LayoutData, LayoutEncoding}; pub struct FlatReader { layout: LayoutData, ctx: ContextRef, segments: Arc, - // The segment ID of the array in this FlatLayout. 
- // NOTE(ngates): we don't cache the ArrayData here since the cache lives for as long as the - // reader does, which means likely holding a strong reference to the array for much longer - // than necessary, and potentially causing a memory leak. - segment_id: SegmentId, } impl FlatReader { @@ -30,15 +25,10 @@ impl FlatReader { vortex_panic!("Mismatched layout ID") } - let segment_id = layout - .segment_id(0) - .ok_or_else(|| vortex_err!("FlatLayout missing SegmentID"))?; - Ok(Self { layout, ctx, segments, - segment_id, }) } @@ -49,10 +39,6 @@ impl FlatReader { pub(crate) fn segments(&self) -> &dyn AsyncSegmentReader { self.segments.as_ref() } - - pub(crate) fn segment_id(&self) -> SegmentId { - self.segment_id - } } impl LayoutReader for FlatReader { diff --git a/vortex-layout/src/layouts/flat/writer.rs b/vortex-layout/src/layouts/flat/writer.rs index 8f992709c0..97967f92ad 100644 --- a/vortex-layout/src/layouts/flat/writer.rs +++ b/vortex-layout/src/layouts/flat/writer.rs @@ -1,6 +1,8 @@ +use vortex_array::parts::ArrayPartsFlatBuffer; use vortex_array::ArrayData; use vortex_dtype::DType; use vortex_error::{vortex_bail, vortex_err, VortexResult}; +use vortex_flatbuffers::WriteFlatBufferExt; use crate::layouts::flat::FlatLayout; use crate::segments::SegmentWriter; @@ -32,12 +34,27 @@ impl LayoutWriter for FlatLayoutWriter { vortex_bail!("FlatLayoutStrategy::push_batch called after finish"); } let row_count = chunk.len() as u64; - let segment_id = segments.put_chunk(chunk); + + // We store each Array buffer in its own segment. + let mut segment_ids = vec![]; + for child in chunk.depth_first_traversal() { + for buffer in child.byte_buffers() { + // TODO(ngates): decide a way of splitting buffers if they exceed u32 size. + // We could write empty segments either side of buffers to concatenate? + // Or we could use Layout::metadata to store this information. + segment_ids.push(segments.put(buffer)); + } + } + + // ...followed by a FlatBuffer describing the array layout. + let flatbuffer = ArrayPartsFlatBuffer::new(&chunk).write_flatbuffer_bytes(); + segment_ids.push(segments.put(flatbuffer.into_inner())); + self.layout = Some(LayoutData::new_owned( &FlatLayout, self.dtype.clone(), row_count, - Some(vec![segment_id]), + Some(segment_ids), None, None, )); diff --git a/vortex-layout/src/segments/mod.rs b/vortex-layout/src/segments/mod.rs index e0dc148c9b..143b0fa47d 100644 --- a/vortex-layout/src/segments/mod.rs +++ b/vortex-layout/src/segments/mod.rs @@ -1,10 +1,8 @@ use std::ops::Deref; use async_trait::async_trait; -use bytes::Bytes; -use vortex_array::ArrayData; +use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; -use vortex_ipc::messages::{EncoderMessage, MessageEncoder}; /// The identifier for a single segment. // TODO(ngates): should this be a `[u8]` instead? Allowing for arbitrary segment identifiers? @@ -30,50 +28,38 @@ pub trait AsyncSegmentReader: Send + Sync { /// Attempt to get the data associated with a given segment ID. /// /// If the segment ID is not found, `None` is returned. - async fn get(&self, id: SegmentId) -> VortexResult; + async fn get(&self, id: SegmentId) -> VortexResult; } pub trait SegmentWriter { /// Write the given data into a segment and return its identifier. /// The provided buffers are concatenated together to form the segment. - fn put(&mut self, data: Vec) -> SegmentId; - - // TODO(ngates): convert this to take an `ArrayParts` so it's obvious to the caller that the - // serialized message does not include the array's length or dtype. 
- // TODO(ngates): do not use the IPC message encoder since it adds extra unnecessary framing. - fn put_chunk(&mut self, array: ArrayData) -> SegmentId { - self.put(MessageEncoder::default().encode(EncoderMessage::Array(&array))) - } + fn put(&mut self, buffer: ByteBuffer) -> SegmentId; } #[cfg(test)] pub mod test { - use bytes::{Bytes, BytesMut}; use vortex_error::{vortex_err, VortexExpect}; use super::*; #[derive(Default)] pub struct TestSegments { - segments: Vec, + segments: Vec, } impl SegmentWriter for TestSegments { - fn put(&mut self, data: Vec) -> SegmentId { + fn put(&mut self, data: ByteBuffer) -> SegmentId { let id = u32::try_from(self.segments.len()) .vortex_expect("Cannot store more than u32::MAX segments"); - let mut buffer = BytesMut::with_capacity(data.iter().map(Bytes::len).sum()); - for bytes in data { - buffer.extend_from_slice(&bytes); - } - self.segments.push(buffer.freeze()); + self.segments.push(data); id.into() } } #[async_trait] impl AsyncSegmentReader for TestSegments { - async fn get(&self, id: SegmentId) -> VortexResult { + async fn get(&self, id: SegmentId) -> VortexResult { self.segments .get(*id as usize) .cloned()
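
---

Reviewer note: the core on-disk change above replaces the `Segment` flatbuffer *table* (u64 offset + u64 length) with a fixed 16-byte *struct* carrying `offset: u64`, `length: u32`, `alignment_exponent: u8`, and two reserved fields, with the alignment stored as its log2 via the new `Alignment::exponent()` / `Alignment::from_exponent()` pair. Below is a minimal standalone sketch of that layout and round-trip, using only std Rust (no vortex crates); `encode_segment` and `decode_segment` are hypothetical helpers for illustration, not part of this PR.

```rust
// Sketch of the 16-byte Segment struct layout written by the generated footer2 code:
//   bytes [0..8)   offset              (u64, little-endian)
//   bytes [8..12)  length              (u32, little-endian)
//   byte  [12]     alignment_exponent  (u8)
//   byte  [13]     _compression        (u8, reserved)
//   bytes [14..16) _encryption         (u16, reserved)
fn encode_segment(offset: u64, length: u32, alignment: u64) -> [u8; 16] {
    // Alignment is stored as its log2, mirroring Alignment::exponent() in the diff.
    let exponent = alignment.trailing_zeros() as u8;
    let mut s = [0u8; 16];
    s[0..8].copy_from_slice(&offset.to_le_bytes());
    s[8..12].copy_from_slice(&length.to_le_bytes());
    s[12] = exponent;
    // s[13] and s[14..16] stay zero: reserved for future compression/encryption scheme ids.
    s
}

fn decode_segment(s: &[u8; 16]) -> (u64, u32, u64) {
    let offset = u64::from_le_bytes(s[0..8].try_into().unwrap());
    let length = u32::from_le_bytes(s[8..12].try_into().unwrap());
    // Mirrors Alignment::from_exponent(): alignment = 1 << exponent.
    let alignment = 1u64 << s[12];
    (offset, length, alignment)
}

fn main() {
    let bytes = encode_segment(4096, 1024, 64);
    assert_eq!(decode_segment(&bytes), (4096, 1024, 64));
}
```

Because the struct is fixed-size, `Postscript` and `FileLayout` can inline segments directly (`Option<&Segment>` / `Vector<Segment>`) instead of storing offsets to separate tables, which is what the schema comment about a "German String"-style struct padded to 128 bits is getting at.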