From e682bce889dc3c20da9ce9b2d51c4ba3acdf18fa Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Thu, 28 Nov 2024 05:44:42 -0800 Subject: [PATCH] Scan::execute takes an Arc now --- acceptance/src/data.rs | 4 +- .../read-table-single-threaded/src/main.rs | 8 ++-- kernel/src/scan/mod.rs | 45 ++++++++++--------- kernel/tests/common/mod.rs | 8 ++-- kernel/tests/dv.rs | 13 +++--- kernel/tests/golden_tables.rs | 3 +- kernel/tests/read.rs | 30 ++++++------- kernel/tests/write.rs | 4 +- 8 files changed, 59 insertions(+), 56 deletions(-) diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index e341d2e52..9fae63cee 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -120,11 +120,9 @@ pub async fn assert_scan_data(engine: Arc, test_case: &TestCaseInfo) return Ok(()); } } - - let engine = engine.as_ref(); let table_root = test_case.table_root()?; let table = Table::new(table_root); - let snapshot = table.snapshot(engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let scan = snapshot.into_scan_builder().build()?; let mut schema = None; let batches: Vec = scan diff --git a/kernel/examples/read-table-single-threaded/src/main.rs b/kernel/examples/read-table-single-threaded/src/main.rs index e11c74428..32ad3173d 100644 --- a/kernel/examples/read-table-single-threaded/src/main.rs +++ b/kernel/examples/read-table-single-threaded/src/main.rs @@ -73,7 +73,7 @@ fn try_main() -> DeltaResult<()> { let table = Table::try_from_uri(&cli.path)?; println!("Reading {}", table.location()); - let engine: Box = match cli.engine { + let engine: Arc = match cli.engine { EngineType::Default => { let mut options = if let Some(region) = cli.region { HashMap::from([("region", region)]) @@ -83,13 +83,13 @@ fn try_main() -> DeltaResult<()> { if cli.public { options.insert("skip_signature", "true".to_string()); } - Box::new(DefaultEngine::try_new( + Arc::new(DefaultEngine::try_new( table.location(), options, Arc::new(TokioBackgroundExecutor::new()), )?) } - EngineType::Sync => Box::new(SyncEngine::new()), + EngineType::Sync => Arc::new(SyncEngine::new()), }; let snapshot = table.snapshot(engine.as_ref(), None)?; @@ -120,7 +120,7 @@ fn try_main() -> DeltaResult<()> { .build()?; let batches: Vec = scan - .execute(engine.as_ref())? + .execute(engine)? .map(|scan_result| -> DeltaResult<_> { let scan_result = scan_result?; let mask = scan_result.full_mask(); diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index c4c5873ef..5cc8e753a 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -261,10 +261,10 @@ impl Scan { /// the execution of the scan. // This calls [`Scan::scan_data`] to get an iterator of `ScanData` actions for the scan, and then uses the // `engine`'s [`crate::ParquetHandler`] to read the actual table data. - pub fn execute<'a>( - &'a self, - engine: &'a dyn Engine, - ) -> DeltaResult> + 'a> { + pub fn execute( + &self, + engine: Arc, + ) -> DeltaResult> + '_> { struct ScanFile { path: String, size: i64, @@ -293,7 +293,7 @@ impl Scan { ); let global_state = Arc::new(self.global_scan_state()); - let scan_data = self.scan_data(engine)?; + let scan_data = self.scan_data(engine.as_ref())?; let scan_files_iter = scan_data .map(|res| { let (data, vec) = res?; @@ -309,7 +309,7 @@ impl Scan { let file_path = self.snapshot.table_root.join(&scan_file.path)?; let mut selection_vector = scan_file .dv_info - .get_selection_vector(engine, &self.snapshot.table_root)?; + .get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?; let meta = FileMeta { last_modified: 0, size: scan_file.size as usize, @@ -320,14 +320,17 @@ impl Scan { global_state.read_schema.clone(), self.predicate(), )?; - let gs = global_state.clone(); // Arc clone + + // Arc clones + let engine = engine.clone(); + let global_state = global_state.clone(); Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> { let read_result = read_result?; // to transform the physical data into the correct logical form let logical = transform_to_logical_internal( - engine, + engine.as_ref(), read_result, - &gs, + &global_state, &scan_file.partition_values, &self.all_fields, self.have_partition_cols, @@ -662,12 +665,12 @@ mod tests { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); - let engine = SyncEngine::new(); + let engine = Arc::new(SyncEngine::new()); let table = Table::new(url); - let snapshot = table.snapshot(&engine, None).unwrap(); + let snapshot = table.snapshot(engine.as_ref(), None).unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); - let files: Vec = scan.execute(&engine).unwrap().try_collect().unwrap(); + let files: Vec = scan.execute(engine).unwrap().try_collect().unwrap(); assert_eq!(files.len(), 1); let num_rows = files[0].raw_data.as_ref().unwrap().len(); @@ -743,16 +746,16 @@ mod tests { fn test_data_row_group_skipping() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); - let engine = SyncEngine::new(); + let engine = Arc::new(SyncEngine::new()); let table = Table::new(url); - let snapshot = Arc::new(table.snapshot(&engine, None).unwrap()); + let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap()); // No predicate pushdown attempted, so the one data file should be returned. // // NOTE: The data file contains only five rows -- near guaranteed to produce one row group. let scan = snapshot.clone().scan_builder().build().unwrap(); - let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap(); + let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 1); // Ineffective predicate pushdown attempted, so the one data file should be returned. @@ -765,7 +768,7 @@ mod tests { .with_predicate(predicate) .build() .unwrap(); - let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap(); + let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 1); // Effective predicate pushdown, so no data files should be returned. @@ -775,7 +778,7 @@ mod tests { .with_predicate(predicate) .build() .unwrap(); - let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap(); + let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 0); } @@ -783,10 +786,10 @@ mod tests { fn test_missing_column_row_group_skipping() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); - let engine = SyncEngine::new(); + let engine = Arc::new(SyncEngine::new()); let table = Table::new(url); - let snapshot = Arc::new(table.snapshot(&engine, None).unwrap()); + let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap()); // Predicate over a logically valid but physically missing column. No data files should be // returned because the column is inferred to be all-null. @@ -800,7 +803,7 @@ mod tests { .with_predicate(predicate) .build() .unwrap(); - let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap(); + let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 1); // Predicate over a logically missing column, so the one data file should be returned. @@ -812,7 +815,7 @@ mod tests { .with_predicate(predicate) .build() .unwrap(); - let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap(); + let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap(); assert_eq!(data.len(), 1); } diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index c219efd61..8eba56248 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -7,6 +7,8 @@ use crate::ArrowEngineData; use delta_kernel::scan::Scan; use delta_kernel::{DeltaResult, Engine, EngineData, Table}; +use std::sync::Arc; + pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data .into_any() @@ -20,9 +22,9 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { pub(crate) fn test_read( expected: &ArrowEngineData, table: &Table, - engine: &impl Engine, + engine: Arc, ) -> Result<(), Box> { - let snapshot = table.snapshot(engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let scan = snapshot.into_scan_builder().build()?; let batches = read_scan(&scan, engine)?; let formatted = pretty_format_batches(&batches).unwrap().to_string(); @@ -40,7 +42,7 @@ pub(crate) fn test_read( // TODO (zach): this is listed as unused for acceptance crate #[allow(unused)] -pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult> { +pub(crate) fn read_scan(scan: &Scan, engine: Arc) -> DeltaResult> { let scan_results = scan.execute(engine)?; scan_results .map(|scan_result| -> DeltaResult<_> { diff --git a/kernel/tests/dv.rs b/kernel/tests/dv.rs index 7410f68e1..8dd4a4061 100644 --- a/kernel/tests/dv.rs +++ b/kernel/tests/dv.rs @@ -2,6 +2,7 @@ //! Must run at the root of the crate use std::ops::Add; use std::path::PathBuf; +use std::sync::Arc; use delta_kernel::engine::sync::SyncEngine; use delta_kernel::scan::ScanResult; @@ -29,13 +30,13 @@ fn count_total_scan_rows( fn dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); - let engine = SyncEngine::new(); + let engine = Arc::new(SyncEngine::new()); let table = Table::new(url); - let snapshot = table.snapshot(&engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let scan = snapshot.into_scan_builder().build()?; - let stream = scan.execute(&engine)?; + let stream = scan.execute(engine)?; let total_rows = count_total_scan_rows(stream)?; assert_eq!(total_rows, 8); Ok(()) @@ -45,13 +46,13 @@ fn dv_table() -> Result<(), Box> { fn non_dv_table() -> Result<(), Box> { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?; let url = url::Url::from_directory_path(path).unwrap(); - let engine = SyncEngine::new(); + let engine = Arc::new(SyncEngine::new()); let table = Table::new(url); - let snapshot = table.snapshot(&engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let scan = snapshot.into_scan_builder().build()?; - let stream = scan.execute(&engine)?; + let stream = scan.execute(engine)?; let total_rows = count_total_scan_rows(stream)?; assert_eq!(total_rows, 10); Ok(()) diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index ea17deb70..a5a1debff 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -180,9 +180,8 @@ async fn latest_snapshot_test( expected_path: Option, ) -> Result<(), Box> { let snapshot = table.snapshot(&engine, None)?; - let scan = snapshot.into_scan_builder().build()?; - let scan_res = scan.execute(&engine)?; + let scan_res = scan.execute(Arc::new(engine))?; let batches: Vec = scan_res .map(|scan_result| -> DeltaResult<_> { let scan_result = scan_result?; diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 96ddf8429..857ed3279 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -56,20 +56,20 @@ async fn single_commit_two_add_files() -> Result<(), Box> .await?; let location = Url::parse("memory:///")?; - let engine = DefaultEngine::new( + let engine = Arc::new(DefaultEngine::new( storage.clone(), Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), - ); + )); let table = Table::new(location); let expected_data = vec![batch.clone(), batch]; - let snapshot = table.snapshot(&engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let scan = snapshot.into_scan_builder().build()?; let mut files = 0; - let stream = scan.execute(&engine)?.zip(expected_data); + let stream = scan.execute(engine)?.zip(expected_data); for (data, expected) in stream { let raw_data = data?.raw_data?; @@ -126,7 +126,7 @@ async fn two_commits() -> Result<(), Box> { let scan = snapshot.into_scan_builder().build()?; let mut files = 0; - let stream = scan.execute(&engine)?.zip(expected_data); + let stream = scan.execute(Arc::new(engine))?.zip(expected_data); for (data, expected) in stream { let raw_data = data?.raw_data?; @@ -183,7 +183,7 @@ async fn remove_action() -> Result<(), Box> { let snapshot = table.snapshot(&engine, None)?; let scan = snapshot.into_scan_builder().build()?; - let stream = scan.execute(&engine)?.zip(expected_data); + let stream = scan.execute(Arc::new(engine))?.zip(expected_data); let mut files = 0; for (data, expected) in stream { @@ -247,14 +247,14 @@ async fn stats() -> Result<(), Box> { .await?; let location = Url::parse("memory:///").unwrap(); - let engine = DefaultEngine::new( + let engine = Arc::new(DefaultEngine::new( storage.clone(), Path::from(""), Arc::new(TokioBackgroundExecutor::new()), - ); + )); let table = Table::new(location); - let snapshot = Arc::new(table.snapshot(&engine, None)?); + let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?); // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator, // we validate the boundary values where we expect the set of matched files to change. @@ -306,7 +306,7 @@ async fn stats() -> Result<(), Box> { let expected_files = expected_batches.len(); let mut files_scanned = 0; - let stream = scan.execute(&engine)?.zip(expected_batches); + let stream = scan.execute(engine.clone())?.zip(expected_batches); for (batch, expected) in stream { let raw_data = batch?.raw_data?; @@ -346,7 +346,7 @@ macro_rules! assert_batches_sorted_eq { } fn read_with_execute( - engine: &dyn Engine, + engine: Arc, scan: &Scan, expected: &[String], ) -> Result<(), Box> { @@ -472,10 +472,10 @@ fn read_table_data( )?; let sync_engine = delta_kernel::engine::sync::SyncEngine::new(); - let engines: &[&dyn Engine] = &[&sync_engine, &default_engine]; - for &engine in engines { + let engines: Vec> = vec![Arc::new(sync_engine), Arc::new(default_engine)]; + for engine in engines { let table = Table::new(url.clone()); - let snapshot = table.snapshot(engine, None)?; + let snapshot = table.snapshot(engine.as_ref(), None)?; let read_schema = select_cols.map(|select_cols| { let table_schema = snapshot.schema(); @@ -491,8 +491,8 @@ fn read_table_data( .build()?; sort_lines!(expected); + read_with_scan_data(table.location(), engine.as_ref(), &scan, &expected)?; read_with_execute(engine, &scan, &expected)?; - read_with_scan_data(table.location(), engine, &scan, &expected)?; } Ok(()) } diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 4c67ddf0c..e62f8fd7c 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -450,7 +450,7 @@ async fn test_append() -> Result<(), Box> { ]))], )?), &table, - engine.as_ref(), + engine, )?; Ok(()) } @@ -598,7 +598,7 @@ async fn test_append_partitioned() -> Result<(), Box> { ], )?), &table, - engine.as_ref(), + engine, )?; Ok(()) }