Skip to content

Commit

Permalink
Support opening Vortex files without I/O (#1920)
Browse files Browse the repository at this point in the history
Part of #1676
  • Loading branch information
gatesn authored Jan 13, 2025
1 parent 0c2b023 commit 99eb574
Showing 1 changed file with 87 additions and 55 deletions.
142 changes: 87 additions & 55 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub struct VortexOpenOptions {
layout_ctx: LayoutContextRef,
/// An optional, externally provided, file layout.
file_layout: Option<FileLayout>,
/// An optional, externally provided, dtype.
dtype: Option<DType>,
/// An optional, externally provided, file size.
file_size: Option<u64>,
// TODO(ngates): also support a messages_middleware that can wrap a message cache to provide
// additional caching, metrics, or other intercepts, etc. It should support synchronous
// read + write of Map<MessageId, ByteBuffer> or similar.
Expand All @@ -51,7 +51,7 @@ impl VortexOpenOptions {
ctx,
layout_ctx: LayoutContextRef::default(),
file_layout: None,
dtype: None,
file_size: None,
initial_read_size: INITIAL_READ_SIZE,
split_by: SplitBy::Layout,
segment_cache: None,
Expand All @@ -60,6 +60,24 @@ impl VortexOpenOptions {
}
}

/// Configure a known file layout.
///
/// If this is provided, then the Vortex file can be opened without performing any I/O.
/// Once open, the [`FileLayout`] can be accessed via [`VortexFile::file_layout`].
pub fn with_file_layout(mut self, file_layout: FileLayout) -> Self {
self.file_layout = Some(file_layout);
self
}

/// Configure a known file size.
///
/// This helps to prevent an I/O request to discover the size of the file.
/// Of course, all bets are off if you pass an incorrect value.
pub fn with_file_size(mut self, file_size: u64) -> Self {
self.file_size = Some(file_size);
self
}

/// Configure the initial read size for the Vortex file.
pub fn with_initial_read_size(mut self, initial_read_size: u64) -> VortexResult<Self> {
if self.initial_read_size < u16::MAX as u64 {
Expand Down Expand Up @@ -91,9 +109,60 @@ impl VortexOpenOptions {

impl VortexOpenOptions {
/// Open the Vortex file using asynchronous IO.
pub async fn open<R: VortexReadAt>(self, read: R) -> VortexResult<VortexFile<FileIoDriver<R>>> {
pub async fn open<R: VortexReadAt>(
mut self,
read: R,
) -> VortexResult<VortexFile<FileIoDriver<R>>> {
// Set up our segment cache.
let segment_cache = self
.segment_cache
.as_ref()
.cloned()
.unwrap_or_else(|| Arc::new(InMemorySegmentCache::default()));

// If we need to read the file layout, then do so.
let file_layout = match self.file_layout.take() {
None => self.read_file_layout(&read, segment_cache.as_ref()).await?,
Some(file_layout) => file_layout,
};

// Set up the I/O driver.
let io_driver = FileIoDriver {
read,
file_layout: file_layout.clone(),
concurrency: self.io_concurrency,
};

// Set up the execution driver.
let exec_driver = self
.execution_mode
.unwrap_or(ExecutionMode::Inline)
.into_driver();

// Compute the splits of the file.
let splits = self.split_by.splits(&file_layout.root_layout)?.into();

// Finally, create the VortexFile.
Ok(VortexFile {
ctx: self.ctx.clone(),
file_layout,
io_driver,
exec_driver,
splits,
})
}

/// Read the [`FileLayout`] from the file.
async fn read_file_layout<R: VortexReadAt>(
&self,
read: &R,
segment_cache: &dyn SegmentCache,
) -> VortexResult<FileLayout> {
// Fetch the file size and perform the initial read.
let file_size = read.size().await?;
let file_size = match self.file_size {
None => read.size().await?,
Some(file_size) => file_size,
};
let initial_read_size = self.initial_read_size.min(file_size);
let initial_offset = file_size - initial_read_size;
let initial_read: ByteBuffer = read
Expand All @@ -105,11 +174,10 @@ impl VortexOpenOptions {
let postscript = self.parse_postscript(&initial_read)?;

// Check if we need to read more bytes.
// NOTE(ngates): for now, we assume the dtype and layout segments are adjacent.
let (initial_offset, initial_read) = if (self.dtype.is_none()
&& postscript.dtype.offset < initial_offset)
|| (self.file_layout.is_none() && postscript.file_layout.offset < initial_offset)
let (initial_offset, initial_read) = if (postscript.dtype.offset < initial_offset)
|| (postscript.file_layout.offset < initial_offset)
{
// NOTE(ngates): for now, we assume the dtype and layout segments are adjacent.
let offset = postscript.dtype.offset.min(postscript.file_layout.offset);
let mut new_initial_read =
ByteBufferMut::with_capacity(usize::try_from(file_size - offset)?);
Expand All @@ -125,10 +193,9 @@ impl VortexOpenOptions {
};

// Now we try to read the DType and Layout segments.
let dtype = self.dtype.clone().unwrap_or_else(|| {
self.parse_dtype(initial_offset, &initial_read, postscript.dtype)
.vortex_expect("Failed to parse dtype")
});
let dtype = self
.parse_dtype(initial_offset, &initial_read, postscript.dtype)
.vortex_expect("Failed to parse dtype");
let file_layout = self.file_layout.clone().unwrap_or_else(|| {
self.parse_file_layout(
initial_offset,
Expand All @@ -139,44 +206,12 @@ impl VortexOpenOptions {
.vortex_expect("Failed to parse file layout")
});

// Set up our segment cache and for good measure, we populate any segments that were
// covered by the initial read.
let segment_cache = self
.segment_cache
.as_ref()
.cloned()
.unwrap_or_else(|| Arc::new(InMemorySegmentCache::default()));
self.populate_segments(
initial_offset,
&initial_read,
&file_layout,
segment_cache.as_ref(),
)?;

// Set up the I/O driver.
let io_driver = FileIoDriver {
read,
file_layout: file_layout.clone(),
concurrency: self.io_concurrency,
};

// Set up the execution driver.
let exec_driver = self
.execution_mode
.unwrap_or(ExecutionMode::Inline)
.into_driver();
// If the initial read happened to cover any segments, then we can populate the
// segment cache
self.populate_segments(initial_offset, &initial_read, &file_layout, segment_cache)
.await?;

// Compute the splits of the file.
let splits = self.split_by.splits(&file_layout.root_layout)?.into();

// Finally, create the VortexFile.
Ok(VortexFile {
ctx: self.ctx.clone(),
file_layout,
io_driver,
exec_driver,
splits,
})
Ok(file_layout)
}

/// Parse the postscript from the initial read.
Expand Down Expand Up @@ -268,8 +303,7 @@ impl VortexOpenOptions {
}

/// Populate segments in the cache that were covered by the initial read.
#[allow(unused_variables)]
fn populate_segments(
async fn populate_segments(
&self,
initial_offset: u64,
initial_read: &ByteBuffer,
Expand All @@ -285,9 +319,7 @@ impl VortexOpenOptions {
let offset = usize::try_from(segment.offset - initial_offset)?;
let buffer = initial_read.slice(offset..offset + (segment.length as usize));

// FIXME(ngates): how should we write into the segment cache? Feels like it should be
// non-blocking and on some other thread?
// segments.put(segment_id, buffer)?;
segments.put(segment_id, buffer).await?;
}
Ok(())
}
Expand Down

0 comments on commit 99eb574

Please sign in to comment.