Skip to content

Commit

Permalink
ScanFile expression and visitor for CDF (#546)
Browse files Browse the repository at this point in the history
## What changes are proposed in this pull request?
This PR introduces four concepts:
- `cdf_scan_row_schema`: This is the schema that engine data will be
transformed into at the end of the log replay phase. This schema prunes
the log schema down only to the fields necessary to produce CDF columns.
- `cdf_scan_row_expression`: This is a function that generates an
expression to transform an engine data into the `cdf_scan_row_schema`.
The function takes timestamp and commit number as arguments because it
inserts these as columns into the output engine data.
- `CDFScanFile`: This is a type that holds all the information needed to
read a data file and generate its CDF rows. It holds path, deletion
vector, the type of action, and the paired remove deletion vector. The
action type is encoded as an enum `CDFScanFileType`
- `CDFScanFileVisitor`: This is a visitor that reads engine data with
the `cdf_scan_row_schema` and constructs `CDFScanFile`s.

This PR is only for internal use, and is only expected to be used by
`TableChangesScan::execute` when it is implemented. Engines must *not*
use the visitor nor `CDFScanFile`.

## How was this change tested?
I generate a table with add, remove and cdc actions. Then:
- The table is read,
- The engine data is transformed using `table_changes_action_iter` which
in transforms the engine data into the `cdf_scan_row_schema` using the
`cdf_scan_row_expression`
- The transformed engine data is read again using the
`CDFScanFileVisitor` and assert that the `CDFScanFile`s are as expected.

This test checks the following cases:
- A remove with `None` partition values. An empty hashmap for partition
values should be used.
- A remove with partition values.
- An add/remove DV pair. This should place the correct remove dv into
the add's CdfScanFile.
- The visitor extracts the correct timestamp and commit version for each
file.
  • Loading branch information
OussamaSaoudi-db authored Dec 9, 2024
1 parent bea3326 commit ed714c5
Show file tree
Hide file tree
Showing 4 changed files with 427 additions and 23 deletions.
4 changes: 2 additions & 2 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ pub type ScanCallback<T> = fn(
/// ## Example
/// ```ignore
/// let mut context = [my context];
/// for res in scan_data { // scan data from scan.get_scan_data()
/// for res in scan_data { // scan data from scan.scan_data()
/// let (data, vector) = res?;
/// context = delta_kernel::scan::state::visit_scan_files(
/// data.as_ref(),
/// vector,
/// selection_vector,
/// context,
/// my_callback,
/// )?;
Expand Down
32 changes: 11 additions & 21 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ use crate::actions::{
PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_expr, column_name, ColumnName, Expression};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::scan_row_schema;
use crate::scan::state::DvInfo;
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, RowVisitor};

use itertools::Itertools;

#[cfg(test)]
Expand All @@ -36,7 +37,7 @@ pub(crate) struct TableChangesScanData {
pub(crate) scan_data: Box<dyn EngineData>,
/// The selection vector used to filter the `scan_data`.
pub(crate) selection_vector: Vec<bool>,
/// An map from a remove action's path to its deletion vector
/// A map from a remove action's path to its deletion vector
pub(crate) remove_dvs: Arc<HashMap<String, DvInfo>>,
}

Expand Down Expand Up @@ -65,21 +66,6 @@ pub(crate) fn table_changes_action_iter(
Ok(result)
}

// Gets the expression for generating the engine data in [`TableChangesScanData`].
//
// TODO: This expression is temporary. In the future it will also select `cdc` and `remove` actions
// fields.
fn add_transform_expr() -> Expression {
Expression::Struct(vec![
column_expr!("add.path"),
column_expr!("add.size"),
column_expr!("add.modificationTime"),
column_expr!("add.stats"),
column_expr!("add.deletionVector"),
Expression::Struct(vec![column_expr!("add.partitionValues")]),
])
}

/// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`].
/// The scanner operates in two phases that _must_ be performed in the following order:
/// 1. Prepare phase [`LogReplayScanner::try_new`]: This iterates over every action in the commit.
Expand Down Expand Up @@ -237,7 +223,7 @@ impl LogReplayScanner {
remove_dvs,
commit_file,
// TODO: Add the timestamp as a column with an expression
timestamp: _,
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);

Expand All @@ -247,10 +233,14 @@ impl LogReplayScanner {
schema,
None,
)?;
let commit_version = commit_file
.version
.try_into()
.map_err(|_| Error::generic("Failed to convert commit version to i64"))?;
let evaluator = engine.get_expression_handler().get_evaluator(
get_log_add_schema().clone(),
add_transform_expr(),
scan_row_schema().into(),
cdf_scan_row_expression(timestamp, commit_version),
cdf_scan_row_schema().into(),
);

let result = action_iter.map(move |actions| -> DeltaResult<_> {
Expand Down
1 change: 1 addition & 0 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{DeltaResult, Engine, Error, Version};

mod log_replay;
pub mod scan;
mod scan_file;

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
Expand Down
Loading

0 comments on commit ed714c5

Please sign in to comment.