diff --git a/Cargo.lock b/Cargo.lock
index d1d4b0b95a..a8e989e6ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5111,6 +5111,8 @@ dependencies = [
  "futures-executor",
  "futures-util",
  "itertools 0.14.0",
+ "pin-project-lite",
+ "rayon",
  "rstest",
  "tokio",
  "tracing",
@@ -5217,7 +5219,6 @@ dependencies = [
  "vortex-error",
  "vortex-expr",
  "vortex-flatbuffers",
- "vortex-ipc",
  "vortex-scalar",
  "vortex-scan",
 ]
@@ -5335,7 +5336,6 @@ dependencies = [
 name = "vortex-scan"
 version = "0.21.1"
 dependencies = [
- "async-trait",
  "rstest",
  "vortex-array",
  "vortex-buffer",
diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml
index 6007f8b8f6..7a02a7f156 100644
--- a/vortex-file/Cargo.toml
+++ b/vortex-file/Cargo.toml
@@ -24,6 +24,8 @@ futures = { workspace = true, features = ["std"] }
 futures-executor = { workspace = true }
 futures-util = { workspace = true }
 itertools = { workspace = true }
+pin-project-lite = { workspace = true }
+rayon = { workspace = true }
 tokio = { workspace = true, features = ["rt"] }
 tracing = { workspace = true, optional = true }
 vortex-array = { workspace = true }
diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs
index cdb8c5f10b..82a83ff634 100644
--- a/vortex-file/src/v2/file.rs
+++ b/vortex-file/src/v2/file.rs
@@ -1,14 +1,17 @@
 use std::ops::Range;
+use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Context, Poll};
 
-use futures::pin_mut;
-use futures_util::future::poll_fn;
-use futures_util::{stream, TryFutureExt};
+use futures::channel::oneshot;
+use futures::Stream;
+use futures_executor::block_on;
+use futures_util::{stream, StreamExt, TryStreamExt};
+use pin_project_lite::pin_project;
 use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
-use vortex_array::ContextRef;
+use vortex_array::{ArrayData, ContextRef};
 use vortex_dtype::DType;
-use vortex_error::VortexResult;
+use vortex_error::{vortex_err, VortexExpect, VortexResult};
 use vortex_io::VortexReadAt;
 use vortex_layout::{ExprEvaluator, LayoutData, LayoutReader};
 use vortex_scan::Scan;
@@ -18,10 +21,9 @@ use crate::v2::segments::cache::SegmentCache;
 pub struct VortexFile<R> {
     pub(crate) ctx: ContextRef,
     pub(crate) layout: LayoutData,
-    pub(crate) segments: Arc<SegmentCache<R>>,
-    // TODO(ngates): not yet used by the file reader
-    #[allow(dead_code)]
+    pub(crate) segments: SegmentCache<R>,
     pub(crate) splits: Arc<[Range<u64>]>,
+    pub(crate) thread_pool: Arc<rayon::ThreadPool>,
 }
 
 impl<R> VortexFile<R> {}
@@ -39,43 +41,115 @@ impl<R: VortexReadAt> VortexFile<R> {
     }
 
     /// Performs a scan operation over the file.
-    pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + '_> {
+    pub fn scan(self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static> {
         // Create a shared reader for the scan.
         let reader: Arc<dyn LayoutReader> = self
             .layout
-            .reader(self.segments.clone(), self.ctx.clone())?;
+            .reader(self.segments.reader(), self.ctx.clone())?;
         let result_dtype = scan.result_dtype(self.dtype())?;
+        // For each row-group, we set up a future that will evaluate the scan and post its result.
+        let row_group_driver = stream::iter(ArcIter::new(self.splits.clone()))
+            .map(move |row_range| {
+                let (send, recv) = oneshot::channel();
+                let reader = reader.clone();
+                let range_scan = scan.clone().range_scan(row_range);
+
+                // Launch the scan task onto the thread pool.
+                self.thread_pool.spawn_fifo(move || {
+                    let array_result =
+                        range_scan.and_then(|range_scan| {
+                            block_on(range_scan.evaluate_async(|row_mask, expr| {
+                                reader.evaluate_expr(row_mask, expr)
+                            }))
+                        });
+                    // Post the result back to the main thread
+                    send.send(array_result)
+                        .map_err(|_| vortex_err!("send failed, recv dropped"))
+                        .vortex_expect("send failed, recv dropped");
+                });
 
-        // TODO(ngates): we could query the layout for splits and then process them in parallel.
-        //  For now, we just scan the entire layout with one mask.
-        //  Note that to implement this we would use stream::try_unfold
-        let stream = stream::once(async move {
-            // TODO(ngates): we should launch the evaluate_async onto a worker thread pool.
-            let row_range = 0..self.layout.row_count();
-
-            let eval = scan
-                .range_scan(row_range)?
-                .evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr));
-            pin_mut!(eval);
-
-            poll_fn(|cx| {
-                // Now we alternate between polling the eval task and driving the I/O.
-                loop {
-                    if let Poll::Ready(array) = eval.try_poll_unpin(cx) {
-                        return Poll::Ready(array);
-                    }
-                    let drive = self.segments.drive();
-                    pin_mut!(drive);
-                    match drive.try_poll_unpin(cx) {
-                        Poll::Ready(_) => {}
-                        Poll::Pending => return Poll::Pending,
-                    }
-                }
+                recv
             })
-            .await
-        });
+            .then(|recv| async move {
+                recv.await
+                    .unwrap_or_else(|_cancelled| Err(vortex_err!("recv failed, send dropped")))
+            });
+        // TODO(ngates): we should call buffered(n) on this stream so that it launches multiple
+        //  splits to run in parallel. Currently we use block_on, so there's no point this being
+        //  any higher than the size of the thread pool. If we switch to running a LocalExecutor,
+        //  then there may be some value in slightly over-subscribing.
+
+        // Set up an I/O driver that will make progress on 32 I/O requests at a time.
+        // TODO(ngates): we should probably have segments hold an Arc'd driver stream internally
+        //  so that multiple scans can poll it, while still sharing the same global concurrency
+        //  limit?
+        let io_driver = self.segments.driver().buffered(32);
+
+        Ok(ArrayStreamAdapter::new(
+            result_dtype,
+            ScanDriver {
+                row_group_driver,
+                io_driver,
+            },
+        ))
+    }
+}
+
+pin_project! {
+    struct ScanDriver<R, S> {
+        #[pin]
+        row_group_driver: R,
+        #[pin]
+        io_driver: S,
+    }
+}
+
+impl<R, S> Stream for ScanDriver<R, S>
+where
+    R: Stream<Item = VortexResult<ArrayData>>,
+    S: Stream<Item = VortexResult<()>>,
+{
+    type Item = VortexResult<ArrayData>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.project();
+        loop {
+            // If the row group driver is ready, then we can return the result.
+            if let Poll::Ready(r) = this.row_group_driver.try_poll_next_unpin(cx) {
+                return Poll::Ready(r);
+            }
+            // Otherwise, we try to poll the I/O driver.
+            // If the I/O driver is not ready, then we return Pending and wait for I/O
+            // to wake up the driver.
+            if matches!(this.io_driver.as_mut().poll_next(cx), Poll::Pending) {
+                return Poll::Pending;
+            }
+        }
+    }
+}
+
+/// There is no `IntoIterator` for `Arc<[T]>` so to avoid copying into a `Vec`, we define our own.
+/// See .
+struct ArcIter<T> {
+    inner: Arc<[T]>,
+    pos: usize,
+}
+
+impl<T> ArcIter<T> {
+    fn new(inner: Arc<[T]>) -> Self {
+        Self { inner, pos: 0 }
+    }
+}
+
+impl<T: Clone> Iterator for ArcIter<T> {
+    type Item = T;
 
-        Ok(ArrayStreamAdapter::new(result_dtype, stream))
+    fn next(&mut self) -> Option<Self::Item> {
+        (self.pos < self.inner.len()).then(|| {
+            let item = self.inner[self.pos].clone();
+            self.pos += 1;
+            item
+        })
     }
 }
diff --git a/vortex-file/src/v2/open/mod.rs b/vortex-file/src/v2/open/mod.rs
index 4e5352d5ab..d369ecadae 100644
--- a/vortex-file/src/v2/open/mod.rs
+++ b/vortex-file/src/v2/open/mod.rs
@@ -141,8 +141,13 @@ impl<R: VortexReadAt> VortexOpenOptions<R> {
         Ok(VortexFile {
             ctx: self.ctx.clone(),
             layout: file_layout.root_layout,
-            segments: Arc::new(segment_cache),
+            segments: segment_cache,
             splits,
+            thread_pool: Arc::new(
+                rayon::ThreadPoolBuilder::new()
+                    .build()
+                    .map_err(|e| vortex_err!("Failed to create thread pool: {:?}", e))?,
+            ),
         })
     }
diff --git a/vortex-file/src/v2/segments/cache.rs b/vortex-file/src/v2/segments/cache.rs
index 0fd0ab0b68..bb04b6b5ae 100644
--- a/vortex-file/src/v2/segments/cache.rs
+++ b/vortex-file/src/v2/segments/cache.rs
@@ -1,13 +1,12 @@
 //! The segment reader provides an async interface to layouts for resolving individual segments.
 
-use std::sync::{Arc, RwLock};
+use std::future::Future;
+use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::channel::oneshot;
-use futures_util::future::try_join_all;
-use futures_util::TryFutureExt;
-use itertools::Itertools;
-use vortex_array::aliases::hash_map::HashMap;
+use futures::channel::{mpsc, oneshot};
+use futures::Stream;
+use futures_util::{stream, SinkExt, StreamExt, TryFutureExt};
 use vortex_buffer::ByteBuffer;
 use vortex_error::{vortex_err, VortexResult};
 use vortex_io::VortexReadAt;
@@ -18,15 +17,25 @@ use crate::v2::footer::Segment;
 pub(crate) struct SegmentCache<R> {
     read: R,
     segments: Arc<[Segment]>,
-    inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<ByteBuffer>>>>,
+    request_send: mpsc::UnboundedSender<SegmentRequest>,
+    request_recv: mpsc::UnboundedReceiver<SegmentRequest>,
+}
+
+struct SegmentRequest {
+    // The ID of the requested segment
+    id: SegmentId,
+    // The one-shot channel to send the segment back to the caller
+    callback: oneshot::Sender<ByteBuffer>,
 }
 
 impl<R> SegmentCache<R> {
     pub fn new(read: R, segments: Arc<[Segment]>) -> Self {
+        let (send, recv) = mpsc::unbounded();
         Self {
             read,
             segments,
-            inflight: RwLock::new(HashMap::new()),
+            request_send: send,
+            request_recv: recv,
         }
     }
@@ -34,59 +43,65 @@ impl<R> SegmentCache<R> {
         // Do nothing for now
         Ok(())
     }
+
+    /// Returns a reader for the segment cache.
+    pub fn reader(&self) -> Arc<dyn AsyncSegmentReader> {
+        Arc::new(SegmentCacheReader(self.request_send.clone()))
+    }
 }
 
-impl<R: VortexReadAt> SegmentCache<R> {
+impl<R: VortexReadAt + 'static> SegmentCache<R> {
     /// Drives the segment cache.
-    pub(crate) async fn drive(&self) -> VortexResult<()>
-    where
-        Self: Unpin,
-    {
-        // Grab a read lock and collect a set of segments to read.
-        let segment_ids = self
-            .inflight
-            .read()
-            .map_err(|_| vortex_err!("poisoned"))?
-            .iter()
-            .filter_map(|(id, channels)| (!channels.is_empty()).then_some(*id))
-            .collect::<Vec<_>>();
-
-        // Read all the segments.
-        let buffers = try_join_all(segment_ids.iter().map(|id| {
-            let segment = &self.segments[**id as usize];
-            self.read
-                .read_byte_range(segment.offset, segment.length as u64)
-                .map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
-        }))
-        .await?;
-
-        // Send the buffers to the waiting channels.
-        let mut inflight = self.inflight.write().map_err(|_| vortex_err!("poisoned"))?;
-        for (id, buffer) in segment_ids.into_iter().zip_eq(buffers.into_iter()) {
-            let channels = inflight
-                .remove(&id)
-                .ok_or_else(|| vortex_err!("missing inflight segment"))?;
-            for sender in channels {
-                sender
-                    .send(buffer.clone())
-                    .map_err(|_| vortex_err!("receiver dropped"))?;
-            }
-        }
-
-        Ok(())
+    pub(crate) fn driver(
+        self,
+    ) -> impl Stream<Item = impl Future<Output = VortexResult<()>>> + 'static {
+        self.request_recv
+            // The more chunks we grab, the better visibility we have to perform coalescing.
+            // Since we know this stream is finite (number of segments in the file), then we
+            // can just shove in a very high capacity. Rest assured the internal Vec is not
+            // pre-allocated with this capacity.
+            .ready_chunks(100_000)
+            // TODO(ngates): now we should flat_map the requests to split them into coalesced
+            //  read operations.
+            .flat_map(stream::iter)
+            .map(move |request| {
+                let read = self.read.clone();
+                let segments = self.segments.clone();
+                async move {
+                    let segment = &segments[*request.id as usize];
+                    let bytes = read
+                        .read_byte_range(segment.offset, segment.length as u64)
+                        .map_ok(|bytes| ByteBuffer::from(bytes).aligned(segment.alignment))
+                        .await?;
+                    request
+                        .callback
+                        .send(bytes)
+                        .map_err(|_| vortex_err!("receiver dropped"))?;
+                    Ok(())
+                }
+            })
     }
 }
 
+struct SegmentCacheReader(mpsc::UnboundedSender<SegmentRequest>);
+
 #[async_trait]
-impl<R: VortexReadAt> AsyncSegmentReader for SegmentCache<R> {
+impl AsyncSegmentReader for SegmentCacheReader {
     async fn get(&self, id: SegmentId) -> VortexResult<ByteBuffer> {
+        // Set up a channel to send the segment back to the caller.
         let (send, recv) = oneshot::channel();
-        self.inflight
-            .write()
-            .map_err(|_| vortex_err!("poisoned"))?
-            .entry(id)
-            .or_default()
-            .push(send);
+
+        // TODO(ngates): attempt to resolve the segments from the cache before joining the
+        //  request queue.
+
+        // Send a request to the segment cache.
+        self.0
+            .clone()
+            .send(SegmentRequest { id, callback: send })
+            .await
+            .map_err(|e| vortex_err!("Failed to request segment {:?}", e))?;
+
+        // Await the callback.
         recv.await
             .map_err(|cancelled| vortex_err!("segment read cancelled: {:?}", cancelled))
     }
diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml
index f342135498..040cd59f83 100644
--- a/vortex-layout/Cargo.toml
+++ b/vortex-layout/Cargo.toml
@@ -27,7 +27,6 @@ vortex-dtype = { workspace = true }
 vortex-error = { workspace = true }
 vortex-expr = { workspace = true }
 vortex-flatbuffers = { workspace = true, features = ["layout"] }
-vortex-ipc = { workspace = true }
 vortex-scalar = { workspace = true }
 vortex-scan = { workspace = true }
diff --git a/vortex-scan/Cargo.toml b/vortex-scan/Cargo.toml
index cb43a4355c..3634909504 100644
--- a/vortex-scan/Cargo.toml
+++ b/vortex-scan/Cargo.toml
@@ -14,7 +14,6 @@ readme.workspace = true
 categories.workspace = true
 
 [dependencies]
-async-trait = { workspace = true }
 vortex-array = { workspace = true }
 vortex-buffer = { workspace = true }
 vortex-dtype = { workspace = true }
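
The scan path above bridges rayon and async code: each row group is evaluated on the thread pool via spawn_fifo, and the result is posted back through a oneshot channel that the stream awaits. A minimal standalone sketch of that bridge, assuming only the futures and rayon crates and using rayon's global pool instead of the per-file pool built in open/mod.rs:

use futures::channel::oneshot;
use futures::executor::block_on;

fn main() {
    // One oneshot per task: the rayon worker posts its result back to the
    // async side, the same shape as the spawn_fifo call in scan() above.
    let (send, recv) = oneshot::channel();

    rayon::spawn_fifo(move || {
        // Stand-in for the CPU-bound range-scan evaluation.
        let sum: u64 = (0..1_000_000u64).sum();
        let _ = send.send(sum);
    });

    // The async side simply awaits the channel (here driven by block_on).
    let sum = block_on(recv).expect("worker dropped the sender");
    assert_eq!(sum, 499_999_500_000);
}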
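
ScanDriver itself is a small combinator: poll the result stream first, and while it is pending, poll the I/O stream on the same task so reads keep making progress. The same loop in isolation, as a hypothetical Interleave stream (not part of the diff; the exhausted-side case is handled explicitly here):

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use pin_project_lite::pin_project;

pin_project! {
    // `results` produces the items we return; `side` is polled only so its
    // work makes progress, mirroring the io_driver in ScanDriver.
    pub struct Interleave<R, S> {
        #[pin]
        results: R,
        #[pin]
        side: S,
    }
}

impl<R: Stream, S: Stream> Stream for Interleave<R, S> {
    type Item = R::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            // Prefer a ready result.
            if let Poll::Ready(item) = this.results.as_mut().poll_next(cx) {
                return Poll::Ready(item);
            }
            // Results are pending: drive the side stream instead.
            match this.side.as_mut().poll_next(cx) {
                // Progress was made; the result stream may be unblocked now.
                Poll::Ready(Some(_)) => continue,
                // Nothing left to drive, or pending: the waker registered by
                // the polls above will re-schedule this task.
                Poll::Ready(None) | Poll::Pending => return Poll::Pending,
            }
        }
    }
}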
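
Finally, the new SegmentCache replaces the inflight lock-and-HashMap with a request queue: every caller enqueues a SegmentRequest carrying a oneshot sender onto an unbounded mpsc channel, and the driver stream resolves each request and posts the buffer back. A self-contained sketch of that request/callback shape, with a hypothetical Request type standing in for SegmentRequest:

use futures::channel::{mpsc, oneshot};
use futures::executor::block_on;
use futures::{SinkExt, StreamExt};

// Mirrors SegmentRequest: a key plus a one-shot callback for the reply.
struct Request {
    id: u32,
    callback: oneshot::Sender<String>,
}

fn main() {
    let (mut tx, mut rx) = mpsc::unbounded::<Request>();

    block_on(async {
        // Caller side: enqueue a request, keep the receiving half.
        let (send, recv) = oneshot::channel();
        tx.send(Request { id: 7, callback: send })
            .await
            .expect("queue closed");
        drop(tx); // close the queue so the driver loop below terminates

        // Driver side: drain the queue and post each result back.
        while let Some(req) = rx.next().await {
            let _ = req.callback.send(format!("bytes for segment {}", req.id));
        }

        // The caller observes the reply through its oneshot receiver.
        assert_eq!(recv.await.unwrap(), "bytes for segment 7");
    });
}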