Skip to content

Commit

Permalink
Remove lifetime requirement on Scan::execute (#588)
Browse files Browse the repository at this point in the history
## What changes are proposed in this pull request?
Currently, `Scan::execute` takes the lifetime of the `Scan` into the
Iterator, forcing it to live as long as the scan. This Pr removes that
requirement so that the user can lazily consume the iterator without
managing the lifetime of `Scan`. This simplifies the usage of kernel

This PR also changes all `read_schema` names to `physical_schema` to
make it clear and consistent what the schema represents. note that
`Snapshot` already calls this the `physical_schema`
### This PR affects the following public APIs
`Scan::execute` no longer depends on the lifetime of `&self`

## How was this change tested?
Everything compiles :)
  • Loading branch information
OussamaSaoudi-db authored Dec 11, 2024
1 parent 7bcbb57 commit 04ccccb
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 19 deletions.
2 changes: 1 addition & 1 deletion ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub unsafe extern "C" fn get_global_read_schema(
state: Handle<SharedGlobalScanState>,
) -> Handle<SharedSchema> {
let state = unsafe { state.as_ref() };
state.read_schema.clone().into()
state.physical_schema.clone().into()
}

/// Free a global read schema
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ fn do_work(
) {
// get the type for the function calls
let engine: &dyn Engine = engine.as_ref();
let read_schema = scan_state.read_schema.clone();
let physical_schema = scan_state.physical_schema.clone();
// in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
// hangs up, which indicates there's no more data to process.
while let Ok(scan_file) = scan_file_rx.recv() {
Expand Down Expand Up @@ -287,7 +287,7 @@ fn do_work(
// vector
let read_results = engine
.get_parquet_handler()
.read_parquet_files(&[meta], read_schema.clone(), None)
.read_parquet_files(&[meta], physical_schema.clone(), None)
.unwrap();

for read_result in read_results {
Expand Down
30 changes: 18 additions & 12 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl ScanBuilder {
logical_schema,
physical_schema: Arc::new(StructType::new(state_info.read_fields)),
physical_predicate,
all_fields: state_info.all_fields,
all_fields: Arc::new(state_info.all_fields),
have_partition_cols: state_info.have_partition_cols,
})
}
Expand Down Expand Up @@ -329,7 +329,7 @@ pub struct Scan {
logical_schema: SchemaRef,
physical_schema: SchemaRef,
physical_predicate: PhysicalPredicate,
all_fields: Vec<ColumnType>,
all_fields: Arc<Vec<ColumnType>>,
have_partition_cols: bool,
}

Expand Down Expand Up @@ -411,7 +411,7 @@ impl Scan {
table_root: self.snapshot.table_root.to_string(),
partition_columns: self.snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
read_schema: self.physical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: self.snapshot.column_mapping_mode,
}
}
Expand All @@ -427,7 +427,7 @@ impl Scan {
pub fn execute(
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
struct ScanFile {
path: String,
size: i64,
Expand Down Expand Up @@ -456,6 +456,11 @@ impl Scan {
);

let global_state = Arc::new(self.global_scan_state());
let table_root = self.snapshot.table_root.clone();
let physical_predicate = self.physical_predicate();
let all_fields = self.all_fields.clone();
let have_partition_cols = self.have_partition_cols;

let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
.map(|res| {
Expand All @@ -469,10 +474,10 @@ impl Scan {
let result = scan_files_iter
.map(move |scan_file| -> DeltaResult<_> {
let scan_file = scan_file?;
let file_path = self.snapshot.table_root.join(&scan_file.path)?;
let file_path = table_root.join(&scan_file.path)?;
let mut selection_vector = scan_file
.dv_info
.get_selection_vector(engine.as_ref(), &self.snapshot.table_root)?;
.get_selection_vector(engine.as_ref(), &table_root)?;
let meta = FileMeta {
last_modified: 0,
size: scan_file.size as usize,
Expand All @@ -485,13 +490,14 @@ impl Scan {
// https://github.com/delta-io/delta-kernel-rs/issues/434 for more details.
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
self.physical_predicate(),
global_state.physical_schema.clone(),
physical_predicate.clone(),
)?;

// Arc clones
let engine = engine.clone();
let global_state = global_state.clone();
let all_fields = all_fields.clone();
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
let read_result = read_result?;
// to transform the physical data into the correct logical form
Expand All @@ -500,8 +506,8 @@ impl Scan {
read_result,
&global_state,
&scan_file.partition_values,
&self.all_fields,
self.have_partition_cols,
&all_fields,
have_partition_cols,
);
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
Expand Down Expand Up @@ -651,7 +657,7 @@ fn transform_to_logical_internal(
all_fields: &[ColumnType],
have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let read_schema = global_state.read_schema.clone();
let physical_schema = global_state.physical_schema.clone();
if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
return Ok(data);
}
Expand All @@ -678,7 +684,7 @@ fn transform_to_logical_internal(
let result = engine
.get_expression_handler()
.get_evaluator(
read_schema,
physical_schema,
read_expression,
global_state.logical_schema.clone().into(),
)
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct GlobalScanState {
pub table_root: String,
pub partition_columns: Vec<String>,
pub logical_schema: SchemaRef,
pub read_schema: SchemaRef,
pub physical_schema: SchemaRef,
pub column_mapping_mode: ColumnMappingMode,
}

Expand Down
5 changes: 3 additions & 2 deletions kernel/src/table_changes/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl TableChangesScan {
table_root: self.table_changes.table_root.to_string(),
partition_columns: end_snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
read_schema: self.physical_schema.clone(),
physical_schema: self.physical_schema.clone(),
column_mapping_mode: end_snapshot.column_mapping_mode,
}
}
Expand Down Expand Up @@ -276,7 +276,8 @@ fn read_scan_file(

let physical_to_logical_expr =
physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref());
let physical_schema =
scan_file_physical_schema(&scan_file, global_state.physical_schema.as_ref());
let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
physical_schema.clone(),
physical_to_logical_expr,
Expand Down
2 changes: 1 addition & 1 deletion kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ fn read_with_scan_data(
.get_parquet_handler()
.read_parquet_files(
&[meta],
global_state.read_schema.clone(),
global_state.physical_schema.clone(),
scan.physical_predicate().clone(),
)
.unwrap();
Expand Down

0 comments on commit 04ccccb

Please sign in to comment.