
Make scan::execute return a lazy iterator #340

Merged: 38 commits, Oct 7, 2024

Commits
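In short: before this change, Scan::execute read every file up front and returned DeltaResult<Vec<ScanResult>>; after it, execute returns DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + 'a>, so table data is read only as the iterator is driven. A minimal sketch of the two call styles against the new API (the helper names are illustrative, not part of this PR; the real signature is in the kernel/src/scan/mod.rs diff below):

use delta_kernel::scan::{Scan, ScanResult};
use delta_kernel::{DeltaResult, Engine};
use itertools::Itertools;

// Eager callers can opt back in to buffering with try_collect...
fn collect_all(scan: &Scan, engine: &dyn Engine) -> DeltaResult<Vec<ScanResult>> {
    scan.execute(engine)?.try_collect()
}

// ...while streaming callers hold one ScanResult at a time; each file is only
// read when the iterator reaches it.
fn count_rows(scan: &Scan, engine: &dyn Engine) -> DeltaResult<usize> {
    let mut rows = 0;
    for scan_result in scan.execute(engine)? {
        let scan_result = scan_result?;
        rows += scan_result.raw_data?.length();
    }
    Ok(rows)
}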
1ca141d
First stage of moving scan::execute to be lazy
OussamaSaoudi-db Sep 16, 2024
c4f40bd
Implement lazy scanning of files
OussamaSaoudi-db Sep 17, 2024
7e5ad45
Clean up tests affected by the move to lazy execute
OussamaSaoudi-db Sep 17, 2024
63f26aa
Clean up tests affected by the move to lazy execute
OussamaSaoudi-db Sep 17, 2024
e77f738
Clean up tests
OussamaSaoudi-db Sep 17, 2024
3ba5fef
Minimize line change count
OussamaSaoudi-db Sep 17, 2024
9733c20
Fix failing mac test
OussamaSaoudi-db Sep 17, 2024
00f834b
Update acceptance/src/data.rs
OussamaSaoudi-db Sep 24, 2024
27cccf1
Update kernel/src/scan/mod.rs
OussamaSaoudi-db Sep 24, 2024
b4c1203
Update kernel/src/scan/mod.rs
OussamaSaoudi-db Sep 24, 2024
946bfa3
Address PR comments
OussamaSaoudi-db Sep 24, 2024
9f57f9e
Fix doc string
OussamaSaoudi-db Sep 24, 2024
d71be85
Improve styling
OussamaSaoudi-db Sep 24, 2024
4130e41
Address PR comments
OussamaSaoudi-db Sep 26, 2024
3b56037
Move itertools to alphabetical order
OussamaSaoudi-db Sep 26, 2024
14caf61
Address PR comments
OussamaSaoudi-db Sep 27, 2024
b5bc921
remove itertools dependency
OussamaSaoudi-db Sep 27, 2024
95e1e8b
Address feedback
OussamaSaoudi-db Sep 30, 2024
b08992d
One more map_ok
OussamaSaoudi-db Sep 30, 2024
47710d2
Respond to feedback
OussamaSaoudi-db Oct 1, 2024
04ca9fe
use map_and_then
OussamaSaoudi-db Oct 2, 2024
49f19ad
Remove incorrect usage of map_ok
OussamaSaoudi-db Oct 2, 2024
62f0f6e
Style improvements
OussamaSaoudi-db Oct 2, 2024
673bf50
Add clarifying comments
OussamaSaoudi-db Oct 2, 2024
7d5d591
More style changes
OussamaSaoudi-db Oct 2, 2024
cc3772c
remove unused import
OussamaSaoudi-db Oct 2, 2024
454af5e
Address PR comments
OussamaSaoudi-db Oct 3, 2024
fcb3253
Small doc comment fix
OussamaSaoudi-db Oct 3, 2024
f52eaf9
Update kernel/tests/dv.rs
OussamaSaoudi-db Oct 3, 2024
35e09b2
apply pr changes
OussamaSaoudi-db Oct 3, 2024
c51fa2f
simplify execute
OussamaSaoudi-db Oct 4, 2024
faf5531
Update acceptance/src/data.rs
OussamaSaoudi-db Oct 7, 2024
e9d6eda
Update acceptance/src/data.rs
OussamaSaoudi-db Oct 7, 2024
b7174d4
Update kernel/examples/read-table-single-threaded/src/main.rs
OussamaSaoudi-db Oct 7, 2024
8f1d39f
Apply suggestions from code review
OussamaSaoudi-db Oct 7, 2024
155e967
Apply suggestions from code review
OussamaSaoudi-db Oct 7, 2024
0e0d1f5
Address nits
OussamaSaoudi-db Oct 7, 2024
ae9e45a
fix docs
OussamaSaoudi-db Oct 7, 2024
1 change: 1 addition & 0 deletions acceptance/Cargo.toml
@@ -20,6 +20,7 @@ delta_kernel = { path = "../kernel", features = [
"developer-visibility",
] }
futures = "0.3"
itertools = "0.13"
object_store = { workspace = true }
parquet = { workspace = true }
serde = { version = "1", features = ["derive"] }
15 changes: 8 additions & 7 deletions acceptance/src/data.rs
@@ -7,6 +7,7 @@ use arrow_select::{concat::concat_batches, filter::filter_record_batch, take::ta

use delta_kernel::{engine::arrow_data::ArrowEngineData, DeltaResult, Engine, Error, Table};
use futures::{stream::TryStreamExt, StreamExt};
use itertools::Itertools;
use object_store::{local::LocalFileSystem, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

@@ -127,9 +128,9 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
let mut schema = None;
let batches: Vec<RecordBatch> = scan
.execute(engine)?
.into_iter()
.map(|res| {
let data = res.raw_data.unwrap();
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
@@ -138,13 +139,13 @@ pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo)
if schema.is_none() {
schema = Some(record_batch.schema());
}
if let Some(mask) = res.mask {
filter_record_batch(&record_batch, &mask.into()).unwrap()
if let Some(mask) = scan_result.mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
record_batch
Ok(record_batch)
}
})
.collect();
.try_collect()?;
let all_data = concat_batches(&schema.unwrap(), batches.iter()).map_err(Error::from)?;
let all_data = sort_record_batch(all_data)?;
let golden = read_golden(test_case.root_dir(), None).await?;
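The change above swaps unwrap-and-collect for fallible mapping: the closure now returns DeltaResult<_>, and itertools' try_collect (shorthand for .collect::<Result<Vec<_>, _>>()) short-circuits on the first error instead of panicking mid-iteration. The same pattern in miniature, with standard-library types only (a standalone sketch, not code from this PR):

use itertools::Itertools;

fn parse_squares(nums: &[&str]) -> Result<Vec<i64>, std::num::ParseIntError> {
    nums.iter()
        .map(|s| -> Result<i64, _> {
            let n: i64 = s.parse()?; // any parse failure aborts the collect
            Ok(n * n)
        })
        .try_collect()
}

fn main() {
    assert_eq!(parse_squares(&["2", "3"]), Ok(vec![4, 9]));
    assert!(parse_squares(&["2", "oops"]).is_err());
}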
42 changes: 22 additions & 20 deletions kernel/examples/read-table-single-threaded/src/main.rs
@@ -13,6 +13,7 @@ use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, Table};

use clap::{Parser, ValueEnum};
use itertools::Itertools;

/// An example program that dumps out the data of a delta table. Struct and Map types are not
/// supported.
@@ -92,7 +93,6 @@ fn try_main() -> DeltaResult<()> {
let read_schema_opt = cli
.columns
.map(|cols| {
use itertools::Itertools;
let table_schema = snapshot.schema();
let selected_fields = cols
.iter()
@@ -113,26 +113,28 @@
.with_schema_opt(read_schema_opt)
.build()?;

let mut batches = vec![];
for res in scan.execute(engine.as_ref())?.into_iter() {
let data = res.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
let batch = if let Some(mut mask) = res.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
let batches: Vec<RecordBatch> = scan
.execute(engine.as_ref())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
filter_record_batch(&record_batch, &mask.into())?
} else {
record_batch
};
batches.push(batch);
}
})
.try_collect()?;
print_batches(&batches)?;
Ok(())
}
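The mask-handling branch above also papers over a length mismatch: the selection mask built from a deletion vector can be shorter than the record batch, and rows past its end must be kept. A standalone sketch of just that padding step (pad_mask is a hypothetical helper, not part of the kernel API):

// Pad a short selection mask so it covers every row in the batch,
// treating missing entries as "keep the row".
fn pad_mask(mut mask: Vec<bool>, num_rows: usize) -> Vec<bool> {
    if mask.len() < num_rows {
        let missing = num_rows - mask.len();
        mask.extend(std::iter::repeat(true).take(missing));
    }
    mask
}

fn main() {
    assert_eq!(pad_mask(vec![false, true], 4), vec![false, true, true, true]);
}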
39 changes: 24 additions & 15 deletions kernel/src/scan/mod.rs
@@ -227,14 +227,17 @@ impl Scan {
}

/// Perform an "all in one" scan. This will use the provided `engine` to read and
/// process all the data for the query. Each [`ScanResult`] in the resultant vector encapsulates
/// process all the data for the query. Each [`ScanResult`] in the resultant iterator encapsulates
/// the raw data and an optional boolean vector built from the deletion vector if it was
/// present. See the documentation for [`ScanResult`] for more details. Generally
/// connectors/engines will want to use [`Scan::scan_data`] so they can have more control over
/// the execution of the scan.
// This calls [`Scan::files`] to get a set of `Add` actions for the scan, and then uses the
// This calls [`Scan::scan_data`] to get an iterator of `ScanData` actions for the scan, and then uses the
// `engine`'s [`crate::ParquetHandler`] to read the actual table data.
pub fn execute(&self, engine: &dyn Engine) -> DeltaResult<Vec<ScanResult>> {
pub fn execute<'a>(
&'a self,
engine: &'a dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + 'a> {
struct ScanFile {
path: String,
size: i64,
@@ -264,15 +267,18 @@

let global_state = Arc::new(self.global_scan_state());
let scan_data = self.scan_data(engine)?;
let mut scan_files = vec![];
for data in scan_data {
let (data, vec) = data?;
scan_files =
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
}
scan_files
.into_iter()
.map(|scan_file| -> DeltaResult<_> {
let scan_files_iter = scan_data
.map(|res| {
let (data, vec) = res?;
let scan_files = vec![];
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)
})
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>>
.flatten_ok();

let result = scan_files_iter
.map(move |scan_file| -> DeltaResult<_> {
let scan_file = scan_file?;
let file_path = self.snapshot.table_root.join(&scan_file.path)?;
let mut selection_vector = scan_file
.dv_info
Expand All @@ -288,7 +294,7 @@ impl Scan {
None,
)?;
let gs = global_state.clone(); // Arc clone
Ok(read_result_iter.into_iter().map(move |read_result| {
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
let read_result = read_result?;
// to transform the physical data into the correct logical form
let logical = transform_to_logical_internal(
@@ -314,8 +320,11 @@
Ok(result)
}))
})
// Iterator<DeltaResult<Iterator<DeltaResult<ScanResult>>>> to Iterator<DeltaResult<DeltaResult<ScanResult>>>
.flatten_ok()
.try_collect()?
// Iterator<DeltaResult<DeltaResult<ScanResult>>> to Iterator<DeltaResult<ScanResult>>
.map(|x| x?);
Ok(result)
}
}

@@ -633,7 +642,7 @@ mod tests {
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let files = scan.execute(&engine).unwrap();
let files: Vec<ScanResult> = scan.execute(&engine).unwrap().try_collect().unwrap();

assert_eq!(files.len(), 1);
let num_rows = files[0].raw_data.as_ref().unwrap().length();
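Two pieces of the new execute body above are worth unpacking. flatten_ok turns an Iterator of DeltaResult<Vec<ScanFile>> into an Iterator of DeltaResult<ScanFile>, passing errors through unchanged; the final .map(|x| x?) uses the ? operator inside a closure to collapse a doubly-nested DeltaResult<DeltaResult<ScanResult>> into a single DeltaResult<ScanResult>. Both steps in miniature, with plain standard-library types (a standalone sketch, not kernel code):

use itertools::Itertools;

fn main() {
    // flatten_ok: Iterator<Result<Vec<T>, E>> -> Iterator<Result<T, E>>.
    let nested: Vec<Result<Vec<i32>, String>> =
        vec![Ok(vec![1, 2]), Err("boom".to_string()), Ok(vec![3])];
    let flat: Vec<Result<i32, String>> = nested.into_iter().flatten_ok().collect();
    assert_eq!(flat, vec![Ok(1), Ok(2), Err("boom".to_string()), Ok(3)]);

    // map(|x| x?): Result<Result<T, E>, E> -> Result<T, E>. An outer Err is
    // propagated by `?`; an inner Result is returned as-is.
    let nested_results: Vec<Result<Result<i32, String>, String>> =
        vec![Ok(Ok(1)), Ok(Err("inner".to_string())), Err("outer".to_string())];
    let collapsed: Vec<Result<i32, String>> =
        nested_results.into_iter().map(|x| x?).collect();
    assert_eq!(
        collapsed,
        vec![Ok(1), Err("inner".to_string()), Err("outer".to_string())]
    );
}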
44 changes: 23 additions & 21 deletions kernel/tests/dv.rs
@@ -1,12 +1,32 @@
//! Read a small table with/without deletion vectors.
//! Must run at the root of the crate
use std::ops::Add;
use std::path::PathBuf;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::Table;
use delta_kernel::scan::ScanResult;
use delta_kernel::{DeltaResult, Table};

use itertools::Itertools;
use test_log::test;

fn count_total_scan_rows(
scan_result_iter: impl Iterator<Item = DeltaResult<ScanResult>>,
) -> DeltaResult<usize> {
scan_result_iter
.map(|scan_result| {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
// NOTE: The mask only suppresses rows for which it is both present and false.
let deleted_rows = scan_result
.mask
.as_ref()
.map_or(0, |mask| mask.iter().filter(|&&m| !m).count());
Ok(data.length() - deleted_rows)
})
.fold_ok(0, Add::add)
Collaborator: very nice :)

}

#[test]
fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))?;
@@ -18,16 +38,7 @@ fn dv_table() -> Result<(), Box<dyn std::error::Error>> {
let scan = snapshot.into_scan_builder().build()?;

let stream = scan.execute(&engine)?;
let mut total_rows = 0;
for res in stream {
let data = res.raw_data?;
let rows = data.length();
for i in 0..rows {
if res.mask.as_ref().map_or(true, |mask| mask[i]) {
total_rows += 1;
}
}
}
let total_rows = count_total_scan_rows(stream)?;
assert_eq!(total_rows, 8);
Ok(())
}
@@ -43,16 +54,7 @@ fn non_dv_table() -> Result<(), Box<dyn std::error::Error>> {
let scan = snapshot.into_scan_builder().build()?;

let stream = scan.execute(&engine)?;
let mut total_rows = 0;
for res in stream {
let data = res.raw_data?;
let rows = data.length();
for i in 0..rows {
if res.mask.as_ref().map_or(true, |mask| mask[i]) {
total_rows += 1;
}
}
}
let total_rows = count_total_scan_rows(stream)?;
assert_eq!(total_rows, 10);
Ok(())
}
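The new count_total_scan_rows helper folds with itertools' fold_ok, which accumulates over the Ok values of a Result iterator and short-circuits on the first Err; here it sums the surviving row count of each batch into one DeltaResult<usize>. In miniature (a standalone sketch, independent of the kernel types):

use std::ops::Add;

use itertools::Itertools;

fn main() {
    // All Ok: the fold runs to completion.
    let ok_items: Vec<Result<usize, String>> = vec![Ok(3), Ok(5)];
    assert_eq!(ok_items.into_iter().fold_ok(0, Add::add), Ok(8));

    // Any Err: the fold stops and returns that error.
    let with_err: Vec<Result<usize, String>> =
        vec![Ok(3), Err("bad batch".to_string()), Ok(5)];
    assert_eq!(
        with_err.into_iter().fold_ok(0, Add::add),
        Err("bad batch".to_string())
    );
}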
35 changes: 17 additions & 18 deletions kernel/tests/golden_tables.rs
@@ -7,6 +7,7 @@ use arrow::{compute::filter_record_batch, record_batch::RecordBatch};
use arrow_ord::sort::{lexsort_to_indices, SortColumn};
use arrow_schema::Schema;
use arrow_select::{concat::concat_batches, take::take};
use itertools::Itertools;
use paste::paste;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -153,37 +154,35 @@ async fn latest_snapshot_test(
table: Table,
expected_path: Option<PathBuf>,
) -> Result<(), Box<dyn std::error::Error>> {
let snapshot = table.snapshot(&engine, None).unwrap();
let snapshot = table.snapshot(&engine, None)?;

let scan = snapshot.into_scan_builder().build().unwrap();
let scan_res = scan.execute(&engine).unwrap();
let scan = snapshot.into_scan_builder().build()?;
let scan_res = scan.execute(&engine)?;
let batches: Vec<RecordBatch> = scan_res
.into_iter()
.map(|sr| {
let data = sr.raw_data.unwrap();
let record_batch = to_arrow(data).unwrap();
if let Some(mut mask) = sr.mask {
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
let data = scan_result.raw_data?;
let record_batch = to_arrow(data)?;
if let Some(mut mask) = scan_result.mask {
let extra_rows = record_batch.num_rows() - mask.len();
if extra_rows > 0 {
// we need to extend the mask here in case it's too short
mask.extend(std::iter::repeat(true).take(extra_rows));
}
filter_record_batch(&record_batch, &mask.into()).unwrap()
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
record_batch
Ok(record_batch)
}
})
.collect();
.try_collect()?;

let expected = read_expected(&expected_path.expect("expect an expected dir"))
.await
.unwrap();
let expected = read_expected(&expected_path.expect("expect an expected dir")).await?;

let schema: Arc<Schema> = Arc::new(scan.schema().as_ref().try_into().unwrap());
let schema: Arc<Schema> = Arc::new(scan.schema().as_ref().try_into()?);

let result = concat_batches(&schema, &batches).unwrap();
let result = sort_record_batch(result).unwrap();
let expected = sort_record_batch(expected).unwrap();
let result = concat_batches(&schema, &batches)?;
let result = sort_record_batch(result)?;
let expected = sort_record_batch(expected)?;
assert_columns_match(result.columns(), expected.columns());
assert_schema_fields_match(expected.schema().as_ref(), result.schema().as_ref());
assert!(