Skip to content

Commit

Permalink
remove transform_to_logical :)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Dec 19, 2024
1 parent a740ffc commit 6d72b75
Showing 1 changed file with 0 additions and 67 deletions.
67 changes: 0 additions & 67 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,73 +682,6 @@ pub fn selection_vector(
Ok(deletion_treemap_to_bools(dv_treemap))
}

/// Convert a batch of raw data read from parquet into its logical form.
///
/// The per-scan state info is derived from the logical schema and partition columns held in
/// `global_state`, and the actual work is delegated to `transform_to_logical_internal`, which
/// also receives the `partition_values` for the file the batch came from.
///
/// Any error from computing the state info or from the internal transform is propagated.
pub fn transform_to_logical(
    engine: &dyn Engine,
    data: Box<dyn EngineData>,
    global_state: &GlobalScanState,
    partition_values: &HashMap<String, String>,
) -> DeltaResult<Box<dyn EngineData>> {
    let logical_schema = &global_state.logical_schema;
    let partition_columns = &global_state.partition_columns;
    // Computed on every call here; callers that process many batches can compute it once and
    // use the internal variant directly.
    let info = get_state_info(logical_schema, partition_columns)?;
    let fields = &info.all_fields;
    let has_partition_cols = info.have_partition_cols;
    transform_to_logical_internal(
        engine,
        data,
        global_state,
        partition_values,
        fields,
        has_partition_cols,
    )
}

// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
// scan, and then reuse them for each batch transform
fn transform_to_logical_internal(
engine: &dyn Engine,
data: Box<dyn EngineData>,
global_state: &GlobalScanState,
partition_values: &std::collections::HashMap<String, String>,
all_fields: &[ColumnType],
have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let physical_schema = global_state.physical_schema.clone();
if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None {
return Ok(data);
}
// need to add back partition cols and/or fix-up mapped columns
let all_fields = all_fields
.iter()
.map(|field| match field {
ColumnType::Partition(field_idx) => {
let field = global_state.logical_schema.fields.get_index(*field_idx);
let Some((_, field)) = field else {
return Err(Error::generic(
"logical schema did not contain expected field, can't transform data",
));
};
let name = field.physical_name();
let value_expression =
parse_partition_value(partition_values.get(name), field.data_type())?;
Ok(value_expression.into())
}
ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()),
})
.try_collect()?;
let read_expression = Expression::Struct(all_fields);
let result = engine
.get_expression_handler()
.get_evaluator(
physical_schema,
read_expression,
global_state.logical_schema.clone().into(),
)
.evaluate(data.as_ref())?;
Ok(result)
}

// some utils that are used in file_stream.rs and state.rs tests
#[cfg(test)]
pub(crate) mod test_utils {
Expand Down

0 comments on commit 6d72b75

Please sign in to comment.