Scan::execute takes an Arc<dyn Engine> now #553

Merged: 1 commit, Dec 2, 2024
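In short: `Scan::execute` now takes shared ownership of the engine as an `Arc<dyn Engine>` instead of borrowing it as `&dyn Engine`, so the iterator it returns can keep the engine alive. A minimal caller-side sketch of the migration, using only APIs that appear in the diffs below (the table URL argument is illustrative):

use std::sync::Arc;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::{DeltaResult, Engine, Table};

fn scan_table(url: url::Url) -> DeltaResult<()> {
    // Construct the engine in an Arc up front (previously a Box or a
    // stack value borrowed as `&dyn Engine` was enough).
    let engine: Arc<dyn Engine> = Arc::new(SyncEngine::new());

    let table = Table::new(url);
    // APIs that still borrow the engine take it via `as_ref()`.
    let snapshot = table.snapshot(engine.as_ref(), None)?;
    let scan = snapshot.into_scan_builder().build()?;

    // `execute` now consumes the Arc; clone it first if the engine is
    // needed again afterwards.
    for scan_result in scan.execute(engine)? {
        let _raw = scan_result?.raw_data?;
    }
    Ok(())
}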
4 changes: 1 addition & 3 deletions acceptance/src/data.rs
@@ -120,11 +120,9 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
 return Ok(());
 }
 }
-
-let engine = engine.as_ref();
 let table_root = test_case.table_root()?;
 let table = Table::new(table_root);
-let snapshot = table.snapshot(engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;
 let scan = snapshot.into_scan_builder().build()?;
 let mut schema = None;
 let batches: Vec<RecordBatch> = scan
8 changes: 4 additions & 4 deletions kernel/examples/read-table-single-threaded/src/main.rs
@@ -73,7 +73,7 @@ fn try_main() -> DeltaResult<()> {
 let table = Table::try_from_uri(&cli.path)?;
 println!("Reading {}", table.location());

-let engine: Box<dyn Engine> = match cli.engine {
+let engine: Arc<dyn Engine> = match cli.engine {
 EngineType::Default => {
 let mut options = if let Some(region) = cli.region {
 HashMap::from([("region", region)])
@@ -83,13 +83,13 @@ fn try_main() -> DeltaResult<()> {
 if cli.public {
 options.insert("skip_signature", "true".to_string());
 }
-Box::new(DefaultEngine::try_new(
+Arc::new(DefaultEngine::try_new(
 table.location(),
 options,
 Arc::new(TokioBackgroundExecutor::new()),
 )?)
 }
-EngineType::Sync => Box::new(SyncEngine::new()),
+EngineType::Sync => Arc::new(SyncEngine::new()),
 };

 let snapshot = table.snapshot(engine.as_ref(), None)?;
@@ -120,7 +120,7 @@ fn try_main() -> DeltaResult<()> {
 .build()?;

 let batches: Vec<RecordBatch> = scan
-.execute(engine.as_ref())?
+.execute(engine)?
 .map(|scan_result| -> DeltaResult<_> {
 let scan_result = scan_result?;
 let mask = scan_result.full_mask();
45 changes: 24 additions & 21 deletions kernel/src/scan/mod.rs
@@ -261,10 +261,10 @@ impl Scan {
 /// the execution of the scan.
 // This calls [`Scan::scan_data`] to get an iterator of `ScanData` actions for the scan, and then uses the
 // `engine`'s [`crate::ParquetHandler`] to read the actual table data.
-pub fn execute<'a>(
-&'a self,
-engine: &'a dyn Engine,
-) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + 'a> {
+pub fn execute(
+&self,
+engine: Arc<dyn Engine>,
+) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {
 struct ScanFile {
 path: String,
 size: i64,
@@ -293,7 +293,7 @@ impl Scan {
 );

 let global_state = Arc::new(self.global_scan_state());
-let scan_data = self.scan_data(engine)?;
+let scan_data = self.scan_data(engine.as_ref())?;
 let scan_files_iter = scan_data
 .map(|res| {
 let (data, vec) = res?;
@@ -309,7 +309,7 @@
 let file_path = self.snapshot.table_root.join(&scan_file.path)?;
 let mut selection_vector = scan_file
 .dv_info
-.get_selection_vector(engine, &self.snapshot.table_root)?;
+.get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?;
 let meta = FileMeta {
 last_modified: 0,
 size: scan_file.size as usize,
@@ -320,14 +320,17 @@
 global_state.read_schema.clone(),
 self.predicate(),
 )?;
-let gs = global_state.clone(); // Arc clone
+
+// Arc clones
+let engine = engine.clone();
+let global_state = global_state.clone();
 Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
 let read_result = read_result?;
 // to transform the physical data into the correct logical form
 let logical = transform_to_logical_internal(
-engine,
+engine.as_ref(),
 read_result,
-&gs,
+&global_state,
 &scan_file.partition_values,
 &self.all_fields,
 self.have_partition_cols,
@@ -662,12 +665,12 @@ mod tests {
 let path =
 std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
 let url = url::Url::from_directory_path(path).unwrap();
-let engine = SyncEngine::new();
+let engine = Arc::new(SyncEngine::new());

 let table = Table::new(url);
-let snapshot = table.snapshot(&engine, None).unwrap();
+let snapshot = table.snapshot(engine.as_ref(), None).unwrap();
 let scan = snapshot.into_scan_builder().build().unwrap();
-let files: Vec<ScanResult> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let files: Vec<ScanResult> = scan.execute(engine).unwrap().try_collect().unwrap();

 assert_eq!(files.len(), 1);
 let num_rows = files[0].raw_data.as_ref().unwrap().len();
@@ -743,16 +746,16 @@ mod tests {
 fn test_data_row_group_skipping() {
 let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
 let url = url::Url::from_directory_path(path.unwrap()).unwrap();
-let engine = SyncEngine::new();
+let engine = Arc::new(SyncEngine::new());

 let table = Table::new(url);
-let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());
+let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap());

 // No predicate pushdown attempted, so the one data file should be returned.
 //
 // NOTE: The data file contains only five rows -- near guaranteed to produce one row group.
 let scan = snapshot.clone().scan_builder().build().unwrap();
-let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
 assert_eq!(data.len(), 1);

 // Ineffective predicate pushdown attempted, so the one data file should be returned.
@@ -765,7 +768,7 @@
 .with_predicate(predicate)
 .build()
 .unwrap();
-let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
 assert_eq!(data.len(), 1);

 // Effective predicate pushdown, so no data files should be returned.
@@ -775,18 +778,18 @@
 .with_predicate(predicate)
 .build()
 .unwrap();
-let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap();
 assert_eq!(data.len(), 0);
 }

 #[test]
 fn test_missing_column_row_group_skipping() {
 let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
 let url = url::Url::from_directory_path(path.unwrap()).unwrap();
-let engine = SyncEngine::new();
+let engine = Arc::new(SyncEngine::new());

 let table = Table::new(url);
-let snapshot = Arc::new(table.snapshot(&engine, None).unwrap());
+let snapshot = Arc::new(table.snapshot(engine.as_ref(), None).unwrap());

 // Predicate over a logically valid but physically missing column. No data files should be
 // returned because the column is inferred to be all-null.
@@ -800,7 +803,7 @@
 .with_predicate(predicate)
 .build()
 .unwrap();
-let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let data: Vec<_> = scan.execute(engine.clone()).unwrap().try_collect().unwrap();
 assert_eq!(data.len(), 1);

 // Predicate over a logically missing column, so the one data file should be returned.
@@ -812,7 +815,7 @@
 .with_predicate(predicate)
 .build()
 .unwrap();
-let data: Vec<_> = scan.execute(&engine).unwrap().try_collect().unwrap();
+let data: Vec<_> = scan.execute(engine).unwrap().try_collect().unwrap();
 assert_eq!(data.len(), 1);
 }
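Why the signature change: with `&'a dyn Engine`, the iterator returned by `execute` was tied to the caller's borrow; taking `Arc<dyn Engine>` lets the `// Arc clones` block above move shared ownership into the per-file closures instead. A simplified sketch of that pattern (stand-in trait and types for illustration, not the kernel's real ones):

use std::sync::Arc;

// Stand-in for delta_kernel's `Engine` trait, for illustration only.
trait Engine {
    fn name(&self) -> &'static str;
}

// The returned iterator owns an Arc, so it is not tied to a borrow of a
// caller-owned engine (which is what forced the old `'a` lifetime).
fn make_results(engine: Arc<dyn Engine>, files: Vec<String>) -> impl Iterator<Item = String> {
    files.into_iter().flat_map(move |file| {
        // One Arc clone per file, mirroring the `// Arc clones` block in
        // `execute`: each inner closure gets its own shared handle.
        let engine = engine.clone();
        (0..2).map(move |row| format!("{file}#{row} via {}", engine.name()))
    })
}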
8 changes: 5 additions & 3 deletions kernel/tests/common/mod.rs
@@ -7,6 +7,8 @@ use crate::ArrowEngineData;
 use delta_kernel::scan::Scan;
 use delta_kernel::{DeltaResult, Engine, EngineData, Table};

+use std::sync::Arc;
+
 pub(crate) fn to_arrow(data: Box<dyn EngineData>) -> DeltaResult<RecordBatch> {
 Ok(data
 .into_any()
@@ -20,9 +22,9 @@ pub(crate) fn to_arrow(data: Box<dyn EngineData>) -> DeltaResult<RecordBatch> {
 pub(crate) fn test_read(
 expected: &ArrowEngineData,
 table: &Table,
-engine: &impl Engine,
+engine: Arc<dyn Engine>,
 ) -> Result<(), Box<dyn std::error::Error>> {
-let snapshot = table.snapshot(engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;
 let scan = snapshot.into_scan_builder().build()?;
 let batches = read_scan(&scan, engine)?;
 let formatted = pretty_format_batches(&batches).unwrap().to_string();
@@ -40,7 +42,7 @@

 // TODO (zach): this is listed as unused for acceptance crate
 #[allow(unused)]
-pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult<Vec<RecordBatch>> {
+pub(crate) fn read_scan(scan: &Scan, engine: Arc<dyn Engine>) -> DeltaResult<Vec<RecordBatch>> {
 let scan_results = scan.execute(engine)?;
 scan_results
 .map(|scan_result| -> DeltaResult<_> {
13 changes: 7 additions & 6 deletions kernel/tests/dv.rs
@@ -2,6 +2,7 @@
 //! Must run at the root of the crate
 use std::ops::Add;
 use std::path::PathBuf;
+use std::sync::Arc;

 use delta_kernel::engine::sync::SyncEngine;
 use delta_kernel::scan::ScanResult;
@@ -29,13 +30,13 @@ fn count_total_scan_rows(
 fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
 let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?;
 let url = url::Url::from_directory_path(path).unwrap();
-let engine = SyncEngine::new();
+let engine = Arc::new(SyncEngine::new());

 let table = Table::new(url);
-let snapshot = table.snapshot(&engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;
 let scan = snapshot.into_scan_builder().build()?;

-let stream = scan.execute(&engine)?;
+let stream = scan.execute(engine)?;
 let total_rows = count_total_scan_rows(stream)?;
 assert_eq!(total_rows, 8);
 Ok(())
@@ -45,13 +46,13 @@
 fn non_dv_table() -> Result<(), Box<dyn std::error::Error>> {
 let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))?;
 let url = url::Url::from_directory_path(path).unwrap();
-let engine = SyncEngine::new();
+let engine = Arc::new(SyncEngine::new());

 let table = Table::new(url);
-let snapshot = table.snapshot(&engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;
 let scan = snapshot.into_scan_builder().build()?;

-let stream = scan.execute(&engine)?;
+let stream = scan.execute(engine)?;
 let total_rows = count_total_scan_rows(stream)?;
 assert_eq!(total_rows, 10);
 Ok(())
3 changes: 1 addition & 2 deletions kernel/tests/golden_tables.rs
@@ -180,9 +180,8 @@ async fn latest_snapshot_test(
 expected_path: Option<PathBuf>,
 ) -> Result<(), Box<dyn std::error::Error>> {
 let snapshot = table.snapshot(&engine, None)?;
-
 let scan = snapshot.into_scan_builder().build()?;
-let scan_res = scan.execute(&engine)?;
+let scan_res = scan.execute(Arc::new(engine))?;
 let batches: Vec<RecordBatch> = scan_res
 .map(|scan_result| -> DeltaResult<_> {
 let scan_result = scan_result?;
30 changes: 15 additions & 15 deletions kernel/tests/read.rs
@@ -56,20 +56,20 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
 .await?;

 let location = Url::parse("memory:///")?;
-let engine = DefaultEngine::new(
+let engine = Arc::new(DefaultEngine::new(
 storage.clone(),
 Path::from("/"),
 Arc::new(TokioBackgroundExecutor::new()),
-);
+));

 let table = Table::new(location);
 let expected_data = vec![batch.clone(), batch];

-let snapshot = table.snapshot(&engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;
 let scan = snapshot.into_scan_builder().build()?;

 let mut files = 0;
-let stream = scan.execute(&engine)?.zip(expected_data);
+let stream = scan.execute(engine)?.zip(expected_data);

 for (data, expected) in stream {
 let raw_data = data?.raw_data?;
@@ -126,7 +126,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
 let scan = snapshot.into_scan_builder().build()?;

 let mut files = 0;
-let stream = scan.execute(&engine)?.zip(expected_data);
+let stream = scan.execute(Arc::new(engine))?.zip(expected_data);

 for (data, expected) in stream {
 let raw_data = data?.raw_data?;
@@ -183,7 +183,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
 let snapshot = table.snapshot(&engine, None)?;
 let scan = snapshot.into_scan_builder().build()?;

-let stream = scan.execute(&engine)?.zip(expected_data);
+let stream = scan.execute(Arc::new(engine))?.zip(expected_data);

 let mut files = 0;
 for (data, expected) in stream {
@@ -247,14 +247,14 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
 .await?;

 let location = Url::parse("memory:///").unwrap();
-let engine = DefaultEngine::new(
+let engine = Arc::new(DefaultEngine::new(
 storage.clone(),
 Path::from(""),
 Arc::new(TokioBackgroundExecutor::new()),
-);
+));

 let table = Table::new(location);
-let snapshot = Arc::new(table.snapshot(&engine, None)?);
+let snapshot = Arc::new(table.snapshot(engine.as_ref(), None)?);

 // The first file has id between 1 and 3; the second has id between 5 and 7. For each operator,
 // we validate the boundary values where we expect the set of matched files to change.
@@ -306,7 +306,7 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {

 let expected_files = expected_batches.len();
 let mut files_scanned = 0;
-let stream = scan.execute(&engine)?.zip(expected_batches);
+let stream = scan.execute(engine.clone())?.zip(expected_batches);

 for (batch, expected) in stream {
 let raw_data = batch?.raw_data?;
@@ -346,7 +346,7 @@ macro_rules! assert_batches_sorted_eq {
 }

 fn read_with_execute(
-engine: &dyn Engine,
+engine: Arc<dyn Engine>,
 scan: &Scan,
 expected: &[String],
 ) -> Result<(), Box<dyn std::error::Error>> {
@@ -472,10 +472,10 @@ fn read_table_data(
 )?;
 let sync_engine = delta_kernel::engine::sync::SyncEngine::new();

-let engines: &[&dyn Engine] = &[&sync_engine, &default_engine];
-for &engine in engines {
+let engines: Vec<Arc<dyn Engine>> = vec![Arc::new(sync_engine), Arc::new(default_engine)];
+for engine in engines {
 let table = Table::new(url.clone());
-let snapshot = table.snapshot(engine, None)?;
+let snapshot = table.snapshot(engine.as_ref(), None)?;

 let read_schema = select_cols.map(|select_cols| {
 let table_schema = snapshot.schema();
@@ -491,8 +491,8 @@ fn read_table_data(
 .build()?;

 sort_lines!(expected);
+read_with_scan_data(table.location(), engine.as_ref(), &scan, &expected)?;
 read_with_execute(engine, &scan, &expected)?;
-read_with_scan_data(table.location(), engine, &scan, &expected)?;
 }
 Ok(())
 }

Review comment from the author on lines +494 to 495: Note: Order swapped so we don't have to clone the engine that read_with_execute consumes
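The swapped call order in read_table_data is ownership hygiene: read_with_scan_data still borrows the engine while read_with_execute now consumes the Arc, so running the borrowing helper first lets one Arc serve both calls. A tiny sketch (signatures abbreviated from the diff above, stand-in trait for illustration):

use std::sync::Arc;

trait Engine {} // stand-in for delta_kernel's Engine trait

fn read_with_scan_data(_engine: &dyn Engine) { /* borrows */ }
fn read_with_execute(_engine: Arc<dyn Engine>) { /* consumes the Arc */ }

fn read_table_data(engine: Arc<dyn Engine>) {
    read_with_scan_data(engine.as_ref()); // borrow first...
    read_with_execute(engine); // ...then hand over the Arc; no clone needed
    // In the opposite order we would need `read_with_execute(engine.clone())`.
}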
4 changes: 2 additions & 2 deletions kernel/tests/write.rs
@@ -450,7 +450,7 @@ async fn test_append() -> Result<(), Box<dyn std::error::Error>> {
 ]))],
 )?),
 &table,
-engine.as_ref(),
+engine,
 )?;
 Ok(())
 }
@@ -598,7 +598,7 @@ async fn test_append_partitioned() -> Result<(), Box<dyn std::error::Error>> {
 ],
 )?),
 &table,
-engine.as_ref(),
+engine,
 )?;
 Ok(())
 }