From ed714c53f6a2a0b8327bbf479e93c3a35525f436 Mon Sep 17 00:00:00 2001
From: Oussama Saoudi
Date: Mon, 9 Dec 2024 12:51:48 -0800
Subject: [PATCH] ScanFile expression and visitor for CDF (#546)

## What changes are proposed in this pull request?

This PR introduces four concepts:
- `cdf_scan_row_schema`: This is the schema that engine data will be transformed into at the end of the log replay phase. This schema prunes the log schema down to only the fields necessary to produce CDF columns.
- `cdf_scan_row_expression`: This is a function that generates an expression to transform engine data into the `cdf_scan_row_schema`. The function takes the timestamp and commit number as arguments because it inserts these as columns into the output engine data.
- `CdfScanFile`: This is a type that holds all the information needed to read a data file and generate its CDF rows. It holds the path, the deletion vector, the type of action, and the paired remove deletion vector. The action type is encoded as an enum, `CdfScanFileType`.
- `CdfScanFileVisitor`: This is a visitor that reads engine data with the `cdf_scan_row_schema` and constructs `CdfScanFile`s.

This PR is only for internal use, and is only expected to be used by `TableChangesScan::execute` when it is implemented. Engines must use *neither* the visitor nor `CdfScanFile`.

## How was this change tested?

I generate a table with add, remove and cdc actions. Then:
- The table is read,
- The engine data is transformed using `table_changes_action_iter`, which in turn transforms the engine data into the `cdf_scan_row_schema` using the `cdf_scan_row_expression`
- The transformed engine data is read again using the `CdfScanFileVisitor`, and the test asserts that the `CdfScanFile`s are as expected.

This test checks the following cases:
- A remove with `None` partition values. An empty hashmap for partition values should be used.
- A remove with partition values.
- An add/remove DV pair. This should place the correct remove DV into the add's `CdfScanFile`.
- The visitor extracts the correct timestamp and commit version for each file. --- kernel/src/scan/state.rs | 4 +- kernel/src/table_changes/log_replay.rs | 32 +- kernel/src/table_changes/mod.rs | 1 + kernel/src/table_changes/scan_file.rs | 413 +++++++++++++++++++++++++ 4 files changed, 427 insertions(+), 23 deletions(-) create mode 100644 kernel/src/table_changes/scan_file.rs diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 19534d0cd..cc55103b8 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -112,11 +112,11 @@ pub type ScanCallback = fn( /// ## Example /// ```ignore /// let mut context = [my context]; -/// for res in scan_data { // scan data from scan.get_scan_data() +/// for res in scan_data { // scan data from scan.scan_data() /// let (data, vector) = res?; /// context = delta_kernel::scan::state::visit_scan_files( /// data.as_ref(), -/// vector, +/// selection_vector, /// context, /// my_callback, /// )?; diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 993a89912..9c6cfe872 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -11,16 +11,17 @@ use crate::actions::{ PROTOCOL_NAME, REMOVE_NAME, }; use crate::engine_data::{GetData, TypedGetData}; -use crate::expressions::{column_expr, column_name, ColumnName, Expression}; +use crate::expressions::{column_name, ColumnName}; use crate::path::ParsedLogPath; use crate::scan::data_skipping::DataSkippingFilter; -use crate::scan::scan_row_schema; use crate::scan::state::DvInfo; use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType}; +use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema}; use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported}; use crate::table_properties::TableProperties; use crate::utils::require; use crate::{DeltaResult, Engine, EngineData, Error, ExpressionRef, 
RowVisitor}; + use itertools::Itertools; #[cfg(test)] @@ -36,7 +37,7 @@ pub(crate) struct TableChangesScanData { pub(crate) scan_data: Box, /// The selection vector used to filter the `scan_data`. pub(crate) selection_vector: Vec, - /// An map from a remove action's path to its deletion vector + /// A map from a remove action's path to its deletion vector pub(crate) remove_dvs: Arc>, } @@ -65,21 +66,6 @@ pub(crate) fn table_changes_action_iter( Ok(result) } -// Gets the expression for generating the engine data in [`TableChangesScanData`]. -// -// TODO: This expression is temporary. In the future it will also select `cdc` and `remove` actions -// fields. -fn add_transform_expr() -> Expression { - Expression::Struct(vec![ - column_expr!("add.path"), - column_expr!("add.size"), - column_expr!("add.modificationTime"), - column_expr!("add.stats"), - column_expr!("add.deletionVector"), - Expression::Struct(vec![column_expr!("add.partitionValues")]), - ]) -} - /// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`]. /// The scanner operates in two phases that _must_ be performed in the following order: /// 1. Prepare phase [`LogReplayScanner::try_new`]: This iterates over every action in the commit. 
@@ -237,7 +223,7 @@ impl LogReplayScanner { remove_dvs, commit_file, // TODO: Add the timestamp as a column with an expression - timestamp: _, + timestamp, } = self; let remove_dvs = Arc::new(remove_dvs); @@ -247,10 +233,14 @@ impl LogReplayScanner { schema, None, )?; + let commit_version = commit_file + .version + .try_into() + .map_err(|_| Error::generic("Failed to convert commit version to i64"))?; let evaluator = engine.get_expression_handler().get_evaluator( get_log_add_schema().clone(), - add_transform_expr(), - scan_row_schema().into(), + cdf_scan_row_expression(timestamp, commit_version), + cdf_scan_row_schema().into(), ); let result = action_iter.map(move |actions| -> DeltaResult<_> { diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index e0ad823f1..766866d25 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -17,6 +17,7 @@ use crate::{DeltaResult, Engine, Error, Version}; mod log_replay; pub mod scan; +mod scan_file; static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { [ diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs new file mode 100644 index 000000000..8003ec08f --- /dev/null +++ b/kernel/src/table_changes/scan_file.rs @@ -0,0 +1,413 @@ +//! This module handles [`CdfScanFile`]s for [`TableChangesScan`]. A [`CdfScanFile`] consists of all the +//! metadata required to generate a change data feed. [`CdfScanFile`] can be constructed using +//! [`CdfScanFileVisitor`]. The visitor reads from engine data with the schema [`cdf_scan_row_schema`]. +//! You can convert engine data to this schema using the [`cdf_scan_row_expression`]. 
+use itertools::Itertools; +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; + +use super::log_replay::TableChangesScanData; +use crate::actions::visitors::visit_deletion_vector_at; +use crate::engine_data::{GetData, TypedGetData}; +use crate::expressions::{column_expr, Expression}; +use crate::scan::state::DvInfo; +use crate::schema::{ + ColumnName, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType, +}; +use crate::utils::require; +use crate::{DeltaResult, Error, RowVisitor}; + +// The type of action associated with a [`CdfScanFile`]. +#[allow(unused)] +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum CdfScanFileType { + Add, + Remove, + Cdc, +} + +/// Represents all the metadata needed to read a Change Data Feed. +#[allow(unused)] +#[derive(Debug, PartialEq, Clone)] +pub(crate) struct CdfScanFile { + /// The type of action this file belongs to. This may be one of add, remove, or cdc + pub scan_type: CdfScanFileType, + /// A `&str` which is the path to the file + pub path: String, + /// A [`DvInfo`] struct with the path to the action's deletion vector + pub dv_info: DvInfo, + /// An optional [`DvInfo`] struct. If present, this is deletion vector of a remove action with + /// the same path as this [`CdfScanFile`] + pub remove_dv: Option, + /// A `HashMap` which are partition values + pub partition_values: HashMap, + /// The commit version that this action was performed in + pub commit_version: i64, + /// The timestamp of the commit that this action was performed in + pub commit_timestamp: i64, +} + +pub(crate) type CdfScanCallback = fn(context: &mut T, scan_file: CdfScanFile); + +/// Transforms an iterator of [`TableChangesScanData`] into an iterator of +/// [`CdfScanFile`] by visiting the engine data. 
+#[allow(unused)] +pub(crate) fn scan_data_to_scan_file( + scan_data: impl Iterator>, +) -> impl Iterator> { + scan_data + .map(|scan_data| -> DeltaResult<_> { + let scan_data = scan_data?; + let callback: CdfScanCallback> = + |context, scan_file| context.push(scan_file); + Ok(visit_cdf_scan_files(&scan_data, vec![], callback)?.into_iter()) + }) // Iterator-Result-Iterator + .flatten_ok() // Iterator-Result +} + +/// Request that the kernel call a callback on each valid file that needs to be read for the +/// scan. +/// +/// The arguments to the callback are: +/// * `context`: an `&mut context` argument. this can be anything that engine needs to pass through to each call +/// * `CdfScanFile`: a [`CdfScanFile`] struct that holds all the metadata required to perform Change Data +/// Feed +/// +/// ## Context +/// A note on the `context`. This can be any value the engine wants. This function takes ownership +/// of the passed arg, but then returns it, so the engine can repeatedly call `visit_cdf_scan_files` +/// with the same context. +/// +/// ## Example +/// ```ignore +/// let mut context = [my context]; +/// for res in scan_data { // scan data table_changes_scan.scan_data() +/// let (data, vector, remove_dv) = res?; +/// context = delta_kernel::table_changes::scan_file::visit_cdf_scan_files( +/// data.as_ref(), +/// selection_vector, +/// context, +/// my_callback, +/// )?; +/// } +/// ``` +#[allow(unused)] +pub(crate) fn visit_cdf_scan_files( + scan_data: &TableChangesScanData, + context: T, + callback: CdfScanCallback, +) -> DeltaResult { + let mut visitor = CdfScanFileVisitor { + callback, + context, + selection_vector: &scan_data.selection_vector, + remove_dvs: scan_data.remove_dvs.as_ref(), + }; + + visitor.visit_rows_of(scan_data.scan_data.as_ref())?; + Ok(visitor.context) +} + +/// A visitor that extracts [`CdfScanFile`]s from engine data. Expects data to have the schema +/// [`cdf_scan_row_schema`]. 
+#[allow(unused)] +struct CdfScanFileVisitor<'a, T> { + callback: CdfScanCallback, + selection_vector: &'a [bool], + remove_dvs: &'a HashMap, + context: T, +} + +impl RowVisitor for CdfScanFileVisitor<'_, T> { + fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { + require!( + getters.len() == 18, + Error::InternalError(format!( + "Wrong number of CdfScanFileVisitor getters: {}", + getters.len() + )) + ); + for row_index in 0..row_count { + if !self.selection_vector[row_index] { + continue; + } + + let (scan_type, path, deletion_vector, partition_values) = + if let Some(path) = getters[0].get_opt(row_index, "scanFile.add.path")? { + let scan_type = CdfScanFileType::Add; + let deletion_vector = visit_deletion_vector_at(row_index, &getters[1..=5])?; + let partition_values = getters[6] + .get_opt(row_index, "scanFile.add.fileConstantValues.partitionValues")?; + (scan_type, path, deletion_vector, partition_values) + } else if let Some(path) = getters[7].get_opt(row_index, "scanFile.remove.path")? { + let scan_type = CdfScanFileType::Remove; + let deletion_vector = visit_deletion_vector_at(row_index, &getters[8..=12])?; + let partition_values = getters[13].get_opt( + row_index, + "scanFile.remove.fileConstantValues.partitionValues", + )?; + (scan_type, path, deletion_vector, partition_values) + } else if let Some(path) = getters[14].get_opt(row_index, "scanFile.cdc.path")? 
{ + let scan_type = CdfScanFileType::Cdc; + let partition_values = getters[15] + .get_opt(row_index, "scanFile.cdc.fileConstantValues.partitionValues")?; + (scan_type, path, None, partition_values) + } else { + continue; + }; + let partition_values = partition_values.unwrap_or_else(Default::default); + let scan_file = CdfScanFile { + remove_dv: self.remove_dvs.get(&path).cloned(), + scan_type, + path, + dv_info: DvInfo { deletion_vector }, + partition_values, + commit_timestamp: getters[16].get(row_index, "scanFile.timestamp")?, + commit_version: getters[17].get(row_index, "scanFile.commit_version")?, + }; + (self.callback)(&mut self.context, scan_file) + } + Ok(()) + } + + fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { + static NAMES_AND_TYPES: LazyLock = + LazyLock::new(|| cdf_scan_row_schema().leaves(None)); + NAMES_AND_TYPES.as_ref() + } +} + +/// Get the schema that scan rows (from [`TableChanges::scan_data`]) will be returned with. +pub(crate) fn cdf_scan_row_schema() -> SchemaRef { + static CDF_SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| { + let deletion_vector = StructType::new([ + StructField::new("storageType", DataType::STRING, true), + StructField::new("pathOrInlineDv", DataType::STRING, true), + StructField::new("offset", DataType::INTEGER, true), + StructField::new("sizeInBytes", DataType::INTEGER, true), + StructField::new("cardinality", DataType::LONG, true), + ]); + let partition_values = MapType::new(DataType::STRING, DataType::STRING, true); + let file_constant_values = + StructType::new([StructField::new("partitionValues", partition_values, true)]); + + let add = StructType::new([ + StructField::new("path", DataType::STRING, true), + StructField::new("deletionVector", deletion_vector.clone(), true), + StructField::new("fileConstantValues", file_constant_values.clone(), true), + ]); + let remove = StructType::new([ + StructField::new("path", DataType::STRING, true), + 
StructField::new("deletionVector", deletion_vector, true), + StructField::new("fileConstantValues", file_constant_values.clone(), true), + ]); + let cdc = StructType::new([ + StructField::new("path", DataType::STRING, true), + StructField::new("fileConstantValues", file_constant_values, true), + ]); + + Arc::new(StructType::new([ + StructField::new("add", add, true), + StructField::new("remove", remove, true), + StructField::new("cdc", cdc, true), + StructField::new("timestamp", DataType::LONG, false), + StructField::new("commit_version", DataType::LONG, false), + ])) + }); + CDF_SCAN_ROW_SCHEMA.clone() +} + +/// Expression to convert an action with `log_schema` into one with +/// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanData`]. +#[allow(unused)] +pub(crate) fn cdf_scan_row_expression(commit_timestamp: i64, commit_number: i64) -> Expression { + Expression::struct_from([ + Expression::struct_from([ + column_expr!("add.path"), + column_expr!("add.deletionVector"), + Expression::struct_from([column_expr!("add.partitionValues")]), + ]), + Expression::struct_from([ + column_expr!("remove.path"), + column_expr!("remove.deletionVector"), + Expression::struct_from([column_expr!("remove.partitionValues")]), + ]), + Expression::struct_from([ + column_expr!("cdc.path"), + Expression::struct_from([column_expr!("cdc.partitionValues")]), + ]), + commit_timestamp.into(), + commit_number.into(), + ]) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use itertools::Itertools; + + use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType}; + use crate::actions::deletion_vector::DeletionVectorDescriptor; + use crate::actions::{Add, Cdc, Remove}; + use crate::engine::sync::SyncEngine; + use crate::log_segment::LogSegment; + use crate::scan::state::DvInfo; + use crate::schema::{DataType, StructField, StructType}; + use crate::table_changes::log_replay::table_changes_action_iter; + use 
crate::utils::test_utils::{Action, LocalMockTable}; + use crate::Engine; + + #[tokio::test] + async fn test_scan_file_visiting() { + let engine = SyncEngine::new(); + let mut mock_table = LocalMockTable::new(); + + let dv_info = DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(), + offset: Some(1), + size_in_bytes: 36, + cardinality: 2, + }; + let add_partition_values = HashMap::from([("a".to_string(), "b".to_string())]); + let add_paired = Add { + path: "fake_path_1".into(), + deletion_vector: Some(dv_info.clone()), + partition_values: add_partition_values, + data_change: true, + ..Default::default() + }; + let remove_paired = Remove { + path: "fake_path_1".into(), + deletion_vector: None, + partition_values: None, + data_change: true, + ..Default::default() + }; + + let rm_dv = DeletionVectorDescriptor { + storage_type: "u".to_string(), + path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(), + offset: Some(1), + size_in_bytes: 38, + cardinality: 3, + }; + let rm_partition_values = Some(HashMap::from([("c".to_string(), "d".to_string())])); + let remove = Remove { + path: "fake_path_2".into(), + deletion_vector: Some(rm_dv), + partition_values: rm_partition_values, + data_change: true, + ..Default::default() + }; + + let cdc_partition_values = HashMap::from([("x".to_string(), "y".to_string())]); + let cdc = Cdc { + path: "fake_path_3".into(), + partition_values: cdc_partition_values, + ..Default::default() + }; + + let remove_no_partition = Remove { + path: "fake_path_2".into(), + deletion_vector: None, + partition_values: None, + data_change: true, + ..Default::default() + }; + + mock_table + .commit([ + Action::Remove(remove_paired.clone()), + Action::Add(add_paired.clone()), + Action::Remove(remove.clone()), + ]) + .await; + mock_table.commit([Action::Cdc(cdc.clone())]).await; + mock_table + .commit([Action::Remove(remove_no_partition.clone())]) + .await; + + let table_root = 
url::Url::from_directory_path(mock_table.table_root()).unwrap(); + let log_root = table_root.join("_delta_log/").unwrap(); + let log_segment = LogSegment::for_table_changes( + engine.get_file_system_client().as_ref(), + log_root, + 0, + None, + ) + .unwrap(); + let table_schema = StructType::new([ + StructField::new("id", DataType::INTEGER, true), + StructField::new("value", DataType::STRING, true), + ]); + let scan_data = table_changes_action_iter( + Arc::new(engine), + log_segment.ascending_commit_files.clone(), + table_schema.into(), + None, + ) + .unwrap(); + let scan_files: Vec<_> = scan_data_to_scan_file(scan_data).try_collect().unwrap(); + + // Generate the expected [`CdfScanFile`] + let timestamps = log_segment + .ascending_commit_files + .iter() + .map(|commit| commit.location.last_modified) + .collect_vec(); + let expected_remove_dv = DvInfo { + deletion_vector: None, + }; + let expected_scan_files = vec![ + CdfScanFile { + scan_type: CdfScanFileType::Add, + path: add_paired.path, + dv_info: DvInfo { + deletion_vector: add_paired.deletion_vector, + }, + partition_values: add_paired.partition_values, + commit_version: 0, + commit_timestamp: timestamps[0], + remove_dv: Some(expected_remove_dv), + }, + CdfScanFile { + scan_type: CdfScanFileType::Remove, + path: remove.path, + dv_info: DvInfo { + deletion_vector: remove.deletion_vector, + }, + partition_values: remove.partition_values.unwrap(), + commit_version: 0, + commit_timestamp: timestamps[0], + remove_dv: None, + }, + CdfScanFile { + scan_type: CdfScanFileType::Cdc, + path: cdc.path, + dv_info: DvInfo { + deletion_vector: None, + }, + partition_values: cdc.partition_values, + commit_version: 1, + commit_timestamp: timestamps[1], + remove_dv: None, + }, + CdfScanFile { + scan_type: CdfScanFileType::Remove, + path: remove_no_partition.path, + dv_info: DvInfo { + deletion_vector: None, + }, + partition_values: HashMap::new(), + commit_version: 2, + commit_timestamp: timestamps[2], + remove_dv: None, + }, 
+ ]; + + assert_eq!(scan_files, expected_scan_files); + } +}