Skip to content

Commit

Permalink
Segment Alignment (#1883)
Browse files Browse the repository at this point in the history
Instead of storing segment alignments in the segment map, we could store
them in the Array flatbuffer and then pass them into
`SegmentReader::get(id, alignment)`.

The problem with this is that it would require two round trips to load a
FlatLayout: one to fetch the array flatbuffer, which tells us the
alignment of the data buffers, and then a second round trip to fetch the
data buffers. Unless, of course, we store the data buffer alignment in the
FlatLayout metadata? That could work, actually...

Follow-up: move the ArrayData flatbuffer into the IPC definition as an
ArrayMessage, and move row_count out of ArrayParts.

Open Questions:
* Who should control / configure compression + encryption? I don't think
this should be handled inside the segment writer, since it doesn't know
what type of segment it is (data, flatbuffer, etc.). Instead, if the
layouts themselves request encryption / compression, then we can
configure different properties for different layouts. e.g. different
encryption key for each column.
  • Loading branch information
gatesn authored Jan 10, 2025
1 parent aeadc23 commit b706f21
Show file tree
Hide file tree
Showing 17 changed files with 487 additions and 311 deletions.
107 changes: 104 additions & 3 deletions vortex-array/src/parts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use std::fmt::{Debug, Formatter};

use flatbuffers::Follow;
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
use itertools::Itertools;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_error::{vortex_panic, VortexResult};
use vortex_flatbuffers::{array as fba, FlatBuffer};
use vortex_error::{vortex_panic, VortexExpect, VortexResult};
use vortex_flatbuffers::{
array as fba, FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt,
};

use crate::stats::ArrayStatistics;
use crate::{ArrayData, ContextRef};

/// [`ArrayParts`] represents the information from an [`ArrayData`] that makes up the serialized
Expand All @@ -14,6 +18,7 @@ use crate::{ArrayData, ContextRef};
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayData`] using the `decode` function.
pub struct ArrayParts {
// TODO(ngates): I think we should remove this. It's not required in the serialized form.
row_count: usize,
// Typed as fb::Array
flatbuffer: FlatBuffer,
Expand Down Expand Up @@ -74,3 +79,99 @@ impl ArrayParts {
)
}
}

/// Convert an [`ArrayData`] into [`ArrayParts`].
impl From<ArrayData> for ArrayParts {
fn from(array: ArrayData) -> Self {
let flatbuffer = ArrayPartsFlatBuffer {
array: &array,
buffer_idx: 0,
}
.write_flatbuffer_bytes();
let mut buffers: Vec<ByteBuffer> = vec![];
for child in array.depth_first_traversal() {
for buffer in child.byte_buffers() {
buffers.push(buffer);
}
}
Self {
row_count: array.len(),
flatbuffer,
flatbuffer_loc: 0,
buffers,
}
}
}

/// A utility struct for creating an [`fba::Array`] flatbuffer.
///
/// Writing it serializes `array` and, recursively, each of its children.
pub struct ArrayPartsFlatBuffer<'a> {
/// The array to serialize.
array: &'a ArrayData,
/// Index given to the first of this array's own buffers. The array's buffers are numbered
/// consecutively from here, and the buffers of its children are numbered after them.
buffer_idx: u16,
}

impl<'a> ArrayPartsFlatBuffer<'a> {
    /// Creates a flatbuffer writer for `array` whose buffer numbering starts at index 0.
    pub fn new(array: &'a ArrayData) -> Self {
        let buffer_idx = 0;
        Self { array, buffer_idx }
    }
}

// Marker impl with no items. NOTE(review): presumably this is what allows the type to be
// written as a standalone root message via `write_flatbuffer_bytes` — confirm against
// `vortex_flatbuffers`.
impl FlatBufferRoot for ArrayPartsFlatBuffer<'_> {}

impl WriteFlatBuffer for ArrayPartsFlatBuffer<'_> {
    type Target<'t> = fba::Array<'t>;

    /// Writes this array, and recursively all of its children, as an [`fba::Array`] table.
    ///
    /// Buffer indices are assigned as follows: the array's own buffers take
    /// `self.buffer_idx..self.buffer_idx + nbuffers`, and each child then starts where the
    /// previous sibling's cumulative buffer count ended.
    ///
    /// ## Panics
    ///
    /// Panics if the array's metadata bytes are unavailable, or if any assigned buffer index
    /// does not fit in a `u16`.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.array.encoding().id().code();
        let metadata = self
            .array
            .metadata_bytes()
            .vortex_expect("IPCArray is missing metadata during serialization");
        let metadata = Some(fbb.create_vector(metadata.as_ref()));

        // This array's own buffers occupy `self.buffer_idx..child_buffer_idx`. The addition is
        // checked so that an overflow fails loudly instead of wrapping in release builds, in
        // line with the checked accumulation used for the children below.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        let child_buffer_idx = self
            .buffer_idx
            .checked_add(nbuffers)
            .vortex_expect("Too many buffers (u16) for ArrayData");

        // Assign buffer indices for all child arrays.
        let children = self
            .array
            .children()
            .iter()
            .scan(child_buffer_idx, |buffer_idx, child| {
                // Write the child at the current index, then advance the index past every
                // buffer in the child's subtree.
                let msg = ArrayPartsFlatBuffer {
                    array: child,
                    buffer_idx: *buffer_idx,
                }
                .write_flatbuffer(fbb);
                *buffer_idx = u16::try_from(child.cumulative_nbuffers())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
                    .vortex_expect("Too many buffers (u16) for ArrayData");
                Some(msg)
            })
            .collect_vec();
        let children = Some(fbb.create_vector(&children));

        // The upper bound was computed with a checked add, so every index in this range is
        // a valid `u16`.
        let buffers = Some(fbb.create_vector_from_iter(self.buffer_idx..child_buffer_idx));

        let stats = Some(self.array.statistics().write_flatbuffer(fbb));

        fba::Array::create(
            fbb,
            &fba::ArrayArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
23 changes: 23 additions & 0 deletions vortex-buffer/src/alignment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@ impl Alignment {
// of trailing zeros in the binary representation of the alignment is greater or equal.
self.0.trailing_zeros() >= other.0.trailing_zeros()
}

/// Returns the base-2 logarithm of the alignment.
///
/// For a power-of-two alignment this equals its number of trailing zero bits.
pub fn exponent(&self) -> u8 {
    let trailing_zeros = self.0.trailing_zeros();
    u8::try_from(trailing_zeros)
        .vortex_expect("alignment fits into u16, so exponent fits in u7")
}

/// Create from the log2 exponent of the alignment, i.e. an alignment of `1 << exponent`.
///
/// ## Panics
///
/// Panics if the resulting alignment (`1 << exponent`) is greater than `u16::MAX`; the value
/// is validated by `Self::new`. A shifted `1` is always a power of 2, so that part of the
/// validation cannot fail here.
///
/// NOTE(review): the shift itself is unchecked. For an `exponent` at or beyond the bit width
/// of the integer type accepted by `Self::new`, it presumably panics in debug builds and wraps
/// in release builds — confirm that exponents read from untrusted files are validated first.
#[inline]
pub const fn from_exponent(exponent: u8) -> Self {
Self::new(1 << exponent)
}
}

impl Display for Alignment {
Expand Down Expand Up @@ -120,6 +136,13 @@ mod test {
Alignment::new(3);
}

#[test]
fn alignment_exponent() {
    // 1024 == 2^10, so the alignment and the exponent 10 must convert into each other.
    let kib = Alignment::new(1024);
    assert_eq!(kib.exponent(), 10);
    assert_eq!(Alignment::from_exponent(10), kib);
}

#[test]
fn is_aligned_to() {
assert!(Alignment::new(1).is_aligned_to(Alignment::new(1)));
Expand Down
9 changes: 1 addition & 8 deletions vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@ impl WriteFlatBuffer for FileLayout {
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
let root_layout = self.root_layout.write_flatbuffer(fbb);

let segments = self
.segments
.iter()
.map(|segment| segment.write_flatbuffer(fbb))
.collect::<Vec<_>>();
let segments = fbb.create_vector(&segments);

let segments = fbb.create_vector_from_iter(self.segments.iter().map(fb::Segment::from));
fb::FileLayout::create(
fbb,
&fb::FileLayoutArgs {
Expand Down
16 changes: 8 additions & 8 deletions vortex-file/src/v2/footer/postscript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ impl WriteFlatBuffer for Postscript {
&self,
fbb: &mut flatbuffers::FlatBufferBuilder<'fb>,
) -> flatbuffers::WIPOffset<Self::Target<'fb>> {
let dtype = self.dtype.write_flatbuffer(fbb);
let file_layout = self.file_layout.write_flatbuffer(fbb);
let dtype = fb::Segment::from(&self.dtype);
let file_layout = fb::Segment::from(&self.file_layout);
fb::Postscript::create(
fbb,
&fb::PostscriptArgs {
dtype: Some(dtype),
file_layout: Some(file_layout),
dtype: Some(&dtype),
file_layout: Some(&file_layout),
},
)
}
Expand All @@ -39,12 +39,12 @@ impl ReadFlatBuffer for Postscript {
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
) -> Result<Self, Self::Error> {
Ok(Self {
dtype: Segment::read_flatbuffer(
&fb.dtype()
dtype: Segment::try_from(
fb.dtype()
.ok_or_else(|| vortex_err!("Postscript missing dtype segment"))?,
)?,
file_layout: Segment::read_flatbuffer(
&fb.file_layout()
file_layout: Segment::try_from(
fb.file_layout()
.ok_or_else(|| vortex_err!("Postscript missing file_layout segment"))?,
)?,
})
Expand Down
39 changes: 13 additions & 26 deletions vortex-file/src/v2/footer/segment.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,29 @@
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
use vortex_error::{vortex_err, VortexError};
use vortex_flatbuffers::{footer2 as fb, ReadFlatBuffer, WriteFlatBuffer};
use vortex_buffer::Alignment;
use vortex_error::VortexError;
use vortex_flatbuffers::footer2 as fb;

/// The location of a segment within a Vortex file.
#[derive(Clone, Debug)]
pub(crate) struct Segment {
pub(crate) offset: u64,
pub(crate) length: usize,
pub(crate) length: u32,
pub(crate) alignment: Alignment,
}

impl WriteFlatBuffer for Segment {
type Target<'a> = fb::Segment<'a>;

fn write_flatbuffer<'fb>(
&self,
fbb: &mut FlatBufferBuilder<'fb>,
) -> WIPOffset<Self::Target<'fb>> {
fb::Segment::create(
fbb,
&fb::SegmentArgs {
offset: self.offset,
length: self.length as u64,
},
)
impl From<&Segment> for fb::Segment {
fn from(value: &Segment) -> Self {
fb::Segment::new(value.offset, value.length, value.alignment.exponent(), 0, 0)
}
}

impl ReadFlatBuffer for Segment {
type Source<'a> = fb::Segment<'a>;
impl TryFrom<&fb::Segment> for Segment {
type Error = VortexError;

fn read_flatbuffer<'buf>(
fb: &<Self::Source<'buf> as Follow<'buf>>::Inner,
) -> Result<Self, Self::Error> {
fn try_from(value: &fb::Segment) -> Result<Self, Self::Error> {
Ok(Self {
offset: fb.offset(),
length: usize::try_from(fb.length())
.map_err(|_| vortex_err!("segment length exceeds maximum usize"))?,
offset: value.offset(),
length: value.length(),
alignment: Alignment::from_exponent(value.alignment_exponent()),
})
}
}
13 changes: 5 additions & 8 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl OpenOptions {
) -> VortexResult<DType> {
let offset = usize::try_from(dtype.offset - initial_offset)?;
let sliced_buffer =
FlatBuffer::align_from(initial_read.slice(offset..offset + dtype.length));
FlatBuffer::align_from(initial_read.slice(offset..offset + (dtype.length as usize)));
let fbd_dtype = root::<fbd::DType>(&sliced_buffer)?;

DType::try_from_view(fbd_dtype, sliced_buffer.clone())
Expand All @@ -198,7 +198,7 @@ impl OpenOptions {
dtype: DType,
) -> VortexResult<FileLayout> {
let offset = usize::try_from(segment.offset - initial_offset)?;
let bytes = initial_read.slice(offset..offset + segment.length);
let bytes = initial_read.slice(offset..offset + (segment.length as usize));

let fb = root::<fb::FileLayout>(&bytes)?;
let fb_root_layout = fb
Expand Down Expand Up @@ -226,10 +226,7 @@ impl OpenOptions {
let fb_segments = fb
.segments()
.ok_or_else(|| vortex_err!("FileLayout missing segments"))?;
let segments = fb_segments
.iter()
.map(|s| Segment::read_flatbuffer(&s))
.try_collect()?;
let segments = fb_segments.iter().map(Segment::try_from).try_collect()?;

Ok(FileLayout {
root_layout,
Expand All @@ -253,9 +250,9 @@ impl OpenOptions {
let segment_id = SegmentId::from(u32::try_from(idx)?);

let offset = usize::try_from(segment.offset - initial_offset)?;
let bytes = initial_read.slice(offset..offset + segment.length);
let buffer = initial_read.slice(offset..offset + (segment.length as usize));

segments.set(segment_id, bytes.into_inner())?;
segments.set(segment_id, buffer)?;
}
Ok(())
}
Expand Down
10 changes: 6 additions & 4 deletions vortex-file/src/v2/segments/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::channel::oneshot;
use futures_util::future::try_join_all;
use futures_util::TryFutureExt;
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_buffer::ByteBuffer;
use vortex_error::{vortex_err, VortexResult};
use vortex_io::VortexReadAt;
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
Expand All @@ -17,7 +18,7 @@ use crate::v2::footer::Segment;
pub(crate) struct SegmentCache<R> {
read: R,
segments: Arc<[Segment]>,
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<Bytes>>>>,
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<ByteBuffer>>>>,
}

impl<R> SegmentCache<R> {
Expand All @@ -29,7 +30,7 @@ impl<R> SegmentCache<R> {
}
}

pub fn set(&mut self, _segment_id: SegmentId, _bytes: Bytes) -> VortexResult<()> {
pub fn set(&mut self, _segment_id: SegmentId, _bytes: ByteBuffer) -> VortexResult<()> {
// Do nothing for now
Ok(())
}
Expand All @@ -55,6 +56,7 @@ impl<R: VortexReadAt> SegmentCache<R> {
let segment = &self.segments[**id as usize];
self.read
.read_byte_range(segment.offset, segment.length as u64)
.map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
}))
.await?;

Expand All @@ -77,7 +79,7 @@ impl<R: VortexReadAt> SegmentCache<R> {

#[async_trait]
impl<R: VortexReadAt> AsyncSegmentReader for SegmentCache<R> {
async fn get(&self, id: SegmentId) -> VortexResult<Bytes> {
async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
let (send, recv) = oneshot::channel();
self.inflight
.write()
Expand Down
Loading

0 comments on commit b706f21

Please sign in to comment.