I/O Driver (#1897)
Create a basic implementation of an I/O driver so we can hook the new
layouts up to DataFusion which requires sendable record batch streams.
gatesn authored Jan 10, 2025
1 parent f97c0cd commit 4a5b819
Showing 7 changed files with 190 additions and 96 deletions.
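For orientation, here is a minimal consumer-side sketch, not part of this commit, of how the new scan API might be driven (e.g. by a DataFusion table provider). It assumes `VortexFile` is reachable at `vortex_file::v2::VortexFile` and that the returned `ArrayStream` yields `VortexResult<ArrayData>` items; because `scan` now consumes `self` and returns an `impl ArrayStream + 'static`, the stream can be sent to another task.

use std::sync::Arc;

use futures_util::TryStreamExt;
use vortex_array::ArrayData;
use vortex_error::VortexResult;
use vortex_file::v2::VortexFile; // assumed re-export path for the type defined in this diff
use vortex_io::VortexReadAt;
use vortex_scan::Scan;

// Collect every chunk produced by a scan into a Vec. The stream returned by `scan`
// owns everything it needs (reader, thread pool, I/O driver), so it can outlive
// any borrow of the file on the caller's side.
async fn collect_scan<R: VortexReadAt + Unpin>(
    file: VortexFile<R>,
    scan: Arc<Scan>,
) -> VortexResult<Vec<ArrayData>> {
    file.scan(scan)?.try_collect().await
}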
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions vortex-file/Cargo.toml
@@ -24,6 +24,8 @@ futures = { workspace = true, features = ["std"] }
futures-executor = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
pin-project-lite = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
tracing = { workspace = true, optional = true }
vortex-array = { workspace = true }
152 changes: 113 additions & 39 deletions vortex-file/src/v2/file.rs
@@ -1,14 +1,17 @@
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};

use futures::pin_mut;
use futures_util::future::poll_fn;
use futures_util::{stream, TryFutureExt};
use futures::channel::oneshot;
use futures::Stream;
use futures_executor::block_on;
use futures_util::{stream, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_array::{ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_io::VortexReadAt;
use vortex_layout::{ExprEvaluator, LayoutData, LayoutReader};
use vortex_scan::Scan;
@@ -18,10 +21,9 @@ use crate::v2::segments::cache::SegmentCache;
pub struct VortexFile<R> {
pub(crate) ctx: ContextRef,
pub(crate) layout: LayoutData,
pub(crate) segments: Arc<SegmentCache<R>>,
// TODO(ngates): not yet used by the file reader
#[allow(dead_code)]
pub(crate) segments: SegmentCache<R>,
pub(crate) splits: Arc<[Range<u64>]>,
pub(crate) thread_pool: Arc<rayon::ThreadPool>,
}

impl<R> VortexFile<R> {}
@@ -39,43 +41,115 @@ impl<R: VortexReadAt + Unpin> VortexFile<R> {
}

/// Performs a scan operation over the file.
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + '_> {
pub fn scan(self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static> {
// Create a shared reader for the scan.
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(self.segments.clone(), self.ctx.clone())?;
.reader(self.segments.reader(), self.ctx.clone())?;
let result_dtype = scan.result_dtype(self.dtype())?;

// For each row-group, we set up a future that will evaluate the scan and post its result.
let row_group_driver = stream::iter(ArcIter::new(self.splits.clone()))
.map(move |row_range| {
let (send, recv) = oneshot::channel();
let reader = reader.clone();
let range_scan = scan.clone().range_scan(row_range);

// Launch the scan task onto the thread pool.
self.thread_pool.spawn_fifo(move || {
let array_result =
range_scan.and_then(|range_scan| {
block_on(range_scan.evaluate_async(|row_mask, expr| {
reader.evaluate_expr(row_mask, expr)
}))
});
// Post the result back to the main thread
send.send(array_result)
.map_err(|_| vortex_err!("send failed, recv dropped"))
.vortex_expect("send failed, recv dropped");
});

// TODO(ngates): we could query the layout for splits and then process them in parallel.
// For now, we just scan the entire layout with one mask.
// Note that to implement this we would use stream::try_unfold
let stream = stream::once(async move {
// TODO(ngates): we should launch the evaluate_async onto a worker thread pool.
let row_range = 0..self.layout.row_count();

let eval = scan
.range_scan(row_range)?
.evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr));
pin_mut!(eval);

poll_fn(|cx| {
// Now we alternate between polling the eval task and driving the I/O.
loop {
if let Poll::Ready(array) = eval.try_poll_unpin(cx) {
return Poll::Ready(array);
}
let drive = self.segments.drive();
pin_mut!(drive);
match drive.try_poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
recv
})
.await
});
.then(|recv| async move {
recv.await
.unwrap_or_else(|_cancelled| Err(vortex_err!("recv failed, send dropped")))
});
// TODO(ngates): we should call buffered(n) on this stream so that it launches multiple
// splits to run in parallel. Currently we use block_on, so there's no point in this being
// any higher than the size of the thread pool. If we switch to running a LocalExecutor,
// then there may be some value in slightly over-subscribing.

// Set up an I/O driver that will make progress on 32 I/O requests at a time.
// TODO(ngates): we should probably have segments hold an Arc'd driver stream internally
// so that multiple scans can poll it, while still sharing the same global concurrency
// limit?
let io_driver = self.segments.driver().buffered(32);

Ok(ArrayStreamAdapter::new(
result_dtype,
ScanDriver {
row_group_driver,
io_driver,
},
))
}
}

pin_project! {
struct ScanDriver<R, S> {
#[pin]
row_group_driver: R,
#[pin]
io_driver: S,
}
}

impl<R, S> Stream for ScanDriver<R, S>
where
R: Stream<Item = VortexResult<ArrayData>>,
S: Stream<Item = VortexResult<()>>,
{
type Item = VortexResult<ArrayData>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
// If the row group driver is ready, then we can return the result.
if let Poll::Ready(r) = this.row_group_driver.try_poll_next_unpin(cx) {
return Poll::Ready(r);
}
// Otherwise, we try to poll the I/O driver.
// If the I/O driver is not ready, then we return Pending and wait for I/O
// to wake up the driver.
if matches!(this.io_driver.as_mut().poll_next(cx), Poll::Pending) {
return Poll::Pending;
}
}
}
}

/// There is no `IntoIterator` for `Arc<[T]>`, so to avoid copying into a `Vec<T>` we define our own.
/// See <https://users.rust-lang.org/t/arc-to-owning-iterator/115190/11>.
struct ArcIter<T> {
inner: Arc<[T]>,
pos: usize,
}

impl<T> ArcIter<T> {
fn new(inner: Arc<[T]>) -> Self {
Self { inner, pos: 0 }
}
}

impl<T: Clone> Iterator for ArcIter<T> {
type Item = T;

Ok(ArrayStreamAdapter::new(result_dtype, stream))
fn next(&mut self) -> Option<Self::Item> {
(self.pos < self.inner.len()).then(|| {
let item = self.inner[self.pos].clone();
self.pos += 1;
item
})
}
}
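
To make the dispatch mechanism above easier to see in isolation, here is a small self-contained sketch (illustrative names only, not part of the commit) of the same pattern the row-group driver uses: work is pushed onto a rayon thread pool with `spawn_fifo`, the worker runs its async evaluation to completion with `block_on`, and the result travels back over a oneshot channel that the stream side awaits.

use futures::channel::oneshot;
use futures_executor::block_on;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .build()
        .expect("failed to build thread pool");
    let (send, recv) = oneshot::channel::<u64>();

    // Stand-in for launching `range_scan.evaluate_async(..)` onto the pool.
    pool.spawn_fifo(move || {
        let result = block_on(async { 21 * 2 });
        // Post the result back; the receiving side treats a dropped sender as an error.
        let _ = send.send(result);
    });

    // The stream side simply awaits the oneshot, as the `.then(|recv| ...)` combinator does above.
    let answer = block_on(recv).expect("sender dropped");
    assert_eq!(answer, 42);
}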
7 changes: 6 additions & 1 deletion vortex-file/src/v2/open/mod.rs
@@ -141,8 +141,13 @@ impl VortexOpenOptions {
Ok(VortexFile {
ctx: self.ctx.clone(),
layout: file_layout.root_layout,
segments: Arc::new(segment_cache),
segments: segment_cache,
splits,
thread_pool: Arc::new(
rayon::ThreadPoolBuilder::new()
.build()
.map_err(|e| vortex_err!("Failed to create thread pool: {:?}", e))?,
),
})
}

119 changes: 67 additions & 52 deletions vortex-file/src/v2/segments/cache.rs
@@ -1,13 +1,12 @@
//! The segment reader provides an async interface to layouts for resolving individual segments.
use std::sync::{Arc, RwLock};
use std::future::Future;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::oneshot;
use futures_util::future::try_join_all;
use futures_util::TryFutureExt;
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use futures::channel::{mpsc, oneshot};
use futures::Stream;
use futures_util::{stream, SinkExt, StreamExt, TryFutureExt};
use vortex_buffer::ByteBuffer;
use vortex_error::{vortex_err, VortexResult};
use vortex_io::VortexReadAt;
@@ -18,75 +17,91 @@ use crate::v2::footer::Segment;
pub(crate) struct SegmentCache<R> {
read: R,
segments: Arc<[Segment]>,
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<ByteBuffer>>>>,
request_send: mpsc::UnboundedSender<SegmentRequest>,
request_recv: mpsc::UnboundedReceiver<SegmentRequest>,
}

struct SegmentRequest {
// The ID of the requested segment
id: SegmentId,
// The one-shot channel to send the segment back to the caller
callback: oneshot::Sender<ByteBuffer>,
}

impl<R> SegmentCache<R> {
pub fn new(read: R, segments: Arc<[Segment]>) -> Self {
let (send, recv) = mpsc::unbounded();
Self {
read,
segments,
inflight: RwLock::new(HashMap::new()),
request_send: send,
request_recv: recv,
}
}

pub fn set(&mut self, _segment_id: SegmentId, _bytes: ByteBuffer) -> VortexResult<()> {
// Do nothing for now
Ok(())
}

/// Returns a reader for the segment cache.
pub fn reader(&self) -> Arc<dyn AsyncSegmentReader + 'static> {
Arc::new(SegmentCacheReader(self.request_send.clone()))
}
}

impl<R: VortexReadAt> SegmentCache<R> {
impl<R: VortexReadAt + Unpin> SegmentCache<R> {
/// Drives the segment cache.
pub(crate) async fn drive(&self) -> VortexResult<()>
where
Self: Unpin,
{
// Grab a read lock and collect a set of segments to read.
let segment_ids = self
.inflight
.read()
.map_err(|_| vortex_err!("poisoned"))?
.iter()
.filter_map(|(id, channels)| (!channels.is_empty()).then_some(*id))
.collect::<Vec<_>>();

// Read all the segments.
let buffers = try_join_all(segment_ids.iter().map(|id| {
let segment = &self.segments[**id as usize];
self.read
.read_byte_range(segment.offset, segment.length as u64)
.map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
}))
.await?;

// Send the buffers to the waiting channels.
let mut inflight = self.inflight.write().map_err(|_| vortex_err!("poisoned"))?;
for (id, buffer) in segment_ids.into_iter().zip_eq(buffers.into_iter()) {
let channels = inflight
.remove(&id)
.ok_or_else(|| vortex_err!("missing inflight segment"))?;
for sender in channels {
sender
.send(buffer.clone())
.map_err(|_| vortex_err!("receiver dropped"))?;
}
}

Ok(())
pub(crate) fn driver(
self,
) -> impl Stream<Item = impl Future<Output = VortexResult<()>>> + 'static {
self.request_recv
// The more chunks we grab, the better visibility we have to perform coalescing.
// Since we know this stream is finite (number of segments in the file), we can
// just shove in a very high capacity. Rest assured the internal Vec is not
// pre-allocated with this capacity.
.ready_chunks(100_000)
// TODO(ngates): now we should flat_map the requests to split them into coalesced
// read operations.
.flat_map(stream::iter)
.map(move |request| {
let read = self.read.clone();
let segments = self.segments.clone();
async move {
let segment = &segments[*request.id as usize];
let bytes = read
.read_byte_range(segment.offset, segment.length as u64)
.map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
.await?;
request
.callback
.send(bytes)
.map_err(|_| vortex_err!("receiver dropped"))?;
Ok(())
}
})
}
}

struct SegmentCacheReader(mpsc::UnboundedSender<SegmentRequest>);

#[async_trait]
impl<R: VortexReadAt> AsyncSegmentReader for SegmentCache<R> {
impl AsyncSegmentReader for SegmentCacheReader {
async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
// Set up a channel to send the segment back to the caller.
let (send, recv) = oneshot::channel();
self.inflight
.write()
.map_err(|_| vortex_err!("poisoned"))?
.entry(id)
.or_default()
.push(send);

// TODO(ngates): attempt to resolve the segments from the cache before joining the
// request queue.

// Send a request to the segment cache.
self.0
.clone()
.send(SegmentRequest { id, callback: send })
.await
.map_err(|e| vortex_err!("Failed to request segment {:?}", e))?;

// Await the callback
recv.await
.map_err(|cancelled| vortex_err!("segment read cancelled: {:?}", cancelled))
}
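
The request/response protocol between `SegmentCacheReader::get` and the driver stream can likewise be sketched stand-alone. In this hedged example (illustrative names; the real driver also performs the `read_byte_range` I/O, coalescing, and `buffered(32)` concurrency), a caller enqueues a request carrying a oneshot sender on an unbounded mpsc channel and a driver task services the queue; both halves are polled on one task, mirroring how `ScanDriver` polls the row-group stream and the I/O driver together.

use futures::channel::{mpsc, oneshot};
use futures_executor::block_on;
use futures_util::{SinkExt, StreamExt};

struct Request {
    id: u64,
    callback: oneshot::Sender<String>,
}

// Caller side: mirrors AsyncSegmentReader::get.
async fn request(mut send: mpsc::UnboundedSender<Request>, id: u64) -> Option<String> {
    let (tx, rx) = oneshot::channel();
    send.send(Request { id, callback: tx }).await.ok()?;
    rx.await.ok()
}

// Driver side: mirrors the driver() stream, minus the real I/O and coalescing.
async fn drive(mut recv: mpsc::UnboundedReceiver<Request>) {
    while let Some(req) = recv.next().await {
        let _ = req.callback.send(format!("segment {}", req.id));
    }
}

fn main() {
    let (send, recv) = mpsc::unbounded();
    block_on(async {
        // Poll both halves on the same task, like ScanDriver polling the
        // row-group stream and the I/O driver together.
        let (answer, ()) = futures::join!(request(send, 42), drive(recv));
        assert_eq!(answer.as_deref(), Some("segment 42"));
    });
}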
1 change: 0 additions & 1 deletion vortex-layout/Cargo.toml
@@ -27,7 +27,6 @@ vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-ipc = { workspace = true }
vortex-scalar = { workspace = true }
vortex-scan = { workspace = true }
