Scan::execute takes an Arc<dyn Engine> now
scovich committed Nov 28, 2024
1 parent 953ceed commit e682bce
Showing 8 changed files with 59 additions and 56 deletions.
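In short: `Scan::execute` now takes the engine as `Arc<dyn Engine>` instead of `&dyn Engine`, because the iterator it returns holds `move` closures that must keep the engine alive. A minimal caller-side sketch of the new shape (illustrative only, not part of this commit; assumes the `SyncEngine` shipped with the kernel):

    use std::sync::Arc;

    use delta_kernel::engine::sync::SyncEngine;
    use delta_kernel::{DeltaResult, Engine, Table};

    fn scan_all(table: &Table) -> DeltaResult<()> {
        // Engines are now built behind an Arc so the scan can share ownership.
        let engine: Arc<dyn Engine> = Arc::new(SyncEngine::new());

        // Borrowing APIs such as snapshot() still take &dyn Engine via as_ref().
        let snapshot = table.snapshot(engine.as_ref(), None)?;
        let scan = snapshot.into_scan_builder().build()?;

        // Old: scan.execute(&engine) borrowed the engine for the iterator's lifetime.
        // New: the Arc is passed in; clone it first if the engine is needed again later.
        for scan_result in scan.execute(engine)? {
            let _data = scan_result?.raw_data?;
        }
        Ok(())
    }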
4 changes: 1 addition & 3 deletions acceptance/src/data.rs
@@ -120,11 +120,9 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
             return Ok(());
         }
     }
-
-    let engine = engine.as_ref();
     let table_root = test_case.table_root()?;
     let table = Table::new(table_root);
-    let snapshot = table.snapshot(engine, None)?;
+    let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;
     let mut schema = None;
     let batches: Vec<RecordBatch> = scan
8 changes: 4 additions & 4 deletions kernel/examples/read-table-single-threaded/src/main.rs
@@ -73,7 +73,7 @@ fn try_main() -> DeltaResult<()> {
     let table = Table::try_from_uri(&cli.path)?;
     println!("Reading {}", table.location());

-    let engine: Box<dyn Engine> = match cli.engine {
+    let engine: Arc<dyn Engine> = match cli.engine {
         EngineType::Default => {
             let mut options = if let Some(region) = cli.region {
                 HashMap::from([("region", region)])
@@ -83,13 +83,13 @@
             if cli.public {
                 options.insert("skip_signature", "true".to_string());
             }
-            Box::new(DefaultEngine::try_new(
+            Arc::new(DefaultEngine::try_new(
                 table.location(),
                 options,
                 Arc::new(TokioBackgroundExecutor::new()),
             )?)
         }
-        EngineType::Sync => Box::new(SyncEngine::new()),
+        EngineType::Sync => Arc::new(SyncEngine::new()),
     };

     let snapshot = table.snapshot(engine.as_ref(), None)?;
@@ -120,7 +120,7 @@ fn try_main() -> DeltaResult<()> {
         .build()?;

     let batches: Vec<RecordBatch> = scan
-        .execute(engine.as_ref())?
+        .execute(engine)?
         .map(|scan_result| -> DeltaResult<_> {
             let scan_result = scan_result?;
             let mask = scan_result.full_mask();
45 changes: 24 additions & 21 deletions kernel/src/scan/mod.rs
@@ -261,10 +261,10 @@ impl Scan {
     /// the execution of the scan.
     // This calls [`Scan::scan_data`] to get an iterator of `ScanData` actions for the scan, and then uses the
     // `engine`'s [`crate::ParquetHandler`] to read the actual table data.
-    pub fn execute<'a>(
-        &'a self,
-        engine: &'a dyn Engine,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + 'a> {
+    pub fn execute(
+        &self,
+        engine: Arc<dyn Engine>,
+    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {
         struct ScanFile {
             path: String,
             size: i64,
@@ -293,7 +293,7 @@ impl Scan {
         );

         let global_state = Arc::new(self.global_scan_state());
-        let scan_data = self.scan_data(engine)?;
+        let scan_data = self.scan_data(engine.as_ref())?;
         let scan_files_iter = scan_data
             .map(|res| {
                 let (data, vec) = res?;
@@ -309,7 +309,7 @@
                 let file_path = self.snapshot.table_root.join(&scan_file.path)?;
                 let mut selection_vector = scan_file
                     .dv_info
-                    .get_selection_vector(engine, &self.snapshot.table_root)?;
+                    .get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?;
                 let meta = FileMeta {
                     last_modified: 0,
                     size: scan_file.size as usize,
@@ -320,14 +320,17 @@
                     global_state.read_schema.clone(),
                     self.predicate(),
                 )?;
-                let gs = global_state.clone(); // Arc clone
+
+                // Arc clones
+                let engine = engine.clone();
+                let global_state = global_state.clone();
                 Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
                     let read_result = read_result?;
                     // to transform the physical data into the correct logical form
                     let logical = transform_to_logical_internal(
-                        engine,
+                        engine.as_ref(),
                         read_result,
-                        &gs,
+                        &global_state,
                         &scan_file.partition_values,
                         &self.all_fields,
                         self.have_partition_cols,
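Worth spelling out why the signature change works: the old `'a` lifetime tied the returned iterator to the `engine` borrow, so the iterator could never outlive the reference it was built from. With `Arc`, each per-file `move` closure captures its own clone, and the iterator owns everything it needs. A stripped-down illustration of the pattern (hypothetical toy types, not kernel code):

    use std::sync::Arc;

    trait Engine {
        fn read(&self, path: &str) -> String;
    }

    // The returned iterator is independent of any &Engine borrow: the move
    // closure captures an Arc clone, keeping the engine alive for as long as
    // the iterator itself lives.
    fn execute(engine: Arc<dyn Engine>, paths: Vec<String>) -> impl Iterator<Item = String> {
        paths.into_iter().map(move |path| engine.read(&path))
    }

    fn main() {
        struct Local;
        impl Engine for Local {
            fn read(&self, path: &str) -> String {
                format!("contents of {path}")
            }
        }
        let engine: Arc<dyn Engine> = Arc::new(Local);
        let rows = execute(engine.clone(), vec!["a".into(), "b".into()]);
        // The original Arc is still usable because execute() took a clone.
        println!("{} rows; {}", rows.count(), engine.read("c"));
    }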
@@ -662,12 +665,12 @@ mod tests {
         let path =
             std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
         let url = url::Url::from_directory_path(path).unwrap();
-        let engine = SyncEngine::new();
+        let engine = Arc::new(SyncEngine::new());

         let table = Table::new(url);
-        let snapshot = table.snapshot(&engine, None).unwrap();
+        let snapshot = table.snapshot(engine.as_ref(), None).unwrap();
         let scan = snapshot.into_scan_builder().build().unwrap();
-        let files: Vec<ScanResult> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let files: Vec<ScanResult> = scan.execute(engine).unwrap().try_collect().unwrap();

         assert_eq!(files.len(), 1);
         let num_rows = files[0].raw_data.as_ref().unwrap().len();
@@ -743,16 +746,16 @@ mod tests {
     fn test_data_row_group_skipping() {
         let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
         let url = url::Url::from_directory_path(path.unwrap()).unwrap();
-        let engine = SyncEngine::new();
+        let engine = Arc::new(SyncEngine::new());

         let table = Table::new(url);
-        let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());
+        let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap());

         // No predicate pushdown attempted, so the one data file should be returned.
         //
         // NOTE: The data file contains only five rows -- near guaranteed to produce one row group.
         let scan = snapshot.clone().scan_builder().build().unwrap();
-        let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
         assert_eq!(data.len(), 1);

         // Ineffective predicate pushdown attempted, so the one data file should be returned.
@@ -765,7 +768,7 @@
             .with_predicate(predicate)
             .build()
             .unwrap();
-        let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
         assert_eq!(data.len(), 1);

         // Effective predicate pushdown, so no data files should be returned.
@@ -775,18 +778,18 @@
             .with_predicate(predicate)
             .build()
             .unwrap();
-        let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap();
         assert_eq!(data.len(), 0);
     }

     #[test]
     fn test_missing_column_row_group_skipping() {
         let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
         let url = url::Url::from_directory_path(path.unwrap()).unwrap();
-        let engine = SyncEngine::new();
+        let engine = Arc::new(SyncEngine::new());

         let table = Table::new(url);
-        let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());
+        let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap());

         // Predicate over a logically valid but physically missing column. No data files should be
         // returned because the column is inferred to be all-null.
@@ -800,7 +803,7 @@
             .with_predicate(predicate)
             .build()
             .unwrap();
-        let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
         assert_eq!(data.len(), 1);

         // Predicate over a logically missing column, so the one data file should be returned.
@@ -812,7 +815,7 @@
             .with_predicate(predicate)
             .build()
             .unwrap();
-        let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+        let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap();
         assert_eq!(data.len(), 1);
     }
8 changes: 5 additions & 3 deletions kernel/tests/common/mod.rs
@@ -7,6 +7,8 @@ use crate::ArrowEngineData;
 use delta_kernel::scan::Scan;
 use delta_kernel::{DeltaResult, Engine, EngineData, Table};

+use std::sync::Arc;
+
 pub(crate) fn to_arrow(data: Box<dyn EngineData>) -> DeltaResult<RecordBatch> {
     Ok(data
         .into_any()
@@ -20,9 +22,9 @@ pub(crate) fn to_arrow(data: Box<dyn EngineData>) -> DeltaResult<RecordBatch> {
 pub(crate) fn test_read(
     expected: &ArrowEngineData,
     table: &Table,
-    engine: &impl Engine,
+    engine: Arc<dyn Engine>,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let snapshot = table.snapshot(engine, None)?;
+    let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;
     let batches = read_scan(&scan, engine)?;
     let formatted = pretty_format_batches(&batches).unwrap().to_string();
@@ -40,7 +42,7 @@ pub(crate) fn test_read(

 // TODO (zach): this is listed as unused for acceptance crate
 #[allow(unused)]
-pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult<Vec<RecordBatch>> {
+pub(crate) fn read_scan(scan: &Scan, engine: Arc<dyn Engine>) -> DeltaResult<Vec<RecordBatch>> {
     let scan_results = scan.execute(engine)?;
     scan_results
         .map(|scan_result| -> DeltaResult<_> {
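One subtlety in `test_read` above: the snapshot borrows through `engine.as_ref()` before the `Arc` is moved into `read_scan` on its last use. The same borrow-then-move ordering appears again in kernel/tests/read.rs below, where `read_with_scan_data` now runs before the consuming `read_with_execute`. A tiny standalone sketch of the discipline (hypothetical, with `Arc<str>` standing in for the engine):

    use std::sync::Arc;

    fn borrow_it(s: &str) {
        println!("borrowed: {s}");
    }

    fn consume_it(s: Arc<str>) {
        println!("consumed: {s}");
    }

    fn main() {
        let shared: Arc<str> = Arc::from("engine");
        // Borrow first via as_ref(); let the final call take ownership.
        // Reversing the two calls would fail to compile: use after move.
        borrow_it(shared.as_ref());
        consume_it(shared);
    }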
13 changes: 7 additions & 6 deletions kernel/tests/dv.rs
@@ -2,6 +2,7 @@
 //! Must run at the root of the crate
 use std::ops::Add;
 use std::path::PathBuf;
+use std::sync::Arc;

 use delta_kernel::engine::sync::SyncEngine;
 use delta_kernel::scan::ScanResult;
@@ -29,13 +30,13 @@ fn count_total_scan_rows(
 fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
     let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?;
     let url = url::Url::from_directory_path(path).unwrap();
-    let engine = SyncEngine::new();
+    let engine = Arc::new(SyncEngine::new());

     let table = Table::new(url);
-    let snapshot = table.snapshot(&engine, None)?;
+    let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;

-    let stream = scan.execute(&engine)?;
+    let stream = scan.execute(engine)?;
     let total_rows = count_total_scan_rows(stream)?;
     assert_eq!(total_rows, 8);
     Ok(())
@@ -45,13 +46,13 @@ fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
 fn non_dv_table() -> Result<(), Box<dyn std::error::Error>> {
     let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?;
     let url = url::Url::from_directory_path(path).unwrap();
-    let engine = SyncEngine::new();
+    let engine = Arc::new(SyncEngine::new());

     let table = Table::new(url);
-    let snapshot = table.snapshot(&engine, None)?;
+    let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;

-    let stream = scan.execute(&engine)?;
+    let stream = scan.execute(engine)?;
     let total_rows = count_total_scan_rows(stream)?;
     assert_eq!(total_rows, 10);
     Ok(())
3 changes: 1 addition & 2 deletions kernel/tests/golden_tables.rs
@@ -180,9 +180,8 @@ async fn latest_snapshot_test(
     expected_path: Option<PathBuf>,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let snapshot = table.snapshot(&engine, None)?;
-
     let scan = snapshot.into_scan_builder().build()?;
-    let scan_res = scan.execute(&engine)?;
+    let scan_res = scan.execute(Arc::new(engine))?;
     let batches: Vec<RecordBatch> = scan_res
         .map(|scan_result| -> DeltaResult<_> {
             let scan_result = scan_result?;
30 changes: 15 additions & 15 deletions kernel/tests/read.rs
@@ -56,20 +56,20 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
         .await?;

     let location = Url::parse("memory:///")?;
-    let engine = DefaultEngine::new(
+    let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
         Path::from("/"),
         Arc::new(TokioBackgroundExecutor::new()),
-    );
+    ));

     let table = Table::new(location);
     let expected_data = vec![batch.clone(), batch];

-    let snapshot = table.snapshot(&engine, None)?;
+    let snapshot = table.snapshot(engine.as_ref(), None)?;
     let scan = snapshot.into_scan_builder().build()?;

     let mut files = 0;
-    let stream = scan.execute(&engine)?.zip(expected_data);
+    let stream = scan.execute(engine)?.zip(expected_data);

     for (data, expected) in stream {
         let raw_data = data?.raw_data?;
@@ -126,7 +126,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
     let scan = snapshot.into_scan_builder().build()?;

     let mut files = 0;
-    let stream = scan.execute(&engine)?.zip(expected_data);
+    let stream = scan.execute(Arc::new(engine))?.zip(expected_data);

     for (data, expected) in stream {
         let raw_data = data?.raw_data?;
@@ -183,7 +183,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
     let snapshot = table.snapshot(&engine, None)?;
     let scan = snapshot.into_scan_builder().build()?;

-    let stream = scan.execute(&engine)?.zip(expected_data);
+    let stream = scan.execute(Arc::new(engine))?.zip(expected_data);

     let mut files = 0;
     for (data, expected) in stream {
@@ -247,14 +247,14 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
         .await?;

     let location = Url::parse("memory:///").unwrap();
-    let engine = DefaultEngine::new(
+    let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
         Path::from(""),
         Arc::new(TokioBackgroundExecutor::new()),
-    );
+    ));

     let table = Table::new(location);
-    let snapshot = Arc::new(table.snapshot(&engine, None)?);
+    let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

     // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator,
     // we validate the boundary values where we expect the set of matched files to change.
@@ -306,7 +306,7 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {

     let expected_files = expected_batches.len();
     let mut files_scanned = 0;
-    let stream = scan.execute(&engine)?.zip(expected_batches);
+    let stream = scan.execute(engine.clone())?.zip(expected_batches);

     for (batch, expected) in stream {
         let raw_data = batch?.raw_data?;
@@ -346,7 +346,7 @@ macro_rules! assert_batches_sorted_eq {
 }

 fn read_with_execute(
-    engine: &dyn Engine,
+    engine: Arc<dyn Engine>,
     scan: &Scan,
     expected: &[String],
 ) -> Result<(), Box<dyn std::error::Error>> {
@@ -472,10 +472,10 @@ fn read_table_data(
     )?;
     let sync_engine = delta_kernel::engine::sync::SyncEngine::new();

-    let engines: &[&dyn Engine] = &[&sync_engine, &default_engine];
-    for &engine in engines {
+    let engines: Vec<Arc<dyn Engine>> = vec![Arc::new(sync_engine), Arc::new(default_engine)];
+    for engine in engines {
         let table = Table::new(url.clone());
-        let snapshot = table.snapshot(engine, None)?;
+        let snapshot = table.snapshot(engine.as_ref(), None)?;

         let read_schema = select_cols.map(|select_cols| {
             let table_schema = snapshot.schema();
@@ -491,8 +491,8 @@ fn read_table_data(
             .build()?;

         sort_lines!(expected);
+        read_with_scan_data(table.location(), engine.as_ref(), &scan, &expected)?;
         read_with_execute(engine, &scan, &expected)?;
-        read_with_scan_data(table.location(), engine, &scan, &expected)?;
     }
     Ok(())
 }
4 changes: 2 additions & 2 deletions kernel/tests/write.rs
@@ -450,7 +450,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
             ]))],
         )?),
         &table,
-        engine.as_ref(),
+        engine,
     )?;
     Ok(())
 }
@@ -598,7 +598,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
             ],
         )?),
         &table,
-        engine.as_ref(),
+        engine,
     )?;
     Ok(())
 }