Skip to content

Commit

Permalink
Arc layout scan (#1825)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Jan 6, 2025
1 parent 1f894d3 commit 8ce0d8e
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 40 deletions.
7 changes: 2 additions & 5 deletions vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::ops::Deref;
use std::sync::Arc;

use bytes::Bytes;
use flatbuffers::{root, FlatBufferBuilder, WIPOffset};
Expand Down Expand Up @@ -243,11 +244,7 @@ impl LayoutData {
}

/// Create a scan of this layout.
pub fn new_scan(
self,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan + 'static>> {
pub fn new_scan(self, scan: Scan, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutScan>> {
self.encoding().scan(self, scan, ctx)
}
}
Expand Down
3 changes: 2 additions & 1 deletion vortex-layout/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_error::VortexResult;
Expand Down Expand Up @@ -27,7 +28,7 @@ pub trait LayoutEncoding: Debug + Send + Sync {
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>>;
) -> VortexResult<Arc<dyn LayoutScan>>;
}

pub type LayoutEncodingRef = &'static dyn LayoutEncoding;
6 changes: 4 additions & 2 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ mod scan;
pub mod stats;
pub mod writer;

use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -30,7 +32,7 @@ impl LayoutEncoding for ChunkedLayout {
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(ChunkedScan::try_new(layout, scan, ctx)?.boxed())
) -> VortexResult<Arc<dyn LayoutScan>> {
Ok(ChunkedScan::try_new(layout, scan, ctx)?.into_arc())
}
}
27 changes: 15 additions & 12 deletions vortex-layout/src/layouts/chunked/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ use crate::{LayoutData, LayoutEncoding, RowMask};
/// boundary we will read the chunk twice). However, if we have overlapping row ranges, as can
/// happen if the parent is performing multiple scans (filter + projection), then we may read the
/// same chunk many times.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct ChunkedScan {
layout: LayoutData,
scan: Scan,
dtype: DType,
ctx: ContextRef,
// Shared stats table scanner
stats_scanner: Arc<RwLock<Box<dyn Scanner>>>,
stats_scanner: RwLock<Box<dyn Scanner>>,
// Cached pruning mask for the scan
pruning_mask: Arc<OnceLock<Option<BooleanBuffer>>>,
pruning_mask: OnceLock<Option<BooleanBuffer>>,
// Shared, lazily-initialized layout scans, one per chunk
chunk_scans: Arc<[OnceLock<Box<dyn LayoutScan>>]>,
chunk_scans: Vec<OnceLock<Arc<dyn LayoutScan>>>,
// The stats that are present in the layout
present_stats: Arc<[Stat]>,
}
Expand Down Expand Up @@ -106,8 +106,8 @@ impl ChunkedScan {
scan,
dtype,
ctx,
stats_scanner: Arc::new(RwLock::new(stats_scanner)),
pruning_mask: Arc::new(OnceLock::new()),
stats_scanner: RwLock::new(stats_scanner),
pruning_mask: OnceLock::new(),
chunk_scans,
present_stats,
})
Expand All @@ -123,9 +123,9 @@ impl LayoutScan for ChunkedScan {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
fn create_scanner(self: Arc<Self>, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
Ok(Box::new(ChunkedScanner {
chunked_scan: self.clone(),
chunked_scan: self,
mask,
chunk_states: None,
}) as _)
Expand All @@ -135,7 +135,7 @@ impl LayoutScan for ChunkedScan {
/// A scanner for a chunked layout.
#[derive(Debug)]
struct ChunkedScanner {
chunked_scan: ChunkedScan,
chunked_scan: Arc<ChunkedScan>,
mask: RowMask,
// State for each chunk in the layout
chunk_states: Option<Vec<ChunkState>>,
Expand Down Expand Up @@ -226,7 +226,9 @@ impl Scanner for ChunkedScanner {
.mask
.slice(chunk_range.start, chunk_range.end)?
.shift(chunk_range.start)?;
chunks.push(ChunkState::Pending(chunk_scan.create_scanner(chunk_mask)?));
chunks.push(ChunkState::Pending(
chunk_scan.clone().create_scanner(chunk_mask)?,
));
}

self.chunk_states = Some(chunks);
Expand Down Expand Up @@ -288,6 +290,7 @@ enum PollStats {
#[cfg(test)]
mod test {
use std::assert_matches::assert_matches;
use std::sync::Arc;

use vortex_array::{ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
Expand Down Expand Up @@ -326,7 +329,7 @@ mod test {
let (segments, layout) = chunked_layout();

let scan = layout.new_scan(Scan::all(), Default::default()).unwrap();
let result = segments.do_scan(scan.as_ref()).into_primitive().unwrap();
let result = segments.do_scan(scan).into_primitive().unwrap();

assert_eq!(result.len(), 9);
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
Expand Down Expand Up @@ -354,7 +357,7 @@ mod test {
_ = scan.stats_scanner.write().unwrap().poll(&segments).unwrap();

let mut scanner = ChunkedScanner {
chunked_scan: scan,
chunked_scan: Arc::new(scan),
mask: RowMask::new_valid_between(0, 9),
chunk_states: None,
};
Expand Down
6 changes: 4 additions & 2 deletions vortex-layout/src/layouts/flat/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod scan;
pub mod writer;

use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

Expand All @@ -22,7 +24,7 @@ impl LayoutEncoding for FlatLayout {
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(FlatScan::try_new(layout, scan, ctx)?.boxed())
) -> VortexResult<Arc<dyn LayoutScan>> {
Ok(FlatScan::try_new(layout, scan, ctx)?.into_arc())
}
}
15 changes: 6 additions & 9 deletions vortex-layout/src/layouts/flat/scan.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use vortex_array::compute::{fill_null, filter, FilterMask};
use vortex_array::{ArrayData, ContextRef};
use vortex_dtype::DType;
Expand Down Expand Up @@ -41,7 +43,7 @@ impl LayoutScan for FlatScan {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
fn create_scanner(self: Arc<Self>, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
let segment_id = self
.layout
.segment_id(0)
Expand Down Expand Up @@ -147,12 +149,7 @@ mod test {
.unwrap();

let result = segments
.do_scan(
layout
.new_scan(Scan::all(), Default::default())
.unwrap()
.as_ref(),
)
.do_scan(layout.new_scan(Scan::all(), Default::default()).unwrap())
.into_primitive()
.unwrap();

Expand All @@ -177,7 +174,7 @@ mod test {
};

let result = segments
.do_scan(layout.new_scan(scan, Default::default()).unwrap().as_ref())
.do_scan(layout.new_scan(scan, Default::default()).unwrap())
.into_primitive()
.unwrap();

Expand Down Expand Up @@ -209,7 +206,7 @@ mod test {
let scan = layout.new_scan(scan, Default::default()).unwrap();
assert_eq!(scan.dtype(), &DType::Bool(Nullability::Nullable));

let result = segments.do_scan(scan.as_ref()).into_bool().unwrap();
let result = segments.do_scan(scan).into_bool().unwrap();
assert!(result.boolean_buffer().value(0));
assert!(!result.boolean_buffer().value(1));
}
Expand Down
6 changes: 4 additions & 2 deletions vortex-layout/src/layouts/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod scan;
pub mod writer;

use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_error::VortexResult;

Expand All @@ -23,7 +25,7 @@ impl LayoutEncoding for StructLayout {
layout: LayoutData,
scan: Scan,
ctx: ContextRef,
) -> VortexResult<Box<dyn LayoutScan>> {
Ok(StructScan::try_new(layout, scan, ctx)?.boxed())
) -> VortexResult<Arc<dyn LayoutScan>> {
Ok(StructScan::try_new(layout, scan, ctx)?.into_arc())
}
}
4 changes: 3 additions & 1 deletion vortex-layout/src/layouts/struct_/scan.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::{vortex_panic, VortexResult};
Expand Down Expand Up @@ -40,7 +42,7 @@ impl LayoutScan for StructScan {
&self.dtype
}

fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
fn create_scanner(self: Arc<Self>, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
Ok(Box::new(StructScanner {
layout: self.layout.clone(),
scan: self.scan.clone(),
Expand Down
9 changes: 5 additions & 4 deletions vortex-layout/src/scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod scan;

use std::fmt::Debug;
use std::sync::Arc;

pub use scan::*;
use vortex_array::ArrayData;
Expand All @@ -23,16 +24,16 @@ pub trait LayoutScan: 'static + Send + Sync + Debug {
/// Note that since a [`Scanner`] returns a single ArrayData, the caller is responsible for
/// ensuring the working set and result of the scan fit into memory. The [`LayoutData`] can
/// be asked for "splits" if the caller needs a hint for how to partition the scan.
fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>>;
fn create_scanner(self: Arc<Self>, mask: RowMask) -> VortexResult<Box<dyn Scanner>>;
}

pub trait LayoutScanExt: LayoutScan {
/// Wrap the layout scan in an [`Arc`] so it can be shared by the scanners it creates.
fn boxed(self) -> Box<dyn LayoutScan + 'static>
fn into_arc(self) -> Arc<dyn LayoutScan>
where
Self: Sized + 'static,
Self: Sized,
{
Box::new(self)
Arc::new(self) as _
}
}

Expand Down
7 changes: 5 additions & 2 deletions vortex-layout/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub trait SegmentWriter {

#[cfg(test)]
pub mod test {
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use vortex_error::{vortex_panic, VortexExpect};

Expand All @@ -59,9 +61,10 @@ pub mod test {
}

impl TestSegments {
pub fn do_scan(&self, scan: &dyn LayoutScan) -> ArrayData {
pub fn do_scan(&self, scan: Arc<dyn LayoutScan>) -> ArrayData {
let row_count = scan.layout().row_count();
let mut scanner = scan
.create_scanner(RowMask::new_valid_between(0, scan.layout().row_count()))
.create_scanner(RowMask::new_valid_between(0, row_count))
.vortex_expect("Failed to create scanner");
match scanner.poll(self).vortex_expect("Failed to poll scanner") {
Poll::Some(array) => array,
Expand Down

0 comments on commit 8ce0d8e

Please sign in to comment.