Skip to content

Commit

Permalink
read.rs uses transform
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Dec 19, 2024
1 parent d0e7d9b commit a740ffc
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::{transform_to_logical, Scan};
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
use object_store::{memory::InMemory, path::Path, ObjectStore};
Expand Down Expand Up @@ -339,7 +339,7 @@ struct ScanFile {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
}

fn scan_data_callback(
Expand All @@ -348,14 +348,14 @@ fn scan_data_callback(
size: i64,
_stats: Option<Stats>,
dv_info: DvInfo,
_transforms: Option<ExpressionRef>,
partition_values: HashMap<String, String>,
transform: Option<ExpressionRef>,
_: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
transform,
});
}

Expand Down Expand Up @@ -404,15 +404,20 @@ fn read_with_scan_data(
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();

// ask the kernel to transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&global_state,
&scan_file.partition_values,
)
.unwrap();
// to transform the physical data into the correct logical form
let logical = if let Some(ref transform) = scan_file.transform {
engine
.get_expression_handler()
.get_evaluator(
global_state.physical_schema.clone(),
transform.as_ref().clone(), // TODO: Maybe eval should take a ref
global_state.logical_schema.clone().into(),
)
.evaluate(read_result.as_ref())
.unwrap()
} else {
read_result
};

let record_batch = to_arrow(logical).unwrap();
let rest = split_vector(selection_vector.as_mut(), len, Some(true));
Expand Down

0 comments on commit a740ffc

Please sign in to comment.