TableChangesScan::execute and end to end testing for CDF (#580)
## What changes are proposed in this pull request?
This PR introduces the `execute` method to `TableChangesScan`, which
performs a CDF scan and returns the change data feed as an iterator of
`ScanResult`s. Each `ScanResult` holds engine data and a selection vector
to apply to the rows of that data.
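
As a rough illustration, an engine might drive the new method like this. This is
a minimal sketch, not part of this diff: the `Table::table_changes` and
`into_scan_builder` entry points and the `raw_mask` accessor are assumed from the
surrounding kernel APIs and may differ.

```rust
use std::sync::Arc;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::{DeltaResult, Table};

fn read_cdf(table_uri: &str) -> DeltaResult<()> {
    let engine = Arc::new(SyncEngine::new());
    let table = Table::try_from_uri(table_uri)?;
    // Build a CDF scan over versions 0 through 1 (entry points assumed).
    let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
    let scan = table_changes.into_scan_builder().build()?;
    for scan_result in scan.execute(engine.clone())? {
        let scan_result = scan_result?;
        // The selection vector marks which rows belong to the feed.
        let mask = scan_result.raw_mask().cloned();
        let data = scan_result.raw_data?;
        // ... apply `mask` to the rows of `data` and hand them to the engine
        let _ = (mask, data);
    }
    Ok(())
}
```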

A helper method `read_scan_file` is added that reads the rows of a
`scan_file` and performs the physical-to-logical transformation.

The macros `sort_lines` and `assert_batches_sorted_eq` are moved to the
`common` crate so they can be shared between the `read.rs` and new
`cdf.rs` integration tests.

The `cdf-table` and `cdf-table-non-partitioned` test tables are from the
delta-rs project.

## How was this change tested?
This change is tested by reading CDF data from 3 tables with the following features:
- a table with deletion vectors involving deletion and restoration
- a non-partitioned table
- a partitioned table
OussamaSaoudi-db authored Dec 11, 2024
1 parent af075a8 commit 7bcbb57
Showing 15 changed files with 328 additions and 79 deletions.
1 change: 0 additions & 1 deletion kernel/src/log_segment.rs
@@ -145,7 +145,6 @@ impl LogSegment {
/// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits
/// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
/// is specified it will be the most recent version by default.
#[allow(unused)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) fn for_table_changes(
fs_client: &dyn FileSystemClient,
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
@@ -277,7 +277,7 @@ pub struct ScanResult {
pub raw_data: DeltaResult<Box<dyn EngineData>>,
/// Raw row mask.
// TODO(nick) this should be allocated by the engine
raw_mask: Option<Vec<bool>>,
pub(crate) raw_mask: Option<Vec<bool>>,
}

impl ScanResult {
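
For context on how `raw_data` and `raw_mask` get consumed downstream, here is a
hedged sketch (not part of this diff) of applying a `ScanResult`-style row mask
to an Arrow batch with arrow-rs, assuming the mask has already been extended to
the batch's full row count:

```rust
use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;
use arrow::record_batch::RecordBatch;

// Apply an optional row mask to a batch; `None` keeps every row.
fn apply_mask(batch: RecordBatch, mask: Option<Vec<bool>>) -> RecordBatch {
    match mask {
        Some(mask) => filter_record_batch(&batch, &BooleanArray::from(mask))
            .expect("mask length must equal the batch row count"),
        None => batch,
    }
}
```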
2 changes: 0 additions & 2 deletions kernel/src/table_changes/log_replay.rs
@@ -28,7 +28,6 @@ use itertools::Itertools;
mod tests;

/// Scan data for a Change Data Feed query. This holds metadata that is needed to read data rows.
#[allow(unused)]
pub(crate) struct TableChangesScanData {
/// Engine data with the schema defined in [`scan_row_schema`]
///
@@ -127,7 +126,6 @@ struct LogReplayScanner {
//
// Note: This will be used once an expression is introduced to transform the engine data in
// [`TableChangesScanData`]
#[allow(unused)]
timestamp: i64,
}

1 change: 0 additions & 1 deletion kernel/src/table_changes/mod.rs
@@ -178,7 +178,6 @@ impl TableChanges {
&self.table_root
}
/// The partition columns that will be read.
#[allow(unused)]
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}
15 changes: 7 additions & 8 deletions kernel/src/table_changes/physical_to_logical.rs
@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::iter;

use itertools::Itertools;

@@ -15,7 +14,6 @@ use super::{
};

/// Returns a map from change data feed column name to an expression that generates the row data.
#[allow(unused)]
fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Expression>> {
let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
let version = scan_file.commit_version;
@@ -34,8 +32,7 @@ fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<&str, Express

/// Generates the expression used to convert physical data from the `scan_file` path into logical
/// data matching the `logical_schema`
#[allow(unused)]
fn physical_to_logical_expr(
pub(crate) fn physical_to_logical_expr(
scan_file: &CdfScanFile,
logical_schema: &StructType,
all_fields: &[ColumnType],
@@ -67,14 +64,16 @@ fn physical_to_logical_expr(
}

/// Gets the physical schema that will be used to read data in the `scan_file` path.
#[allow(unused)]
fn scan_file_read_schema(scan_file: &CdfScanFile, read_schema: &StructType) -> SchemaRef {
pub(crate) fn scan_file_physical_schema(
scan_file: &CdfScanFile,
physical_schema: &StructType,
) -> SchemaRef {
if scan_file.scan_type == CdfScanFileType::Cdc {
let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
let fields = read_schema.fields().cloned().chain(iter::once(change_type));
let fields = physical_schema.fields().cloned().chain(Some(change_type));
StructType::new(fields).into()
} else {
read_schema.clone().into()
physical_schema.clone().into()
}
}

10 changes: 4 additions & 6 deletions kernel/src/table_changes/resolve_dvs.rs
@@ -7,16 +7,15 @@ use crate::{DeltaResult, Engine, Error};

/// A [`CdfScanFile`] with its associated `selection_vector`. The `scan_type` is resolved to
/// match the `_change_type` that its rows will have in the change data feed.
#[allow(unused)]
struct ResolvedCdfScanFile {
pub(crate) struct ResolvedCdfScanFile {
/// The scan file that holds the path the data file to be read. The `scan_type` field is
/// resolved to the `_change_type` of the rows for this data file.
scan_file: CdfScanFile,
pub(crate) scan_file: CdfScanFile,
/// Optional vector of bools. If `selection_vector[i] = true`, then that row must be included
/// in the CDF output. Otherwise the row must be filtered out. The vector may be shorter than
/// the data file. In this case, all the remaining rows are *not* selected. If `selection_vector`
/// is `None`, then all rows are selected.
selection_vector: Option<Vec<bool>>,
pub(crate) selection_vector: Option<Vec<bool>>,
}

/// Resolves the deletion vectors for a [`CdfScanFile`]. This function handles two
@@ -33,8 +32,7 @@ struct ResolvedCdfScanFile {
/// 2. The second case handles all other add, remove, and cdc [`CdfScanFile`]s. These will simply
/// read the deletion vector (if present), and each is converted into a [`ResolvedCdfScanFile`].
/// No changes are made to the `scan_type`.
#[allow(unused)]
fn resolve_scan_file_dv(
pub(crate) fn resolve_scan_file_dv(
engine: &dyn Engine,
table_root: &Url,
scan_file: CdfScanFile,
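
Since the `selection_vector` contract above (shorter-than-file masks, `None`
selects everything) is easy to misread, here is a small standalone illustration
of the semantics; this is demonstration code, not kernel code:

```rust
// Standalone illustration of the contract documented above.
fn selected_rows(selection_vector: Option<&[bool]>, num_rows: usize) -> Vec<bool> {
    match selection_vector {
        // Rows past the end of the mask are *not* selected.
        Some(mask) => (0..num_rows)
            .map(|i| mask.get(i).copied().unwrap_or(false))
            .collect(),
        // No vector at all selects every row.
        None => vec![true; num_rows],
    }
}

#[test]
fn short_mask_deselects_trailing_rows() {
    assert_eq!(
        selected_rows(Some(&[true, false, true]), 5),
        vec![true, false, true, false, false]
    );
    assert_eq!(selected_rows(None, 2), vec![true, true]);
}
```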
128 changes: 114 additions & 14 deletions kernel/src/table_changes/scan.rs
@@ -2,18 +2,22 @@ use std::sync::Arc;

use itertools::Itertools;
use tracing::debug;
use url::Url;

use crate::actions::deletion_vector::split_vector;
use crate::scan::state::GlobalScanState;
use crate::scan::{ColumnType, PhysicalPredicate};
use crate::scan::{ColumnType, PhysicalPredicate, ScanResult};
use crate::schema::{SchemaRef, StructType};
use crate::{DeltaResult, Engine, ExpressionRef};
use crate::{DeltaResult, Engine, ExpressionRef, FileMeta};

use super::log_replay::{table_changes_action_iter, TableChangesScanData};
use super::physical_to_logical::{physical_to_logical_expr, scan_file_physical_schema};
use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
use super::scan_file::scan_data_to_scan_file;
use super::{TableChanges, CDF_FIELDS};

/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
/// data feed from the table
#[allow(unused)]
#[derive(Debug)]
pub struct TableChangesScan {
// The [`TableChanges`] that specifies this scan's start and end versions
@@ -28,9 +32,7 @@ pub struct TableChangesScan {
// The predicate to filter the data
physical_predicate: PhysicalPredicate,
// The [`ColumnType`] of all the fields in the `logical_schema`
all_fields: Vec<ColumnType>,
// `true` if any column in the `logical_schema` is a partition column
have_partition_cols: bool,
all_fields: Arc<Vec<ColumnType>>,
}

/// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`]
@@ -115,7 +117,6 @@ impl TableChangesScanBuilder {
let logical_schema = self
.schema
.unwrap_or_else(|| self.table_changes.schema.clone().into());
let mut have_partition_cols = false;
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());

// Loop over all selected fields. We produce the following:
@@ -138,7 +139,6 @@ impl TableChangesScanBuilder {
// Store the index into the schema for this field. When we turn it into an
// expression in the inner loop, we will index into the schema and get the name and
// data type, which we need to properly materialize the column.
have_partition_cols = true;
Ok(ColumnType::Partition(index))
} else if CDF_FIELDS
.iter()
@@ -168,8 +168,7 @@
table_changes: self.table_changes,
logical_schema,
physical_predicate,
all_fields,
have_partition_cols,
all_fields: Arc::new(all_fields),
physical_schema: StructType::new(read_fields).into(),
})
}
@@ -181,7 +180,6 @@ impl TableChangesScan {
/// necessary to read CDF. Additionally, [`TableChangesScanData`] holds metadata on the
/// deletion vectors present in the commit. The engine data in each scan data is guaranteed
/// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit.
#[allow(unused)]
fn scan_data(
&self,
engine: Arc<dyn Engine>,
@@ -204,7 +202,6 @@

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
/// only be called once per scan.
#[allow(unused)]
fn global_scan_state(&self) -> GlobalScanState {
let end_snapshot = &self.table_changes.end_snapshot;
GlobalScanState {
@@ -215,6 +212,109 @@
column_mapping_mode: end_snapshot.column_mapping_mode,
}
}

/// Get the predicate [`Expression`] of the scan.
fn physical_predicate(&self) -> Option<ExpressionRef> {
if let PhysicalPredicate::Some(ref predicate, _) = self.physical_predicate {
Some(predicate.clone())
} else {
None
}
}

/// Perform an "all in one" scan to get the change data feed. This will use the provided `engine`
/// to read and process all the data for the query. Each [`ScanResult`] in the resultant iterator
/// encapsulates the raw data and an optional boolean vector built from the deletion vector if it
/// was present. See the documentation for [`ScanResult`] for more details.
pub fn execute(
&self,
engine: Arc<dyn Engine>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
let scan_data = self.scan_data(engine.clone())?;
let scan_files = scan_data_to_scan_file(scan_data);

let global_scan_state = self.global_scan_state();
let table_root = self.table_changes.table_root().clone();
let all_fields = self.all_fields.clone();
let physical_predicate = self.physical_predicate();
let dv_engine_ref = engine.clone();

let result = scan_files
.map(move |scan_file| {
resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?)
}) // Iterator-Result-Iterator
.flatten_ok() // Iterator-Result
.map(move |resolved_scan_file| -> DeltaResult<_> {
read_scan_file(
engine.as_ref(),
resolved_scan_file?,
&global_scan_state,
&all_fields,
physical_predicate.clone(),
)
}) // Iterator-Result-Iterator-Result
.flatten_ok() // Iterator-Result-Result
.map(|x| x?); // Iterator-Result

Ok(result)
}
}
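
The `// Iterator-Result-...` breadcrumbs above track the nesting as it
collapses. As a standalone demonstration of itertools' `flatten_ok`, the adapter
doing that collapsing (illustrative only):

```rust
use itertools::Itertools;

// An iterator of Result<impl IntoIterator, E> flattens into an iterator
// of Result<T, E>; each Err passes through in place.
fn demo() {
    let nested: Vec<Result<Vec<i32>, String>> =
        vec![Ok(vec![1, 2]), Err("bad file".to_string()), Ok(vec![3])];
    let flat: Vec<Result<i32, String>> = nested.into_iter().flatten_ok().collect();
    assert_eq!(flat, vec![Ok(1), Ok(2), Err("bad file".to_string()), Ok(3)]);
}
```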

/// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical.
/// The result is a fallible iterator of [`ScanResult`] containing the logical data.
fn read_scan_file(
engine: &dyn Engine,
resolved_scan_file: ResolvedCdfScanFile,
global_state: &GlobalScanState,
all_fields: &[ColumnType],
physical_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
let ResolvedCdfScanFile {
scan_file,
mut selection_vector,
} = resolved_scan_file;

let physical_to_logical_expr =
physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref());
let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
physical_schema.clone(),
physical_to_logical_expr,
global_state.logical_schema.clone().into(),
);

let table_root = Url::parse(&global_state.table_root)?;
let location = table_root.join(&scan_file.path)?;
let file = FileMeta {
last_modified: 0,
size: 0,
location,
};
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[file],
physical_schema,
physical_predicate,
)?;

let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
let batch = batch?;
// to transform the physical data into the correct logical form
let logical = phys_to_logical_eval.evaluate(batch.as_ref());
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results. we `take()` out of `selection_vector` to avoid
// trying to return a captured variable. We're going to reassign `selection_vector`
// to `rest` in a moment anyway
let mut sv = selection_vector.take();
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
raw_mask: sv,
};
selection_vector = rest;
Ok(result)
});
Ok(result)
}
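
The mask-splitting step above is the subtle part of `read_scan_file`. Here is a
toy version of the pattern for illustration; the kernel's real `split_vector`
helper lives in `actions::deletion_vector` and its signature may differ:

```rust
// Toy version of the mask split: keep the first `len` entries for the
// current batch and return whatever remains for the batches that follow.
fn split_off_rest(mask: &mut Option<Vec<bool>>, len: usize) -> Option<Vec<bool>> {
    match mask.as_mut() {
        // Mask extends past this batch: split it; `mask` keeps rows 0..len.
        Some(sv) if sv.len() > len => Some(sv.split_off(len)),
        // Mask (or None) fits within this batch: nothing carries over.
        _ => None,
    }
}

#[test]
fn mask_splits_across_batches() {
    let mut sv = Some(vec![true, false, true, true, false]);
    let rest = split_off_rest(&mut sv, 3);
    assert_eq!(sv, Some(vec![true, false, true]));
    assert_eq!(rest, Some(vec![true, false]));
}
```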

#[cfg(test)]
@@ -248,9 +348,9 @@ mod tests {
ColumnType::Selected("_commit_version".to_string()),
ColumnType::Selected("_commit_timestamp".to_string()),
]
.into()
);
assert_eq!(scan.physical_predicate, PhysicalPredicate::None);
assert!(!scan.have_partition_cols);
}

#[test]
@@ -279,6 +379,7 @@
ColumnType::Selected("id".to_string()),
ColumnType::Selected("_commit_version".to_string()),
]
.into()
);
assert_eq!(
scan.logical_schema,
Expand All @@ -288,7 +389,6 @@ mod tests {
])
.into()
);
assert!(!scan.have_partition_cols);
assert_eq!(
scan.physical_predicate,
PhysicalPredicate::Some(
6 changes: 0 additions & 6 deletions kernel/src/table_changes/scan_file.rs
@@ -18,7 +18,6 @@ use crate::utils::require;
use crate::{DeltaResult, Error, RowVisitor};

// The type of action associated with a [`CdfScanFile`].
#[allow(unused)]
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum CdfScanFileType {
Add,
@@ -27,7 +26,6 @@ pub(crate) enum CdfScanFileType {
}

/// Represents all the metadata needed to read a Change Data Feed.
#[allow(unused)]
#[derive(Debug, PartialEq, Clone)]
pub(crate) struct CdfScanFile {
/// The type of action this file belongs to. This may be one of add, remove, or cdc
@@ -51,7 +49,6 @@ pub(crate) type CdfScanCallback<T> = fn(context: &mut T, scan_file: CdfScanFile)

/// Transforms an iterator of [`TableChangesScanData`] into an iterator of
/// [`CdfScanFile`] by visiting the engine data.
#[allow(unused)]
pub(crate) fn scan_data_to_scan_file(
scan_data: impl Iterator<Item = DeltaResult<TableChangesScanData>>,
) -> impl Iterator<Item = DeltaResult<CdfScanFile>> {
@@ -91,7 +88,6 @@ pub(crate) fn scan_data_to_scan_file(
/// )?;
/// }
/// ```
#[allow(unused)]
pub(crate) fn visit_cdf_scan_files<T>(
scan_data: &TableChangesScanData,
context: T,
@@ -110,7 +106,6 @@ pub(crate) fn visit_cdf_scan_files<T>(

/// A visitor that extracts [`CdfScanFile`]s from engine data. Expects data to have the schema
/// [`cdf_scan_row_schema`].
#[allow(unused)]
struct CdfScanFileVisitor<'a, T> {
callback: CdfScanCallback<T>,
selection_vector: &'a [bool],
@@ -219,7 +214,6 @@ pub(crate) fn cdf_scan_row_schema() -> SchemaRef {

/// Expression to convert an action with `log_schema` into one with
/// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanData`].
#[allow(unused)]
pub(crate) fn cdf_scan_row_expression(commit_timestamp: i64, commit_number: i64) -> Expression {
Expression::struct_from([
Expression::struct_from([
