diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs
index df8f1813b..c7567a087 100644
--- a/kernel/src/log_segment.rs
+++ b/kernel/src/log_segment.rs
@@ -145,7 +145,6 @@ impl LogSegment {
     /// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits
     /// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
     /// is specified it will be the most recent version by default.
-    #[allow(unused)]
     #[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
     pub(crate) fn for_table_changes(
         fs_client: &dyn FileSystemClient,
diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs
index 7cf9a01ae..9ccfa8a76 100644
--- a/kernel/src/scan/mod.rs
+++ b/kernel/src/scan/mod.rs
@@ -277,7 +277,7 @@ pub struct ScanResult {
     pub raw_data: DeltaResult<Box<dyn EngineData>>,
     /// Raw row mask.
     // TODO(nick) this should be allocated by the engine
-    raw_mask: Option<Vec<bool>>,
+    pub(crate) raw_mask: Option<Vec<bool>>,
 }

 impl ScanResult {
diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs
index 0deb24682..89951a39b 100644
--- a/kernel/src/table_changes/log_replay.rs
+++ b/kernel/src/table_changes/log_replay.rs
@@ -28,7 +28,6 @@ use itertools::Itertools;
 mod tests;

 /// Scan data for a Change Data Feed query. This holds metadata that is needed to read data rows.
-#[allow(unused)]
 pub(crate) struct TableChangesScanData {
     /// Engine data with the schema defined in [`scan_row_schema`]
     ///
@@ -127,7 +126,6 @@ struct LogReplayScanner {
     //
     // Note: This will be used once an expression is introduced to transform the engine data in
     // [`TableChangesScanData`]
-    #[allow(unused)]
     timestamp: i64,
 }

diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs
index a5938082b..2c15bd537 100644
--- a/kernel/src/table_changes/mod.rs
+++ b/kernel/src/table_changes/mod.rs
@@ -178,7 +178,6 @@ impl TableChanges {
         &self.table_root
     }
     /// The partition columns that will be read.
-    #[allow(unused)]
     pub(crate) fn partition_columns(&self) -> &Vec<String> {
         &self.end_snapshot.metadata().partition_columns
     }
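With `raw_mask` now `pub(crate)`, the CDF code further down can hand back a `ScanResult` whose mask follows the usual selection-vector contract: `None` selects every row, `Some(mask)` selects row `i` only when `mask[i]` is `true`, and a mask shorter than the data leaves the trailing rows unselected. A minimal, self-contained sketch of that contract (plain `Vec`s stand in for engine data; `apply_mask` is a hypothetical helper, not a kernel API):

```rust
/// Keep only the rows selected by an optional boolean mask.
fn apply_mask<T>(rows: Vec<T>, mask: Option<&[bool]>) -> Vec<T> {
    match mask {
        // No mask: every row is selected.
        None => rows,
        // With a mask, row i survives only when mask[i] is true. `zip` stops at
        // the shorter side, so rows past the end of the mask are dropped,
        // matching the "remaining rows are not selected" rule.
        Some(mask) => rows
            .into_iter()
            .zip(mask.iter().copied())
            .filter_map(|(row, keep)| keep.then_some(row))
            .collect(),
    }
}

fn main() {
    let rows = vec!["a", "b", "c"];
    assert_eq!(apply_mask(rows.clone(), None), vec!["a", "b", "c"]);
    assert_eq!(apply_mask(rows, Some(&[true, false, true])), vec!["a", "c"]);
}
```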
diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs
index 7232e2cf8..bc8488081 100644
--- a/kernel/src/table_changes/physical_to_logical.rs
+++ b/kernel/src/table_changes/physical_to_logical.rs
@@ -1,5 +1,4 @@
 use std::collections::HashMap;
-use std::iter;

 use itertools::Itertools;

@@ -15,7 +14,6 @@ use super::{
 };

 /// Returns a map from change data feed column name to an expression that generates the row data.
-#[allow(unused)]
 fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<String, Expression>> {
     let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?;
     let version = scan_file.commit_version;
@@ -34,8 +32,7 @@ fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult<HashMap<String, Expression>> {
-) -> SchemaRef {
+pub(crate) fn scan_file_physical_schema(
+    scan_file: &CdfScanFile,
+    physical_schema: &StructType,
+) -> SchemaRef {
     if scan_file.scan_type == CdfScanFileType::Cdc {
         let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false);
-        let fields = read_schema.fields().cloned().chain(iter::once(change_type));
+        let fields = physical_schema.fields().cloned().chain(Some(change_type));
         StructType::new(fields).into()
     } else {
-        read_schema.clone().into()
+        physical_schema.clone().into()
     }
 }
diff --git a/kernel/src/table_changes/resolve_dvs.rs b/kernel/src/table_changes/resolve_dvs.rs
index caa2cf310..85186a836 100644
--- a/kernel/src/table_changes/resolve_dvs.rs
+++ b/kernel/src/table_changes/resolve_dvs.rs
@@ -7,16 +7,15 @@ use crate::{DeltaResult, Engine, Error};

 /// A [`CdfScanFile`] with its associated `selection_vector`. The `scan_type` is resolved to
 /// match the `_change_type` that its rows will have in the change data feed.
-#[allow(unused)]
-struct ResolvedCdfScanFile {
+pub(crate) struct ResolvedCdfScanFile {
     /// The scan file that holds the path the data file to be read. The `scan_type` field is
     /// resolved to the `_change_type` of the rows for this data file.
-    scan_file: CdfScanFile,
+    pub(crate) scan_file: CdfScanFile,
     /// Optional vector of bools. If `selection_vector[i] = true`, then that row must be included
     /// in the CDF output. Otherwise the row must be filtered out. The vector may be shorter than
     /// the data file. In this case, all the remaining rows are *not* selected. If `selection_vector`
     /// is `None`, then all rows are selected.
-    selection_vector: Option<Vec<bool>>,
+    pub(crate) selection_vector: Option<Vec<bool>>,
 }

 /// Resolves the deletion vectors for a [`CdfScanFile`]. This function handles two
@@ -33,8 +32,7 @@ struct ResolvedCdfScanFile {
 /// 2. The second case handles all other add, remove, and cdc [`CdfScanFile`]s. These will simply
 /// read the deletion vector (if present), and each is converted into a [`ResolvedCdfScanFile`].
 /// No changes are made to the `scan_type`.
-#[allow(unused)]
-fn resolve_scan_file_dv(
+pub(crate) fn resolve_scan_file_dv(
     engine: &dyn Engine,
     table_root: &Url,
     scan_file: CdfScanFile,
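A note on how a deletion vector relates to the `selection_vector` above: a row that the deletion vector marks as deleted must not be emitted, so its selection entry is the negation of its deletion entry. The sketch below shows only that inversion; it is not the kernel's full resolution logic, which also covers the paired add/remove case handled by `resolve_scan_file_dv`:

```rust
/// Derive a selection vector from a deletion vector under the simple rule
/// "selected = not deleted". `None` means no deletion vector was present,
/// so every row stays selected.
fn selection_from_deletions(deletion_vector: Option<Vec<bool>>) -> Option<Vec<bool>> {
    deletion_vector.map(|dv| dv.into_iter().map(|deleted| !deleted).collect())
}

fn main() {
    // Rows 0 and 3 were deleted, so only rows 1 and 2 remain selected.
    assert_eq!(
        selection_from_deletions(Some(vec![true, false, false, true])),
        Some(vec![false, true, true, false])
    );
    assert_eq!(selection_from_deletions(None), None);
}
```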
diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs
index 92d20ff3d..b40eaa4c6 100644
--- a/kernel/src/table_changes/scan.rs
+++ b/kernel/src/table_changes/scan.rs
@@ -2,18 +2,22 @@ use std::sync::Arc;

 use itertools::Itertools;
 use tracing::debug;
+use url::Url;

+use crate::actions::deletion_vector::split_vector;
 use crate::scan::state::GlobalScanState;
-use crate::scan::{ColumnType, PhysicalPredicate};
+use crate::scan::{ColumnType, PhysicalPredicate, ScanResult};
 use crate::schema::{SchemaRef, StructType};
-use crate::{DeltaResult, Engine, ExpressionRef};
+use crate::{DeltaResult, Engine, ExpressionRef, FileMeta};

 use super::log_replay::{table_changes_action_iter, TableChangesScanData};
+use super::physical_to_logical::{physical_to_logical_expr, scan_file_physical_schema};
+use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
+use super::scan_file::scan_data_to_scan_file;
 use super::{TableChanges, CDF_FIELDS};

 /// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
 /// data feed from the table
-#[allow(unused)]
 #[derive(Debug)]
 pub struct TableChangesScan {
     // The [`TableChanges`] that specifies this scan's start and end versions
@@ -28,9 +32,7 @@ pub struct TableChangesScan {
     // The predicate to filter the data
     physical_predicate: PhysicalPredicate,
     // The [`ColumnType`] of all the fields in the `logical_schema`
-    all_fields: Vec<ColumnType>,
-    // `true` if any column in the `logical_schema` is a partition column
-    have_partition_cols: bool,
+    all_fields: Arc<Vec<ColumnType>>,
 }

 /// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`]
@@ -115,7 +117,6 @@ impl TableChangesScanBuilder {
         let logical_schema = self
             .schema
             .unwrap_or_else(|| self.table_changes.schema.clone().into());
-        let mut have_partition_cols = false;
         let mut read_fields = Vec::with_capacity(logical_schema.fields.len());

         // Loop over all selected fields. We produce the following:
@@ -138,7 +139,6 @@ impl TableChangesScanBuilder {
                     // Store the index into the schema for this field. When we turn it into an
                     // expression in the inner loop, we will index into the schema and get the name and
                     // data type, which we need to properly materialize the column.
-                    have_partition_cols = true;
                     Ok(ColumnType::Partition(index))
                 } else if CDF_FIELDS
                     .iter()
@@ -168,8 +168,7 @@ impl TableChangesScanBuilder {
             table_changes: self.table_changes,
             logical_schema,
             physical_predicate,
-            all_fields,
-            have_partition_cols,
+            all_fields: Arc::new(all_fields),
             physical_schema: StructType::new(read_fields).into(),
         })
     }
 }
@@ -181,7 +180,6 @@
     /// necessary to read CDF. Additionally, [`TableChangesScanData`] holds metadata on the
     /// deletion vectors present in the commit. The engine data in each scan data is guaranteed
     /// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit.
-    #[allow(unused)]
    fn scan_data(
         &self,
         engine: Arc<dyn Engine>,
@@ -204,7 +202,6 @@

     /// Get global state that is valid for the entire scan. This is somewhat expensive so should
     /// only be called once per scan.
-    #[allow(unused)]
     fn global_scan_state(&self) -> GlobalScanState {
         let end_snapshot = &self.table_changes.end_snapshot;
         GlobalScanState {
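The `execute` method added in the next hunk chains two levels of fallible iterators and relies on `itertools::Itertools::flatten_ok` to keep the stream flat, as its `// Iterator-Result-...` comments trace. A standalone sketch of that idiom, with plain integers standing in for scan results:

```rust
use itertools::Itertools;

fn main() {
    // Each outer item is a fallible *group* of values, like the per-file
    // row iterators produced while scanning: Iterator-Result-Iterator.
    let groups: Vec<Result<Vec<i32>, String>> =
        vec![Ok(vec![1, 2]), Err("bad file".to_string()), Ok(vec![3])];

    // `flatten_ok` flattens the Ok groups and passes errors through untouched,
    // leaving a plain Iterator-Result stream.
    let flat: Vec<Result<i32, String>> = groups.into_iter().flatten_ok().collect();

    assert_eq!(
        flat,
        vec![Ok(1), Ok(2), Err("bad file".to_string()), Ok(3)]
    );
}
```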
@@ -215,6 +212,109 @@
             column_mapping_mode: end_snapshot.column_mapping_mode,
         }
     }
+
+    /// Get the predicate [`Expression`] of the scan.
+    fn physical_predicate(&self) -> Option<ExpressionRef> {
+        if let PhysicalPredicate::Some(ref predicate, _) = self.physical_predicate {
+            Some(predicate.clone())
+        } else {
+            None
+        }
+    }
+
+    /// Perform an "all in one" scan to get the change data feed. This will use the provided `engine`
+    /// to read and process all the data for the query. Each [`ScanResult`] in the resultant iterator
+    /// encapsulates the raw data and an optional boolean vector built from the deletion vector if it
+    /// was present. See the documentation for [`ScanResult`] for more details.
+    pub fn execute(
+        &self,
+        engine: Arc<dyn Engine>,
+    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
+        let scan_data = self.scan_data(engine.clone())?;
+        let scan_files = scan_data_to_scan_file(scan_data);
+
+        let global_scan_state = self.global_scan_state();
+        let table_root = self.table_changes.table_root().clone();
+        let all_fields = self.all_fields.clone();
+        let physical_predicate = self.physical_predicate();
+        let dv_engine_ref = engine.clone();
+
+        let result = scan_files
+            .map(move |scan_file| {
+                resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?)
+            }) // Iterator-Result-Iterator
+            .flatten_ok() // Iterator-Result
+            .map(move |resolved_scan_file| -> DeltaResult<_> {
+                read_scan_file(
+                    engine.as_ref(),
+                    resolved_scan_file?,
+                    &global_scan_state,
+                    &all_fields,
+                    physical_predicate.clone(),
+                )
+            }) // Iterator-Result-Iterator-Result
+            .flatten_ok() // Iterator-Result-Result
+            .map(|x| x?); // Iterator-Result
+
+        Ok(result)
+    }
+}
+
+/// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical.
+/// The result is a fallible iterator of [`ScanResult`] containing the logical data.
+fn read_scan_file(
+    engine: &dyn Engine,
+    resolved_scan_file: ResolvedCdfScanFile,
+    global_state: &GlobalScanState,
+    all_fields: &[ColumnType],
+    physical_predicate: Option<ExpressionRef>,
+) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
+    let ResolvedCdfScanFile {
+        scan_file,
+        mut selection_vector,
+    } = resolved_scan_file;
+
+    let physical_to_logical_expr =
+        physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
+    let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref());
+    let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
+        physical_schema.clone(),
+        physical_to_logical_expr,
+        global_state.logical_schema.clone().into(),
+    );
+
+    let table_root = Url::parse(&global_state.table_root)?;
+    let location = table_root.join(&scan_file.path)?;
+    let file = FileMeta {
+        last_modified: 0,
+        size: 0,
+        location,
+    };
+    let read_result_iter = engine.get_parquet_handler().read_parquet_files(
+        &[file],
+        physical_schema,
+        physical_predicate,
+    )?;
+
+    let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
+        let batch = batch?;
+        // to transform the physical data into the correct logical form
+        let logical = phys_to_logical_eval.evaluate(batch.as_ref());
+        let len = logical.as_ref().map_or(0, |res| res.len());
+        // need to split the dv_mask. what's left in dv_mask covers this result, and rest
+        // will cover the following results. we `take()` out of `selection_vector` to avoid
+        // trying to return a captured variable. We're going to reassign `selection_vector`
+        // to `rest` in a moment anyway
+        let mut sv = selection_vector.take();
+        let rest = split_vector(sv.as_mut(), len, None);
+        let result = ScanResult {
+            raw_data: logical,
+            raw_mask: sv,
+        };
+        selection_vector = rest;
+        Ok(result)
+    });
+    Ok(result)
 }

 #[cfg(test)]
@@ -248,9 +348,9 @@ mod tests {
                ColumnType::Selected("_commit_version".to_string()),
                ColumnType::Selected("_commit_timestamp".to_string()),
            ]
+            .into()
        );
        assert_eq!(scan.physical_predicate, PhysicalPredicate::None);
-        assert!(!scan.have_partition_cols);
    }

    #[test]
@@ -279,6 +379,7 @@
                ColumnType::Selected("id".to_string()),
                ColumnType::Selected("_commit_version".to_string()),
            ]
+            .into()
        );
        assert_eq!(
            scan.logical_schema,
@@ -288,7 +389,6 @@
            ])
            .into()
        );
-        assert!(!scan.have_partition_cols);
        assert_eq!(
            scan.physical_predicate,
            PhysicalPredicate::Some(
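`read_scan_file` above walks one selection vector across several record batches: the prefix that covers the current batch is kept, and the remainder is reassigned to `selection_vector` for the batches that follow. A rough stand-in for that split (hypothetical `split_mask` helper; the kernel's real `split_vector` from `actions::deletion_vector`, used above, takes a third argument and may handle short vectors differently):

```rust
/// Split `mask` so the current batch keeps its first `len` entries and the
/// remainder is carried over to the next batch. A mask that is absent or no
/// longer than `len` is kept whole, and the carry-over becomes `None`.
fn split_mask(mask: &mut Option<Vec<bool>>, len: usize) -> Option<Vec<bool>> {
    match mask.as_mut() {
        Some(v) if v.len() > len => Some(v.split_off(len)),
        _ => None,
    }
}

fn main() {
    // A selection vector covering 5 rows, read back as batches of 3 and 2 rows.
    let mut mask = Some(vec![true, false, true, true, false]);
    let rest = split_mask(&mut mask, 3);
    assert_eq!(mask, Some(vec![true, false, true])); // applies to batch 1
    assert_eq!(rest, Some(vec![true, false])); // carried over to batch 2
}
```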
diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs
index 8003ec08f..cc4514186 100644
--- a/kernel/src/table_changes/scan_file.rs
+++ b/kernel/src/table_changes/scan_file.rs
@@ -18,7 +18,6 @@ use crate::utils::require;
 use crate::{DeltaResult, Error, RowVisitor};

 // The type of action associated with a [`CdfScanFile`].
-#[allow(unused)]
 #[derive(Debug, Clone, PartialEq)]
 pub(crate) enum CdfScanFileType {
     Add,
@@ -27,7 +26,6 @@
 }

 /// Represents all the metadata needed to read a Change Data Feed.
-#[allow(unused)]
 #[derive(Debug, PartialEq, Clone)]
 pub(crate) struct CdfScanFile {
     /// The type of action this file belongs to. This may be one of add, remove, or cdc
@@ -51,7 +49,6 @@ pub(crate) type CdfScanCallback<T> = fn(context: &mut T, scan_file: CdfScanFile)

 /// Transforms an iterator of [`TableChangesScanData`] into an iterator of
 /// [`CdfScanFile`] by visiting the engine data.
-#[allow(unused)]
 pub(crate) fn scan_data_to_scan_file(
     scan_data: impl Iterator<Item = DeltaResult<TableChangesScanData>>,
 ) -> impl Iterator<Item = DeltaResult<CdfScanFile>> {
@@ -91,7 +88,6 @@
 /// )?;
 /// }
 /// ```
-#[allow(unused)]
 pub(crate) fn visit_cdf_scan_files<T>(
     scan_data: &TableChangesScanData,
     context: T,
@@ -110,7 +106,6 @@

 /// A visitor that extracts [`CdfScanFile`]s from engine data. Expects data to have the schema
 /// [`cdf_scan_row_schema`].
-#[allow(unused)]
 struct CdfScanFileVisitor<'a, T> {
     callback: CdfScanCallback<T>,
     selection_vector: &'a [bool],
@@ -219,7 +214,6 @@ pub(crate) fn cdf_scan_row_schema() -> SchemaRef {

 /// Expression to convert an action with `log_schema` into one with
 /// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanData`].
-#[allow(unused)]
 pub(crate) fn cdf_scan_row_expression(commit_timestamp: i64, commit_number: i64) -> Expression {
     Expression::struct_from([
         Expression::struct_from([
diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs
new file mode 100644
index 000000000..c93b22d24
--- /dev/null
+++ b/kernel/tests/cdf.rs
@@ -0,0 +1,156 @@
+use std::{error, sync::Arc};
+
+use arrow::compute::filter_record_batch;
+use arrow_array::RecordBatch;
+use delta_kernel::engine::sync::SyncEngine;
+use itertools::Itertools;
+
+use delta_kernel::engine::arrow_data::ArrowEngineData;
+use delta_kernel::{DeltaResult, Table, Version};
+
+mod common;
+use common::{load_test_data, to_arrow};
+
+fn read_cdf_for_table(
+    test_name: impl AsRef<str>,
+    start_version: Version,
+    end_version: impl Into<Option<Version>>,
+) -> DeltaResult<Vec<RecordBatch>> {
+    let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap();
+    let test_path = test_dir.path().join(test_name.as_ref());
+    let table = Table::try_from_uri(test_path.to_str().expect("table path to string")).unwrap();
+    let engine = Arc::new(SyncEngine::new());
+    let table_changes = table.table_changes(engine.as_ref(), start_version, end_version)?;
+
+    // Project out the commit timestamp since file modification time may change anytime git clones
+    // or switches branches
+    let names = table_changes
+        .schema()
+        .fields()
+        .map(|field| field.name())
+        .filter(|name| *name != "_commit_timestamp")
+        .collect_vec();
+    let schema = table_changes.schema().project(&names)?;
+    let scan = table_changes
+        .into_scan_builder()
+        .with_schema(schema)
+        .build()?;
+    let batches: Vec<RecordBatch> = scan
+        .execute(engine)?
+ .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + match mask { + Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?), + None => Ok(record_batch), + } + }) + .try_collect()?; + Ok(batches) +} + +#[test] +fn cdf_with_deletion_vector() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-with-dv", 0, None)?; + let mut expected = vec![ + "+-------+--------------+-----------------+", + "| value | _change_type | _commit_version |", + "+-------+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 8 | insert | 0 |", + "| 7 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 9 | delete | 1 |", + "| 0 | insert | 2 |", + "| 9 | insert | 2 |", + "+-------+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn basic_cdf() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table", 0, None)?; + let mut expected = vec![ + "+----+--------+------------+------------------+-----------------+", + "| id | name | birthday | _change_type | _commit_version |", + "+----+--------+------------+------------------+-----------------+", + "| 1 | Steve | 2023-12-22 | insert | 0 |", + "| 2 | Bob | 2023-12-23 | insert | 0 |", + "| 3 | Dave | 2023-12-23 | insert | 0 |", + "| 4 | Kate | 2023-12-23 | insert | 0 |", + "| 5 | Emily | 2023-12-24 | insert | 0 |", + "| 6 | Carl | 2023-12-24 | insert | 0 |", + "| 7 | Dennis | 2023-12-24 | insert | 0 |", + "| 8 | Claire | 2023-12-25 | insert | 0 |", + "| 9 | Ada | 2023-12-25 | insert | 0 |", + "| 10 | Borb | 2023-12-25 | insert | 0 |", + "| 3 | Dave | 2023-12-22 | update_postimage | 1 |", + "| 3 | Dave | 2023-12-23 | update_preimage | 1 |", + "| 4 | Kate | 2023-12-22 | update_postimage | 1 |", + "| 4 | Kate | 2023-12-23 | update_preimage | 1 |", + "| 2 | Bob | 2023-12-22 | update_postimage | 1 |", + "| 2 | Bob | 2023-12-23 | update_preimage | 1 |", + "| 7 | Dennis | 2023-12-24 | update_preimage | 2 |", + "| 7 | Dennis | 2023-12-29 | update_postimage | 2 |", + "| 5 | Emily | 2023-12-24 | update_preimage | 2 |", + "| 5 | Emily | 2023-12-29 | update_postimage | 2 |", + "| 6 | Carl | 2023-12-24 | update_preimage | 2 |", + "| 6 | Carl | 2023-12-29 | update_postimage | 2 |", + "| 7 | Dennis | 2023-12-29 | delete | 3 |", + "+----+--------+------------+------------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn cdf_non_partitioned() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None)?; + let mut expected = vec![ + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", + "| id | name | birthday | long_field | boolean_field | double_field | smallint_field | _change_type | _commit_version |", + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", + "| 1 | Steve | 2024-04-14 | 1 | true | 3.14 | 1 | insert | 0 |", + "| 2 | Bob | 2024-04-15 | 1 | true | 3.14 | 1 | insert | 0 |", + "| 3 | Dave | 2024-04-15 | 2 | true | 3.14 | 1 | insert | 0 |", + "| 4 | Kate | 2024-04-15 | 3 | 
true | 3.14 | 1 | insert | 0 |", + "| 5 | Emily | 2024-04-16 | 4 | true | 3.14 | 1 | insert | 0 |", + "| 6 | Carl | 2024-04-16 | 5 | true | 3.14 | 1 | insert | 0 |", + "| 7 | Dennis | 2024-04-16 | 6 | true | 3.14 | 1 | insert | 0 |", + "| 8 | Claire | 2024-04-17 | 7 | true | 3.14 | 1 | insert | 0 |", + "| 9 | Ada | 2024-04-17 | 8 | true | 3.14 | 1 | insert | 0 |", + "| 10 | Borb | 2024-04-17 | 99999999999999999 | true | 3.14 | 1 | insert | 0 |", + "| 3 | Dave | 2024-04-15 | 2 | true | 3.14 | 1 | update_preimage | 1 |", + "| 3 | Dave | 2024-04-14 | 2 | true | 3.14 | 1 | update_postimage | 1 |", + "| 4 | Kate | 2024-04-15 | 3 | true | 3.14 | 1 | update_preimage | 1 |", + "| 4 | Kate | 2024-04-14 | 3 | true | 3.14 | 1 | update_postimage | 1 |", + "| 2 | Bob | 2024-04-15 | 1 | true | 3.14 | 1 | update_preimage | 1 |", + "| 2 | Bob | 2024-04-14 | 1 | true | 3.14 | 1 | update_postimage | 1 |", + "| 7 | Dennis | 2024-04-16 | 6 | true | 3.14 | 1 | update_preimage | 2 |", + "| 7 | Dennis | 2024-04-14 | 6 | true | 3.14 | 1 | update_postimage | 2 |", + "| 5 | Emily | 2024-04-16 | 4 | true | 3.14 | 1 | update_preimage | 2 |", + "| 5 | Emily | 2024-04-14 | 4 | true | 3.14 | 1 | update_postimage | 2 |", + "| 6 | Carl | 2024-04-16 | 5 | true | 3.14 | 1 | update_preimage | 2 |", + "| 6 | Carl | 2024-04-14 | 5 | true | 3.14 | 1 | update_postimage | 2 |", + "| 7 | Dennis | 2024-04-14 | 6 | true | 3.14 | 1 | delete | 3 |", + "| 1 | Alex | 2024-04-14 | 1 | true | 3.14 | 1 | insert | 4 |", + "| 2 | Alan | 2024-04-15 | 1 | true | 3.14 | 1 | insert | 4 |", + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+" + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index 8eba56248..a918695b7 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -9,6 +9,50 @@ use delta_kernel::{DeltaResult, Engine, EngineData, Table}; use std::sync::Arc; +#[macro_export] +macro_rules! sort_lines { + ($lines: expr) => {{ + // sort except for header + footer + let num_lines = $lines.len(); + if num_lines > 3 { + $lines.as_mut_slice()[2..num_lines - 1].sort_unstable() + } + }}; +} + +// NB: expected_lines_sorted MUST be pre-sorted (via sort_lines!()) +#[macro_export] +macro_rules! 
assert_batches_sorted_eq { + ($expected_lines_sorted: expr, $CHUNKS: expr) => { + let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + .unwrap() + .to_string(); + // fix for windows: \r\n --> + let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); + sort_lines!(actual_lines); + assert_eq!( + $expected_lines_sorted, actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + $expected_lines_sorted, actual_lines + ); + }; +} + +/// unpack the test data from {test_parent_dir}/{test_name}.tar.zst into a temp dir, and return the dir it was +/// unpacked into +#[allow(unused)] +pub(crate) fn load_test_data( + test_parent_dir: &str, + test_name: &str, +) -> Result> { + let path = format!("{test_parent_dir}/{test_name}.tar.zst"); + let tar = zstd::Decoder::new(std::fs::File::open(path)?)?; + let mut archive = tar::Archive::new(tar); + let temp_dir = tempfile::tempdir()?; + archive.unpack(temp_dir.path())?; + Ok(temp_dir) +} + pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data .into_any() diff --git a/kernel/tests/data/cdf-table-non-partitioned.tar.zst b/kernel/tests/data/cdf-table-non-partitioned.tar.zst new file mode 100644 index 000000000..2a2f08cf0 Binary files /dev/null and b/kernel/tests/data/cdf-table-non-partitioned.tar.zst differ diff --git a/kernel/tests/data/cdf-table-with-dv.tar.zst b/kernel/tests/data/cdf-table-with-dv.tar.zst new file mode 100644 index 000000000..aaef3a3eb Binary files /dev/null and b/kernel/tests/data/cdf-table-with-dv.tar.zst differ diff --git a/kernel/tests/data/cdf-table.tar.zst b/kernel/tests/data/cdf-table.tar.zst new file mode 100644 index 000000000..30868b12c Binary files /dev/null and b/kernel/tests/data/cdf-table.tar.zst differ diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index a5a1debff..1d0c8406b 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -24,18 +24,7 @@ use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; mod common; -use common::to_arrow; - -/// unpack the test data from test_table.tar.zst into a temp dir, and return the dir it was -/// unpacked into -fn load_test_data(test_name: &str) -> Result> { - let path = format!("tests/golden_data/{}.tar.zst", test_name); - let tar = zstd::Decoder::new(std::fs::File::open(path)?)?; - let mut archive = tar::Archive::new(tar); - let temp_dir = tempfile::tempdir()?; - archive.unpack(temp_dir.path())?; - Ok(temp_dir) -} +use common::{load_test_data, to_arrow}; // NB adapated from DAT: read all parquet files in the directory and concatenate them async fn read_expected(path: &Path) -> DeltaResult { @@ -218,7 +207,7 @@ fn setup_golden_table( Option, tempfile::TempDir, ) { - let test_dir = load_test_data(test_name).unwrap(); + let test_dir = load_test_data("tests/golden_data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); let table_path = test_path.join("delta"); let table = Table::try_from_uri(table_path.to_str().expect("table path to string")) diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index a0a8160c1..7a674ce57 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -318,33 +318,6 @@ async fn stats() -> Result<(), Box> { Ok(()) } -macro_rules! 
sort_lines { - ($lines: expr) => {{ - // sort except for header + footer - let num_lines = $lines.len(); - if num_lines > 3 { - $lines.as_mut_slice()[2..num_lines - 1].sort_unstable() - } - }}; -} - -// NB: expected_lines_sorted MUST be pre-sorted (via sort_lines!()) -macro_rules! assert_batches_sorted_eq { - ($expected_lines_sorted: expr, $CHUNKS: expr) => { - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) - .unwrap() - .to_string(); - // fix for windows: \r\n --> - let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); - sort_lines!(actual_lines); - assert_eq!( - $expected_lines_sorted, actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - $expected_lines_sorted, actual_lines - ); - }; -} - fn read_with_execute( engine: Arc, scan: &Scan,