From 7bcbb575dd28b3e3c4f412977832a81506213358 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 10 Dec 2024 16:31:03 -0800 Subject: [PATCH] TableChangesScan::execute and end to end testing for CDF (#580) ## What changes are proposed in this pull request? This PR introduces the `execute` method to `TableChangesScan`, which performs a CDF scan and returns `ScanResult` the change data feed. The `ScanResult` holds engine data and a selection vector to apply to the rows of the data. A helper method `read_scan_file` is added to read the rows of a `scan_file` and perform physical to logical transformation. The macros `sort_lines` and `asert_batches_sorted_eq` are moved to the `common` crate so they can be shared between the `read.rs` and new `cdf.rs` integration tests. The `cdf-table` and `cdf-table-non-partitioned` test tables are from the delta-rs project. ## How was this change tested? This tests data reading for 3 tables with the following features: - a table with deletion vectors involving deletion and restoration - a non-partitioned table - a partitioned table --- kernel/src/log_segment.rs | 1 - kernel/src/scan/mod.rs | 2 +- kernel/src/table_changes/log_replay.rs | 2 - kernel/src/table_changes/mod.rs | 1 - .../src/table_changes/physical_to_logical.rs | 15 +- kernel/src/table_changes/resolve_dvs.rs | 10 +- kernel/src/table_changes/scan.rs | 128 ++++++++++++-- kernel/src/table_changes/scan_file.rs | 6 - kernel/tests/cdf.rs | 156 ++++++++++++++++++ kernel/tests/common/mod.rs | 44 +++++ .../data/cdf-table-non-partitioned.tar.zst | Bin 0 -> 11152 bytes kernel/tests/data/cdf-table-with-dv.tar.zst | Bin 0 -> 1915 bytes kernel/tests/data/cdf-table.tar.zst | Bin 0 -> 9540 bytes kernel/tests/golden_tables.rs | 15 +- kernel/tests/read.rs | 27 --- 15 files changed, 328 insertions(+), 79 deletions(-) create mode 100644 kernel/tests/cdf.rs create mode 100644 kernel/tests/data/cdf-table-non-partitioned.tar.zst create mode 100644 kernel/tests/data/cdf-table-with-dv.tar.zst create mode 100644 kernel/tests/data/cdf-table.tar.zst diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index df8f1813b..c7567a087 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -145,7 +145,6 @@ impl LogSegment { /// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits /// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version` /// is specified it will be the most recent version by default. - #[allow(unused)] #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] pub(crate) fn for_table_changes( fs_client: &dyn FileSystemClient, diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 7cf9a01ae..9ccfa8a76 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -277,7 +277,7 @@ pub struct ScanResult { pub raw_data: DeltaResult>, /// Raw row mask. // TODO(nick) this should be allocated by the engine - raw_mask: Option>, + pub(crate) raw_mask: Option>, } impl ScanResult { diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 0deb24682..89951a39b 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -28,7 +28,6 @@ use itertools::Itertools; mod tests; /// Scan data for a Change Data Feed query. This holds metadata that is needed to read data rows. -#[allow(unused)] pub(crate) struct TableChangesScanData { /// Engine data with the schema defined in [`scan_row_schema`] /// @@ -127,7 +126,6 @@ struct LogReplayScanner { // // Note: This will be used once an expression is introduced to transform the engine data in // [`TableChangesScanData`] - #[allow(unused)] timestamp: i64, } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index a5938082b..2c15bd537 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -178,7 +178,6 @@ impl TableChanges { &self.table_root } /// The partition columns that will be read. - #[allow(unused)] pub(crate) fn partition_columns(&self) -> &Vec { &self.end_snapshot.metadata().partition_columns } diff --git a/kernel/src/table_changes/physical_to_logical.rs b/kernel/src/table_changes/physical_to_logical.rs index 7232e2cf8..bc8488081 100644 --- a/kernel/src/table_changes/physical_to_logical.rs +++ b/kernel/src/table_changes/physical_to_logical.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::iter; use itertools::Itertools; @@ -15,7 +14,6 @@ use super::{ }; /// Returns a map from change data feed column name to an expression that generates the row data. -#[allow(unused)] fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult> { let timestamp = Scalar::timestamp_ntz_from_millis(scan_file.commit_timestamp)?; let version = scan_file.commit_version; @@ -34,8 +32,7 @@ fn get_cdf_columns(scan_file: &CdfScanFile) -> DeltaResult SchemaRef { +pub(crate) fn scan_file_physical_schema( + scan_file: &CdfScanFile, + physical_schema: &StructType, +) -> SchemaRef { if scan_file.scan_type == CdfScanFileType::Cdc { let change_type = StructField::new(CHANGE_TYPE_COL_NAME, DataType::STRING, false); - let fields = read_schema.fields().cloned().chain(iter::once(change_type)); + let fields = physical_schema.fields().cloned().chain(Some(change_type)); StructType::new(fields).into() } else { - read_schema.clone().into() + physical_schema.clone().into() } } diff --git a/kernel/src/table_changes/resolve_dvs.rs b/kernel/src/table_changes/resolve_dvs.rs index caa2cf310..85186a836 100644 --- a/kernel/src/table_changes/resolve_dvs.rs +++ b/kernel/src/table_changes/resolve_dvs.rs @@ -7,16 +7,15 @@ use crate::{DeltaResult, Engine, Error}; /// A [`CdfScanFile`] with its associated `selection_vector`. The `scan_type` is resolved to /// match the `_change_type` that its rows will have in the change data feed. -#[allow(unused)] -struct ResolvedCdfScanFile { +pub(crate) struct ResolvedCdfScanFile { /// The scan file that holds the path the data file to be read. The `scan_type` field is /// resolved to the `_change_type` of the rows for this data file. - scan_file: CdfScanFile, + pub(crate) scan_file: CdfScanFile, /// Optional vector of bools. If `selection_vector[i] = true`, then that row must be included /// in the CDF output. Otherwise the row must be filtered out. The vector may be shorter than /// the data file. In this case, all the remaining rows are *not* selected. If `selection_vector` /// is `None`, then all rows are selected. - selection_vector: Option>, + pub(crate) selection_vector: Option>, } /// Resolves the deletion vectors for a [`CdfScanFile`]. This function handles two @@ -33,8 +32,7 @@ struct ResolvedCdfScanFile { /// 2. The second case handles all other add, remove, and cdc [`CdfScanFile`]s. These will simply /// read the deletion vector (if present), and each is converted into a [`ResolvedCdfScanFile`]. /// No changes are made to the `scan_type`. -#[allow(unused)] -fn resolve_scan_file_dv( +pub(crate) fn resolve_scan_file_dv( engine: &dyn Engine, table_root: &Url, scan_file: CdfScanFile, diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 92d20ff3d..b40eaa4c6 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -2,18 +2,22 @@ use std::sync::Arc; use itertools::Itertools; use tracing::debug; +use url::Url; +use crate::actions::deletion_vector::split_vector; use crate::scan::state::GlobalScanState; -use crate::scan::{ColumnType, PhysicalPredicate}; +use crate::scan::{ColumnType, PhysicalPredicate, ScanResult}; use crate::schema::{SchemaRef, StructType}; -use crate::{DeltaResult, Engine, ExpressionRef}; +use crate::{DeltaResult, Engine, ExpressionRef, FileMeta}; use super::log_replay::{table_changes_action_iter, TableChangesScanData}; +use super::physical_to_logical::{physical_to_logical_expr, scan_file_physical_schema}; +use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile}; +use super::scan_file::scan_data_to_scan_file; use super::{TableChanges, CDF_FIELDS}; /// The result of building a [`TableChanges`] scan over a table. This can be used to get a change /// data feed from the table -#[allow(unused)] #[derive(Debug)] pub struct TableChangesScan { // The [`TableChanges`] that specifies this scan's start and end versions @@ -28,9 +32,7 @@ pub struct TableChangesScan { // The predicate to filter the data physical_predicate: PhysicalPredicate, // The [`ColumnType`] of all the fields in the `logical_schema` - all_fields: Vec, - // `true` if any column in the `logical_schema` is a partition column - have_partition_cols: bool, + all_fields: Arc>, } /// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`] @@ -115,7 +117,6 @@ impl TableChangesScanBuilder { let logical_schema = self .schema .unwrap_or_else(|| self.table_changes.schema.clone().into()); - let mut have_partition_cols = false; let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); // Loop over all selected fields. We produce the following: @@ -138,7 +139,6 @@ impl TableChangesScanBuilder { // Store the index into the schema for this field. When we turn it into an // expression in the inner loop, we will index into the schema and get the name and // data type, which we need to properly materialize the column. - have_partition_cols = true; Ok(ColumnType::Partition(index)) } else if CDF_FIELDS .iter() @@ -168,8 +168,7 @@ impl TableChangesScanBuilder { table_changes: self.table_changes, logical_schema, physical_predicate, - all_fields, - have_partition_cols, + all_fields: Arc::new(all_fields), physical_schema: StructType::new(read_fields).into(), }) } @@ -181,7 +180,6 @@ impl TableChangesScan { /// necessary to read CDF. Additionally, [`TableChangesScanData`] holds metadata on the /// deletion vectors present in the commit. The engine data in each scan data is guaranteed /// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit. - #[allow(unused)] fn scan_data( &self, engine: Arc, @@ -204,7 +202,6 @@ impl TableChangesScan { /// Get global state that is valid for the entire scan. This is somewhat expensive so should /// only be called once per scan. - #[allow(unused)] fn global_scan_state(&self) -> GlobalScanState { let end_snapshot = &self.table_changes.end_snapshot; GlobalScanState { @@ -215,6 +212,109 @@ impl TableChangesScan { column_mapping_mode: end_snapshot.column_mapping_mode, } } + + /// Get the predicate [`Expression`] of the scan. + fn physical_predicate(&self) -> Option { + if let PhysicalPredicate::Some(ref predicate, _) = self.physical_predicate { + Some(predicate.clone()) + } else { + None + } + } + + /// Perform an "all in one" scan to get the change data feed. This will use the provided `engine` + /// to read and process all the data for the query. Each [`ScanResult`] in the resultant iterator + /// encapsulates the raw data and an optional boolean vector built from the deletion vector if it + /// was present. See the documentation for [`ScanResult`] for more details. + pub fn execute( + &self, + engine: Arc, + ) -> DeltaResult>> { + let scan_data = self.scan_data(engine.clone())?; + let scan_files = scan_data_to_scan_file(scan_data); + + let global_scan_state = self.global_scan_state(); + let table_root = self.table_changes.table_root().clone(); + let all_fields = self.all_fields.clone(); + let physical_predicate = self.physical_predicate(); + let dv_engine_ref = engine.clone(); + + let result = scan_files + .map(move |scan_file| { + resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?) + }) // Iterator-Result-Iterator + .flatten_ok() // Iterator-Result + .map(move |resolved_scan_file| -> DeltaResult<_> { + read_scan_file( + engine.as_ref(), + resolved_scan_file?, + &global_scan_state, + &all_fields, + physical_predicate.clone(), + ) + }) // Iterator-Result-Iterator-Result + .flatten_ok() // Iterator-Result-Result + .map(|x| x?); // Iterator-Result + + Ok(result) + } +} + +/// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical. +/// The result is a fallible iterator of [`ScanResult`] containing the logical data. +fn read_scan_file( + engine: &dyn Engine, + resolved_scan_file: ResolvedCdfScanFile, + global_state: &GlobalScanState, + all_fields: &[ColumnType], + physical_predicate: Option, +) -> DeltaResult>> { + let ResolvedCdfScanFile { + scan_file, + mut selection_vector, + } = resolved_scan_file; + + let physical_to_logical_expr = + physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?; + let physical_schema = scan_file_physical_schema(&scan_file, global_state.read_schema.as_ref()); + let phys_to_logical_eval = engine.get_expression_handler().get_evaluator( + physical_schema.clone(), + physical_to_logical_expr, + global_state.logical_schema.clone().into(), + ); + + let table_root = Url::parse(&global_state.table_root)?; + let location = table_root.join(&scan_file.path)?; + let file = FileMeta { + last_modified: 0, + size: 0, + location, + }; + let read_result_iter = engine.get_parquet_handler().read_parquet_files( + &[file], + physical_schema, + physical_predicate, + )?; + + let result = read_result_iter.map(move |batch| -> DeltaResult<_> { + let batch = batch?; + // to transform the physical data into the correct logical form + let logical = phys_to_logical_eval.evaluate(batch.as_ref()); + let len = logical.as_ref().map_or(0, |res| res.len()); + // need to split the dv_mask. what's left in dv_mask covers this result, and rest + // will cover the following results. we `take()` out of `selection_vector` to avoid + // trying to return a captured variable. We're going to reassign `selection_vector` + // to `rest` in a moment anyway + let mut sv = selection_vector.take(); + let rest = split_vector(sv.as_mut(), len, None); + let result = ScanResult { + raw_data: logical, + raw_mask: sv, + }; + selection_vector = rest; + Ok(result) + }); + Ok(result) } #[cfg(test)] @@ -248,9 +348,9 @@ mod tests { ColumnType::Selected("_commit_version".to_string()), ColumnType::Selected("_commit_timestamp".to_string()), ] + .into() ); assert_eq!(scan.physical_predicate, PhysicalPredicate::None); - assert!(!scan.have_partition_cols); } #[test] @@ -279,6 +379,7 @@ mod tests { ColumnType::Selected("id".to_string()), ColumnType::Selected("_commit_version".to_string()), ] + .into() ); assert_eq!( scan.logical_schema, @@ -288,7 +389,6 @@ mod tests { ]) .into() ); - assert!(!scan.have_partition_cols); assert_eq!( scan.physical_predicate, PhysicalPredicate::Some( diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 8003ec08f..cc4514186 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -18,7 +18,6 @@ use crate::utils::require; use crate::{DeltaResult, Error, RowVisitor}; // The type of action associated with a [`CdfScanFile`]. -#[allow(unused)] #[derive(Debug, Clone, PartialEq)] pub(crate) enum CdfScanFileType { Add, @@ -27,7 +26,6 @@ pub(crate) enum CdfScanFileType { } /// Represents all the metadata needed to read a Change Data Feed. -#[allow(unused)] #[derive(Debug, PartialEq, Clone)] pub(crate) struct CdfScanFile { /// The type of action this file belongs to. This may be one of add, remove, or cdc @@ -51,7 +49,6 @@ pub(crate) type CdfScanCallback = fn(context: &mut T, scan_file: CdfScanFile) /// Transforms an iterator of [`TableChangesScanData`] into an iterator of /// [`CdfScanFile`] by visiting the engine data. -#[allow(unused)] pub(crate) fn scan_data_to_scan_file( scan_data: impl Iterator>, ) -> impl Iterator> { @@ -91,7 +88,6 @@ pub(crate) fn scan_data_to_scan_file( /// )?; /// } /// ``` -#[allow(unused)] pub(crate) fn visit_cdf_scan_files( scan_data: &TableChangesScanData, context: T, @@ -110,7 +106,6 @@ pub(crate) fn visit_cdf_scan_files( /// A visitor that extracts [`CdfScanFile`]s from engine data. Expects data to have the schema /// [`cdf_scan_row_schema`]. -#[allow(unused)] struct CdfScanFileVisitor<'a, T> { callback: CdfScanCallback, selection_vector: &'a [bool], @@ -219,7 +214,6 @@ pub(crate) fn cdf_scan_row_schema() -> SchemaRef { /// Expression to convert an action with `log_schema` into one with /// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanData`]. -#[allow(unused)] pub(crate) fn cdf_scan_row_expression(commit_timestamp: i64, commit_number: i64) -> Expression { Expression::struct_from([ Expression::struct_from([ diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs new file mode 100644 index 000000000..c93b22d24 --- /dev/null +++ b/kernel/tests/cdf.rs @@ -0,0 +1,156 @@ +use std::{error, sync::Arc}; + +use arrow::compute::filter_record_batch; +use arrow_array::RecordBatch; +use delta_kernel::engine::sync::SyncEngine; +use itertools::Itertools; + +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::{DeltaResult, Table, Version}; + +mod common; +use common::{load_test_data, to_arrow}; + +fn read_cdf_for_table( + test_name: impl AsRef, + start_version: Version, + end_version: impl Into>, +) -> DeltaResult> { + let test_dir = load_test_data("tests/data", test_name.as_ref()).unwrap(); + let test_path = test_dir.path().join(test_name.as_ref()); + let table = Table::try_from_uri(test_path.to_str().expect("table path to string")).unwrap(); + let engine = Arc::new(SyncEngine::new()); + let table_changes = table.table_changes(engine.as_ref(), start_version, end_version)?; + + // Project out the commit timestamp since file modification time may change anytime git clones + // or switches branches + let names = table_changes + .schema() + .fields() + .map(|field| field.name()) + .filter(|name| *name != "_commit_timestamp") + .collect_vec(); + let schema = table_changes.schema().project(&names)?; + let scan = table_changes + .into_scan_builder() + .with_schema(schema) + .build()?; + let batches: Vec = scan + .execute(engine)? + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + match mask { + Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?), + None => Ok(record_batch), + } + }) + .try_collect()?; + Ok(batches) +} + +#[test] +fn cdf_with_deletion_vector() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-with-dv", 0, None)?; + let mut expected = vec![ + "+-------+--------------+-----------------+", + "| value | _change_type | _commit_version |", + "+-------+--------------+-----------------+", + "| 0 | insert | 0 |", + "| 1 | insert | 0 |", + "| 2 | insert | 0 |", + "| 3 | insert | 0 |", + "| 4 | insert | 0 |", + "| 5 | insert | 0 |", + "| 6 | insert | 0 |", + "| 8 | insert | 0 |", + "| 7 | insert | 0 |", + "| 9 | insert | 0 |", + "| 0 | delete | 1 |", + "| 9 | delete | 1 |", + "| 0 | insert | 2 |", + "| 9 | insert | 2 |", + "+-------+--------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn basic_cdf() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table", 0, None)?; + let mut expected = vec![ + "+----+--------+------------+------------------+-----------------+", + "| id | name | birthday | _change_type | _commit_version |", + "+----+--------+------------+------------------+-----------------+", + "| 1 | Steve | 2023-12-22 | insert | 0 |", + "| 2 | Bob | 2023-12-23 | insert | 0 |", + "| 3 | Dave | 2023-12-23 | insert | 0 |", + "| 4 | Kate | 2023-12-23 | insert | 0 |", + "| 5 | Emily | 2023-12-24 | insert | 0 |", + "| 6 | Carl | 2023-12-24 | insert | 0 |", + "| 7 | Dennis | 2023-12-24 | insert | 0 |", + "| 8 | Claire | 2023-12-25 | insert | 0 |", + "| 9 | Ada | 2023-12-25 | insert | 0 |", + "| 10 | Borb | 2023-12-25 | insert | 0 |", + "| 3 | Dave | 2023-12-22 | update_postimage | 1 |", + "| 3 | Dave | 2023-12-23 | update_preimage | 1 |", + "| 4 | Kate | 2023-12-22 | update_postimage | 1 |", + "| 4 | Kate | 2023-12-23 | update_preimage | 1 |", + "| 2 | Bob | 2023-12-22 | update_postimage | 1 |", + "| 2 | Bob | 2023-12-23 | update_preimage | 1 |", + "| 7 | Dennis | 2023-12-24 | update_preimage | 2 |", + "| 7 | Dennis | 2023-12-29 | update_postimage | 2 |", + "| 5 | Emily | 2023-12-24 | update_preimage | 2 |", + "| 5 | Emily | 2023-12-29 | update_postimage | 2 |", + "| 6 | Carl | 2023-12-24 | update_preimage | 2 |", + "| 6 | Carl | 2023-12-29 | update_postimage | 2 |", + "| 7 | Dennis | 2023-12-29 | delete | 3 |", + "+----+--------+------------+------------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} + +#[test] +fn cdf_non_partitioned() -> Result<(), Box> { + let batches = read_cdf_for_table("cdf-table-non-partitioned", 0, None)?; + let mut expected = vec![ + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", + "| id | name | birthday | long_field | boolean_field | double_field | smallint_field | _change_type | _commit_version |", + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+", + "| 1 | Steve | 2024-04-14 | 1 | true | 3.14 | 1 | insert | 0 |", + "| 2 | Bob | 2024-04-15 | 1 | true | 3.14 | 1 | insert | 0 |", + "| 3 | Dave | 2024-04-15 | 2 | true | 3.14 | 1 | insert | 0 |", + "| 4 | Kate | 2024-04-15 | 3 | true | 3.14 | 1 | insert | 0 |", + "| 5 | Emily | 2024-04-16 | 4 | true | 3.14 | 1 | insert | 0 |", + "| 6 | Carl | 2024-04-16 | 5 | true | 3.14 | 1 | insert | 0 |", + "| 7 | Dennis | 2024-04-16 | 6 | true | 3.14 | 1 | insert | 0 |", + "| 8 | Claire | 2024-04-17 | 7 | true | 3.14 | 1 | insert | 0 |", + "| 9 | Ada | 2024-04-17 | 8 | true | 3.14 | 1 | insert | 0 |", + "| 10 | Borb | 2024-04-17 | 99999999999999999 | true | 3.14 | 1 | insert | 0 |", + "| 3 | Dave | 2024-04-15 | 2 | true | 3.14 | 1 | update_preimage | 1 |", + "| 3 | Dave | 2024-04-14 | 2 | true | 3.14 | 1 | update_postimage | 1 |", + "| 4 | Kate | 2024-04-15 | 3 | true | 3.14 | 1 | update_preimage | 1 |", + "| 4 | Kate | 2024-04-14 | 3 | true | 3.14 | 1 | update_postimage | 1 |", + "| 2 | Bob | 2024-04-15 | 1 | true | 3.14 | 1 | update_preimage | 1 |", + "| 2 | Bob | 2024-04-14 | 1 | true | 3.14 | 1 | update_postimage | 1 |", + "| 7 | Dennis | 2024-04-16 | 6 | true | 3.14 | 1 | update_preimage | 2 |", + "| 7 | Dennis | 2024-04-14 | 6 | true | 3.14 | 1 | update_postimage | 2 |", + "| 5 | Emily | 2024-04-16 | 4 | true | 3.14 | 1 | update_preimage | 2 |", + "| 5 | Emily | 2024-04-14 | 4 | true | 3.14 | 1 | update_postimage | 2 |", + "| 6 | Carl | 2024-04-16 | 5 | true | 3.14 | 1 | update_preimage | 2 |", + "| 6 | Carl | 2024-04-14 | 5 | true | 3.14 | 1 | update_postimage | 2 |", + "| 7 | Dennis | 2024-04-14 | 6 | true | 3.14 | 1 | delete | 3 |", + "| 1 | Alex | 2024-04-14 | 1 | true | 3.14 | 1 | insert | 4 |", + "| 2 | Alan | 2024-04-15 | 1 | true | 3.14 | 1 | insert | 4 |", + "+----+--------+------------+-------------------+---------------+--------------+----------------+------------------+-----------------+" + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index 8eba56248..a918695b7 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -9,6 +9,50 @@ use delta_kernel::{DeltaResult, Engine, EngineData, Table}; use std::sync::Arc; +#[macro_export] +macro_rules! sort_lines { + ($lines: expr) => {{ + // sort except for header + footer + let num_lines = $lines.len(); + if num_lines > 3 { + $lines.as_mut_slice()[2..num_lines - 1].sort_unstable() + } + }}; +} + +// NB: expected_lines_sorted MUST be pre-sorted (via sort_lines!()) +#[macro_export] +macro_rules! assert_batches_sorted_eq { + ($expected_lines_sorted: expr, $CHUNKS: expr) => { + let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) + .unwrap() + .to_string(); + // fix for windows: \r\n --> + let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); + sort_lines!(actual_lines); + assert_eq!( + $expected_lines_sorted, actual_lines, + "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + $expected_lines_sorted, actual_lines + ); + }; +} + +/// unpack the test data from {test_parent_dir}/{test_name}.tar.zst into a temp dir, and return the dir it was +/// unpacked into +#[allow(unused)] +pub(crate) fn load_test_data( + test_parent_dir: &str, + test_name: &str, +) -> Result> { + let path = format!("{test_parent_dir}/{test_name}.tar.zst"); + let tar = zstd::Decoder::new(std::fs::File::open(path)?)?; + let mut archive = tar::Archive::new(tar); + let temp_dir = tempfile::tempdir()?; + archive.unpack(temp_dir.path())?; + Ok(temp_dir) +} + pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data .into_any() diff --git a/kernel/tests/data/cdf-table-non-partitioned.tar.zst b/kernel/tests/data/cdf-table-non-partitioned.tar.zst new file mode 100644 index 0000000000000000000000000000000000000000..2a2f08cf0827f8dc12cfb37495cbb264425d4800 GIT binary patch literal 11152 zcmV;BD{s^&wJ-euSj4LUYSm0FPCy)T8$-7X4Kv{nK_!q5^urD5kJkYu1nXfYVp=3Y zKO``NK(o~x_MANLrKQ-FRvP&jqF=D)N}H0Bl7ON*><4N^(|Bf}!5^)SwwK7o0@VVY z0wASbrIl8n9x6n^AyyCs;1I9_yIRJ&Ru5&Vcu!Y-SV2{Ke0n-i>B)IyGD-SP$Ri#~ z0pKCP4@EAZGbtvMeWaR>RtMCXve7mxd0HLNPSQ7Jv)P!V)dA5;OPLvi9WhCo$#z;D zkXI?CvLYU!C%CcW;<#21$5ELMc@Z@p!^U2_k9&)m3Swl&-iG zrL;?6hf-EmRa(31v!e?fId0Te?NMpPONf-@ym8)J+&bK0c(v|Xk0xxBVHbZUVYYYJ zXkmTaHjiJl-ZV=-lG@&*7m%0-C3t~S02)`=KtluOQkN!U4!o))jqKXztuM~}`riG{ zeQS=j-n-v>NgDY&O`8+iS9(2BQu;}?*S(i~ZPDD}&mEU#Y56xpEAqXqN%n7lDe7_S zZ|zC@O0T0AcTt`3y%)%i21Q4V5UIc=?wt=Vn1D#EsK%doP$N1*0TBZ)+Ijw^*8>)v}Q4EE0&jg_|9uf0Cfuhnd`dnxUoH54w|5uk{@zPVGieq(V<+t^*`!&LHD%*JEvzJuUI8835^K@w{I?B%xos!8*5%Gt zTw+d>l@Sd}1Psh36JucutFgZN-i)uPJNNl=uC+DEvgCi8IQ-s{BmUML^|rp}ZvEIl zOpm3-JgmW{2Q_8Nka5JDH%|Nj227PJ9LI4^PHaH1u1Gk~^JHQILxvCqjX)$()O20f zm3afkiOGoL&ZI9R^16=0Atnu)Ip>b1rXq?+`Z6L)sfVKlFxr?>y*;)g-sB(k<^6LThFP?E z{M&{-mzESo3@c&XhN=lDr9M@96!>FVhIgB|WAm5f^ z>F37}&17PpNwFV4W$Y)&kBRh?enQ|N$WQI($InUHXlLyw$d9S? zt`B)qf`ouUJSjo+RaH@DgeW+aGLteBM1@pUQx_o$4s}sMMWB(L>eEF9(Vwb%0R$`S z1tAJ@T#MscLtU642f!g(eY!9~R7m}*XhVpCL)=r45s1XAKHO6febsLV5UeXVLKGb0 zvT#{~Xs_NIDugIFWE7+XEaIs?MnUvdy(7RLb!8?STma<_(*Y4&T9qX#5TKdDXNwn^ z=N(=yZoK#Qx#Swg@x~=5N$Yp`tvCO+J2bO}_0eN3c8fWV<7%~9ElP-jY&eeNYPFB! z%$s>upkt01HJ}i^4!svcuCEVZK(MaZd{H1+S7^R8AXrynUSRS(&-0$9*ladiu3nfZ zAqukjzVGwA@B6%NOd2!9Jdwqa9-x$d;f^UNKp|0os*mqWLkS&X@Vj%)@8(?0IP=W! zo9_pqDOl9NaHaquDDc2@0B8+gTAS}2dVTeaf3xJJ-!Q&Q+c`|MeZsUi-`Q)}Ch@-s{a9V%UgD3)AAa`uc5q*-1O< zP`p|6R)1-u4M<-rr!Ni&*3}bYu4wVXP{C%i*}{N5sw$2Yr`2k;G7}Z%DT@-D&9*wz zg$ot#_E`6>w-0mIV%K|PONQQmT;|<#X>mz@Tk^dvsl#kb&;Pyo+aBeEG(HGphY(Gi z5sH9J8Xs05@qTA6dQtO@z1}{XKZheO_h^=O`#xHY`?lM2nB6*u7>l1r5OG2TkXmzi zU}FRUCLuY*v~D9H$vf-4dCywRU1RPq?py086WBR$4gf$2?|wbrqQ_fnzWcSM{#}!M z%<`y*1r-o%pg19+ zorJi-$`FAKerFMHe!1v7_Ma?2&S4(IKIa#?#V+QqVL8k;Uvsp-yxV?la$EY4G(D{3 zxbr;EaW;emD#$>2X@jL`oDE5J-8dW4ebL62Gu1~WAnUBV$i*Gro44j!O^_L7s7#@X zq5?^=gBwgiNN|0z=tUp;z4O-G^Y8mUl}e>jsW#@^b1Z6&ugBPnBQN>azPxsqVrVZ6 zZyUn0XhZRanB>>ydpQuza~Jszuh%>God5aDqgN89G*BQ|S!7r_s8|5O%3>jn92}gY zID*XgeV-IFQDL64D5=%HPhXGw&D(R0ugGyurBW&L{7F2|^HQnvC-J;gDh4@LNHNc! zgogwbfat-?VL}#6Y~lC%ygjbfmdquS<2bHXTQZkS8IiYS%7|2KY{BM=i+RRgi=n?c zf8Y1nY%qEe|;-)EE2J`qo)`n(n1sDg%!_KE0N^R$8#s@0k>u%O{O%sR|^caI|& zGs6`pqC^P_0|YCpiIqh~1q3ULiW5;E)0$ch3hYr=XwEdHQmIswGH<{*F&Wirsg(B^ z@6N?*OTr}%1*6E{6CNu^S$t}}1HNGjERTd?TD#4p0>u*FT36u0FDEq>4# z=a6fzbryBUI$sf@;O#cjW<#Q=+1c6IDf0%56O++Mn+@@Hb~y5Zr4CYfCOJYBTml+Rd1A3xtTS+$+-O;{THS6c7Hh@x1q&BiFln(b zBhtY^0g8s2Ky|S^FJufcMc;h)P;=fxE#AB__tqmXt~9uojWhU0NiP!tCN;XoJ+ z5W*k~fHAd-k_#2Q4jb#XktxNz)%9RIv53G9(u*SS~@&lmz z-TN^hS&wYwo8K+4!WYnN@E0p7xoK=IFZrMo98dN^9Z44EHR1uMq1?p>x_f67ZXo0^ zU==4o-R;M>AY&M7A5k;C-(PH?-2~GpLTR8r)P@c(eFIcnAe0Soi+=#c5b`Eyw%{MI z2<=Xm4TvPsb;Vb2TZ>m-?RJ5Q_otEptmtik!wCtV&ku}#)+7n(yh0hEb#Z_Jy8vk& z3dgVCYn2UvjmUuKj}Ab&bQh@|NaK0B;9~&roI{?+v87JTv9}pwp6#5)l;G&s`EFjUS zkXwUgNHn_F8K%8r=8G{K5dZ9)R=}9t%LW)|48aq0fknp&h@87F8&JBI1GEe}GyV?& zCzTDjNP@)))FKB2s0+*XVg89YmY%_wOeo_V-BJu?KAPnR&J`F-OEgMkR!2xmb2^?G(;6v3+LBD_S$vN2cVZ?V}jtgki z5)X@fU3q7JV7ULX?TDMi(|+Vb@?&lV*nOq z2OL(qasnd78h};}c)WKVhz(#a(mpBpVXr6qN34jC(#@n*O3sz<8YMx4KsRVj=RCti z$L}NcBvqW@s}k{?G#h&WDQ6_H{F9JdQ9~9{r||ujfYNP)Qwgx=y8(6HoY{cHfV=6N z6YOK)^9>3IiqB-d14?er0hIyXWpk6Z1I!b zJsQ^=K)>$tWYK(9+l%tvJdo(S1bu+#tXfAHPl zirgfYi9cG?AM1y9ysL{9*f0X5I5!M3{LW2m51>){fau;6s2-q#+?VVMHsD_};Gji^ z?9Mmq1(bQz0Ne;?Q-E0kzQt~Rcy#FRoi$p5Jz2bp8fq;Hu{vRTl92cT)z z(8_>ccmk;~<4?0olLLNf?{fl#9{THNBl1*r0tOU-qlZBaC?L4F_?qzk-B&>a(1wW} zj%-M#q{fZ`LH^jpDb$>#O7hme0ia>Wvtg9#w&rXjR`u#?#DLJ#%!ifr#tMS~$5#w^ zy>;CF1yQQJ7&T2 zSN%;n_TU1>tr9xHNS{56Hhw?Su#?rL>bwdj+qy%NlCpQ76%`;%STws4;GF5b^ker( z?67Cm6rf-maFiqnOdPE#ZZBCa&2@`=x}9e9-M3(0uR+uIUK|gj8z-sN4yjAs{dB7O^jW-@ATl zL~s)ZaQL+fThs{}5b!Tg3LPL5GhTc)0(Me1p!r3(0R$-ef67mVueSg*uMx|F{`&2NavL zD%;1=6tTWg&(Q=Q(f(k4;_s27Nw1pXFm+2ZL!a25>qjMJ{Sq*GfdU6m7u4d-S8d7r zC`eSCax30KQl20&0_L17oo|3*h1VJ1yWN^~z=!MkPC@T2K;<*N{z6RT!vkhZKfpxQ)0F()@b5b&UDzWC z2e_>}fa+afln&EV4lp)jn|=T~BLg1P*n{8{0!U^{Z3Bc6-c9hbej=~V@K=1QZU_!3io}pRE9istak*r4%C%y>=quz(*mj@o1+R6C~kzNp&+zbkUGgmUe?%0EpS{pFnS^)g9?x&M`=|9 zaUe+t*LHr{(Y-w-khsu|6*Iv7otrJYM9>pxWq{|rP<)v2CE&>00j?sWkQoqADX|#- z`qUW{Bb_Z^gJ08cY4dqc)IWPQzJzfzeH0Z+oq*B;#uLpbt#cd#Xbk9QMlhZMaJebz zXl3dU4>Y>)pl5l@LaL*A{KR5>D!IqaUo!+pyI7F;Otlg~PPmv!-5pl56a|qxZfb+& zQ6_oYi*Edh6tO4x)~5XfY9rno4rBb;?M~@vBnD`>z8Yxjv5>F`zVEFCM+^E2$!+ce zy&FGs$A%$E}E)&xqdI>HdFWmAo!v7SU)_oi90!{X@ ztNja~{+k=flhfbc(DX(;U4M=(!TAHW3TnY*xcF{#KJ%8o^lM)h$ViIU55tW;v{Xfd zI7O1Hm5Z>#l2^B|keW zHQrHcsLooOXjq=hdQT{yZJo*V^Z-|>bAB}85q##U1{V#;D2Bh40Q~kIV&mhwkGNx4sGvyn zWv~EBx%ekeKpt`q_#04LK7{K>o&s0`6kkob-Fe$jf7P3N z@n#b2*K)9CjX~?j5u|^#WAzxMNir}Z%CbX3L9&6c=sU_z7JRPQ#QkPP4og(m9O=);g zQMgh=t|(vmb*(Nn#5(hRp3WH@E>0K6b+R4{i0|`sj(DQDa>BS;_?&!QtIG+)=G*af zi1cTFHr*fn6(jldkQbTca!VvMokEh2EA3q8}5 zfRm4Zf~U#;?T5)P{rHnRolZTPfaDTZ!gxu%5xi55P8)G-(W2Udg^7w(Du?KcIKc1( z@ObdFSZlC!anzkoNOGyp<+ba1Rpal-?^YkA$i*J|85ul?GsGp6~+aJ}NlF?ciWpeTwM)FMei zYh(U$dcn%XN5>RB1l%Bl0!BO``kyp@;37i^`QoG5W^O6p+U8DvlBAW_oJG)L-@*Pr z21$!QXh*LtNj*r#QS@QrQv{3-R+NNoRx7OR*daK4$J`1N-l?`M)V4&dA0t!3?z-5UCHkx;PbMO07ooW6d zO_EZWHf?m!(lxnMUAkydWysZu5d-G?Je@aV#<1Xkp;D<-WLzmBp)zIQz`Q> z4k(Q@Hds(;rI~NMbFMk>ZEdbRld<3Hz4wE(e-4td5rZ{0ir^nRwVR4ym4JbsCTD;! zIw7U7ri_Q9z?UTNWYMCBYXh~NExtKljV%re zsyz3Kr1r}vzw`N;w|%k2SR6%LV>TjobB%B5FTgq|h*C;vS&JRU~o39|CaN>S$=0BLPIm6kc^yBC!kWNt0 zyqjuD)=_tngSh7oj^xH5{+fgPig>(y&v1?v)Si^{HK`BvxexplJO-dIXWvNA11a46 z4+r3HBKn^p^Vei2)c$-0O@8e!Ww#anZN4V2Sr@rjdlBbB=6r?|hvd^_9n@^HGAI5g zfBfS(1b4HQ;{Tv#la+t=jm&W3RKiLf+h(Q z#sGfZmkrXBK?OZl+q%Fgo3JG_oWJyHCwG|?>y7GDZRN;Bak?A z#1*{A0|!!Q2)O`L=ZDj@YCd-D%_w+;!!{fKp?jS^&$Y%N^7SS*Nmx;<<)MEZH@W- z$y;lzjuiZUe-q~`X#U)-{1f=n8~L-CuYI2hlsiF2)@2KripBHgR_BHQ5xp9>oEEV z9AA}~$I@G$@TL|G@J{db^7Wa(3QH{z<;8CxvN$OVN17x;vtTPl1)17Yk)C1_XCrXf z@I5jSz~P@zP*J>TBN1Wi3`w(V=&K zCu?70Hi1l1j~Y8jcBXK!ivhXlVLo6`q@8^OX6YPIG|~nzC_QIGUHZKsJ`+}J%RL~| z2gCWl{ytFS0*>N&vN0e`zB(6#Z@{1_sWQ_i?T0-c%|PY}6%&X@$_7xWrU5yi25xTU z3|Jf}`foyB7%}y$A`H<~^amAUj_`0A?Np+GGlY{a1DJIHQnVuf8MD|5lkM(lcJ4)P zZcG`6eNkLtRu4FcmRC#sT_B z4YgPC=g7QY;v>uUt%*K_24?1Zeo(!IHVOG&F$;rnNiUedU?4Y1QHQj^&kB7{*u%^? zAX?TylG`6>F=T#XCbF+{o`b29w1DtIL)d=`%Pm|>e-yHLK9iW`F;VdPh9?s2zAc5! zD)O#Q@8=naPg2l=_)ZurLvXAo5gI#Nn6 z%m-%<$XF|*%_@l0M%kjmUeaYE;GaxktBNF|BGnnShS9L}s#`(rPLd@G1 zkjAWlG&&fAIRvCoqn!+p{uP?xgN%HTeZ2qCj?#A={-alIqmZx_LLe`C;v$!x&vqdh zHMFB*MygQn)2ib$mpix?Gr5~!yG8;*&-1Rx)Z7c>TKI?X#&$YbF824@oP zjhHw9@9}?a)C@#ndl{Gt5=`m36)b6qgU8i-ApyB0xVd6C02{hG4Q4|knfhRoItL!+ z38Sf;q}n2)q<(T+q`5<+rYEVpO`(xc$JS@?+P%U`9e6qMvJ(!&^Er8u)b&RaO6~=T zEhcf?$GTyjgbe%Dl0ioeoah`Ru7H)D6)vrILdI~i8D(e0;j9%LGcOheUKL%Vwo9*i zTK(#T{1*$?VSzXR;4bZb^0BPa?U0Vd6KuE)Sp?xm1vJw;MoQ zlC10vaN5O*N(BH#4L}d@Ij51p$BKOeG>{koOWyz_v%u{0%RAWsYn}lb$Ugucs+35B z1eflB0c@oKKF=MX#;YfPlyeWV4&ZXis+??T)SPHIxRC(R=N=JLQDYtt=m7#? zCWQcC^4bHfhYA2204FiJ04DVVKtm$IZ-DI1jXl6mU6S9E0HkOK$Zf<2I7y&4;3^Dg zWr5ydjr<09?m2M+9*5z}0WTY%Kw@;{C*e-vxqA0JV3bH(u>gQrIKKfFbu{k;3@5(S zVQ$q~yHj^SMZ2I}DCzv3Z(La%&C-M>`78mx_iniUcmP61`@hB0%$M@t0IGT*D*3&t zr&urkwaEhaVW)iq7`wyZ1jK^@&>6w2tet%Wv`Pv$^1I!B61*p7iv`wA+It7+7B$|0 zfYtzewgKk%Q%!|;wE7)F?pn#1lOr@8j|*Sa2upas^vQ9H&UWaiKr@K`YOi{!vZueK zeJlpPT(NlLzwi6P=w#e>w(i@fTpe@r{WpYuR<4baNfa=6l8OLEW`vp=8auBTKrX2> zZ~$AAGywjhZ-8$p1C%O)zvK^U4#Wq9K=b^JR##wa*mJ%C0%#3jrFZ}fPW0pW1rXuj zKXYx(YLP!+?eWe#(S-oiB~p2L-Oe7UvR;Q5BNiOaSj*&dwE}AWbBaN z0N@2<@rMCo6Zn9|jrs<-uhB8`$LQX;%V>aKaPhGK^9HBc-jWBvn;hVKio8QfodFP0 z)QrS7;W;E1d#NU@f{Wm=1vaPd1&=F;d}E?vO^7&wcKz5>{9 zJ1yf8eCljA075PWY;vIhUIC#=FbM#rJ%Yyx;>QA76;L^9aMG z*yu07;Uv9(YtaW22B!Q9nDk8e9K3V$nA}?=7{&CpLN$-#sD`kX$0U6TA9Dt#U{{o34*ksNc*^>*d-YZ4vG0G+s=HasOcGaOHq9bd|?*cCh z?s3C!43|eb^hZ8ArkM&1NF!-RE04bnpT>4PV5YXtY}v_WO*Fbuq$0=cnkb*fm~(yG zS|8Nc^Y+gNgqHVb2$muoh0u%%C1c_-uO@9#;k++ND>4=6f;=w08(9|SGx&HA`gjK| z7laA{SKJMIcI5tA96XNy+)cql;`hD*=Khh7nC4%YyCd^f|4tWZ*rmvCz_;B7$h8gt zeckw9ZQT)o;{3v4A7J$+4FHMw1|aHwfCsc*B>iQ|Au==4nE&5>aT6Cjbn&B7o7e z0Y!kf++4w%Cph947=Tjto(FyiZ|WUT{#^its8G-awhIiP%(W1<6zK%%P2hT0XaK-r z9f^`qt6dL>u5N(Ey-fZ-B00 zfi)YrzyQwjwz2_H4!Bcv0^t>i?yV%^iw|{PXM?)|pS<86276yvV>o1dBsLC1&1)mX iPn3>`Sm&=oH;8$910t_)lQSUw?72ZZ=2wJ-euSd}{fiexYpMgWmk2mJs4|9`Lle+R=NsvJPj3NEd0e*w^xQp=4Z zseK_roX1L@TO@%Ai-fHHNfw=5*x9w^f&T!36rAW$K)F6q3UCT92!?sqB0v|AlMnF(3&{jai1{07F z1A1RVpdpcl5C!bqJG)>a97H1^C!r4!28+7|Y_?UdUTZOXxzGJ*8fPi!G2Ek7H5bb@ zoYkyak5hwLkGSLvB2bAJEnrM=;TeqUkIbZQDNhsAqG@JqEWZk4z};22`-+p%D73+>*6Jox-l&IQT4of>V@A=mtkuuF(bTKuSG(9& zi{o%+EzY>jTH2OvS;g48m#ejU3+P$d;RufwOL==t;77s&XDROtdNeGvn6<2P^rURu zH+l?5dgl-Z8&82qrBL1frPFhFufz{sV+n5Z~39ge$(ld0UDK!6R$-95mDlTpCoaQ6Z3F2LP4+&zH10}v5l zHZbn)m=1T(G{D?BDgl*O9aRT`j1x6jR4J8Ld1X&R88BzLd87FYO(9s0AebLjAeBlz zzT4Hd56-@OkrWXMX(V#tBs3z4q=-mFqYNe(6v?lp-J{VRp)9@VLsIgn+z!+mCxM2n#XP=lo2F8V~~6_C6$_!rK$ z4P>qTa2&SXn`7Rn9>dXJ|J|?bY(A=c-BOPFd=$57+Qr!34BNYbGOlWutJ=LI{M}{R#*Q>Mp`DnNF@szI3 z9Y@r42PW)_1$_LF5>mYy4AKNr+H>+}07^_ncn z?_PuDh@HsLG&9K;qSI4r3wh@dEtWE=u}5`(rl-(Emfv3>RJupWF%?S?&+|N#O81_3 zz2|lBY4c^D$ne}Kf2GJW+-CXLZ=JH78Vp;?bLa6aO@BQo+Nj>&Js3W!`@C~V-g+GI z7l=nyx<~by?yN1Ge{nlJJCNb{m)1Rp6EYkh)xB7L1@}uRuZwb+4D3YSn zZA+^p>p`Rc1b>F~=xgm4Rb`!4678fT=h7wV-pF0q+Ow6Weff z15r8wlu8GM_BYyLS<$+29r_F{=;*bdB?C85Sl@;XYekO@9OVAw<<#HdALij5Yu-(2lM|XJG~wAFv&3!qjEI*BoTkx8B@_Q<9eD%Cntn$VY{X zDKF}b{O#LO`{5Jb1T=AZ;<@H{zZ#b?x!NEBgtiv)%q4$DO4%P`MtBx?khV0kW~d2aH_1Xu>t3}8VoRE7Z{GUp*s+X41iVL&H+9@07YKu z_aV~!XMlkR__cA5qU=X7=N@nt0+}{fL%;INAU{XXsR83Wz|^(CnZHa9VD0q)ihSgN zKI8QZ2@?XOr@m^Y8RJb@v%EWBHY=t+MYgDd zU1=8vn7X|MhA{O4oW5$do8RTA^kvAVpD%x-k8v%E&cEBPGF`{Is@+mR{^Wu%#QHEr?syyeo`xL+mihi%vbaI ziwL6;yjc%5j=lGu2#_7<8-RIaQ-#SbrA$dCedwr{EV1%ngS*lsY*|7GA)Cnc0WSg& z0z^Ju>vlv%5JDiG0X>AEVGb%V4SNh?Qh`J2(JTp zKq!NqFo<24jp>aidXNF5!DcXsHItnsOx>HMj+B)px14kOOMd^S%-r&dL|9!Sl@ubP zE5uSUGpFipc1y1}u3qFWZ?Q`G@te=ZuP^0$J2o?OP)RQNiFyH%gHaL>-1|)C9|Bjbvqx=iO_w*P5YYEe7ddn zxplm{to>gr)0efK3<3Gv7LeEZ@Y5-~HuKA`{?$&YSvhHCZ>LmFx9pTMoswl&T6HwH zT65Fc(w$jO8D&S8EE$k;yC<#H&*r0=Dd^3US%-%PJV}rksUT&aM+g^8VC08~HKl;S z+DSse4g;!Ht97HO9>5?85+h7@iX`Dl)ybp1)+%MUs%O=v=3bitFX*QglwyyPg3gt4 z>#V%)dPt(&B}pumBeEbtL<_t|(Bp)zkOyUGMWph8h={H#Xo$l`Js^ZNKnQnWVu1xa zJ3uG`git~V0e}!*FqO(6Rlx))?n(gR1yPwP#mo%9@3vU*rz;w|EY%IVELPDFGhE#k zs|a!m&e^j$RV$Ua{XY?mww%UfcL_a*N;nk5z6Sz0GqeJ1?@& z0#m{mVcm+%2o@NMfS3Qj z%CYmTjhgL`lr@jrKpgLJ3ob+}sH9idL;5Hy+S-fq;sH zZiL*dJSSdD=tsy+_j5Dk#tXVaH&Zwma>MF?yr1tz$W4nYTD4d=LvB1GDXp6+qc3P~ z>vp&01*t|DTB(3UU?o5USv6H|q7fSUVXZtUDwX$yyF^#zWHm%XBcukxnGqeC49=B} z(#YTk*aarbFj5&gFe5U$g7anD=n4*$*rjEQ9GDU5!C8{=_`qy>aG(miP{?L55)&DO z7X~qi+0?)sm=Te|fgmfAUD<_oKNn36Jju+O3$u~IAyQ*gb7e$QT0S7O;eiNvL070m zb0OdZgWbW(p%pkyIG8-T! z2(y|CgAkfZG&R=%HJ+0j^wVMu1vpRv>55fe?~7fQt+|uqc5A!3?+n(Bgz6Y-p#{?MtSb<(J;m9SssxkfQ0k&$iUv zpGvO#Ds!59U1L9|Meglm?Y0^3duQkNxdK3FfrAh<#880jW~WoqYT2JMb=PXANjbmS z)u?IrM(_Hu^S)7Dt#Z2-rHcV33l>zim`ob9Aixe}p+84hqclBEhxjUK7_)Y@;WS^lp(NywaT;F87y za=xW*Ex&YH>!qEo-*?STjv8lgbe|tT&pAGO_Htc!>JdUei9pHym#)=sYx$i@*KFPH zCVlfr>v!Wv`J=|}MR{k}-@LJZ1MCVTCI@C+)+9?8CAuFdl(+iPq$U6@S-32LQB2C0 zv(l#4GX0*;FS$s4ZRAX4(E2 z?670Q3=9My>EeZrHs<$Rp0p=>H=WckFvNJtx?~wf(vkx+CRxvwE2A`AK%r158F*rH zLZMn7kk|3hA7%| z@7>?hXMd8`>3tmadt2zuU8(HbOXlfu$&1mMtV~i@+a03H^`Cv)gHnm;dUCPVNuXeSo zoBU{H-6DVgu{pl6^R|t4U5&D}x>FP+F85Gt{kP-zmBui z(pc@^$lUjF+d41)?Rh^tr5jXwv~X#qPfbd8r#$PVm)?zgqy08^8{61kf0UcFal0II z8hdOB#~ECF0rF+r5K~;&byeX3a$T~yc|M&|N=~b@N}iwHJWlP~BIV>qYx|2%ySwrA zqLdz#dxS(Hkvu5(2#MrDxkpAlUADhf)A1bP)v>TB-AN&GO_I)4`UaXSe=UR?GgLX0_)|`!=@my}Cuuzb(Gh zx^HUM-*BUhEx4$H(`8Mn1cXPZV|Q$rRE#MSkw8QwNeu^M888&aUb%(7bQ9DTv1bkLVd-N)BLd0@Nklq4Q6f z2k@b~#J{0VuY$u*@5oWoJ>NQyGrVLT=SngeSY5#PiYNvNfk~hN-&YKHmyM17T*o)y zWEzRc0U}UVoCR($6csi=uEh@oV86`#d;_$(OzwvPHvu%8I#F^Zz!&$*7}!uy_KW^ioLM0@+~pCI?V%u@ew315maBQZeic zm5HWWD0x{yCvUYGSI#ZODCvbubTq|3CNi*myN@w_C69Aaxq&(sQ_mKiTYCbeSOdxt z=!h9WMci0v00-x`0g!ey%y0saYu+8(XMqna*L5$(i2LAx0o~1NK;Ho83w6{0uS>^; z2QX;)pqnehu_U+1{TFuY;m?@?LwP^Is>CG1TnD(mLFc8sG4-rrOvvj+1FJDiKe>1Y ze9UKm!}WOd{}dS@!!->#$ma=xomYXP1vnfi12!GpfK|`LFkk@Ce^Gb(z5$E_T5JKY zd+KtABZ7WJy75b+RYVz#W*_W;8xnJhg7~|vo@|LoA-V~2z}t!=40ty5Z!eER?=3*S z|6s0cfL+`E379bBKo(U{^x8fMzp9G*efl;tT%DkSG9=~{f&6#bJz1$p#ipnn2QYjd zfF3Es0QTqq+T;!B^w6>e))DqpHb8p9PrzFF!GPX(mJQgjN0>+rC}^BFJAP+5iIw}- z;2nVYDEEabN`T91ss@Y}Oq7ZJgaHJ9joudXT0s8M%Zuqgrj(g9o7 z2Mh>S)&Te>4K!|Mih~wuKxn0;8ekOO0rwZVs`CxdPa|QZ40;ChBe-jwh1yRI0Dm+? zd;n)T$SRlQxZHPTS8obkWuGtr`YbK>ZT*3W5YuY_uslYL{UJuqO_9L{o@c%VV3w*N z7J&mu127kEqZt4_Hxmv}n+-T9aA&H6De2V=NW2a3dlty{LIO7c7_#<%08-%r1?I!# z6R0WiGgb^p9NR=jj!lhbKjkAmd#8$=zHbugD*%bi9U~SGfzMrqeE&Jid6I*%S zaXOdenA~p{-T}L|7(M@`WB`5p*IirjfEtjG&>rFfD5?WWdA|p|ctwW$1Hqdn{@fcp zV1W0?0>=UW19a4Y&#gS)f!qakz=y{HG)b1^zA#kF4)|U4XL~>*rU8IaU%=hulL?+a zn?h#Do_ir@k-HSIeRaOGJ?JPe*HDyBS{jtv_b40;U|RKVSX z0TkojxwaO%#5{79W8JthU_ehw^f5 z1kZH-(QQ9z@{g@pAofP10q|c}^$iHtd;^F_O&D<31pV>|1l$7T5!Y5WK(sY`0-AUL z=W4$`pDn#yI-P)zTMB{L`*m|0#4+L60z7$~;TLL@cZod$0S$hZA-i6SLT>^@G)6=F z*3=ko2e3~09ro3taR5nw`~l1e0{{xD2!ncRa9hm5QX1_BaJK<;%>uN!Kn{$t@i#yR z4RDhm_B=oo;CjF8*sZ&ESQ*6BfU?@*2{97Zcw`%}R>ec^FLJlK2UwoTOuzyP(f~kO zWkUFjreqDNAqKJm1RL81@azpF?xb$PD@eg+je?YI13+#A{I*2B_f`A;#QXyw!KQCo zYnrnD@CS(Jjsu8I3;JB4?wu= zpW12>O<1Dy_EWt=nEh4H^u(k>RD9m?z=H%J0fvg{#O;pRF(!u-OnA}47lT)mfaOf2c*shJaMbq z0HjZlsSH5*oY%MSdU8O%@2>EKFy>-_WVO|6!1&}X^dJoXY`}Y46$kIgub#TCS3`F% zP_MiLuA?{t*PV^v@1ta-9ia1tCa$?yfvzk&D;9`NDY&;hN7x2Us#^ls($BBzBc_0^?&96Nk#Yq#_F4jOtZkeq4VW_96 zhqM8qwgJI=0qq;K&H$n^*17{qA&b=wKmp7kFCD;ZX++2$(vy|L=i3H0;&GviY|_D6 z#>oJ}PHzFGGxbFr>0ome=u{-I`zi%A8`nf=Bv8o4zdYzCSNpL7_+5~_DUj;)XQH3Bwf^hvp0)Vy%M@^~InDAj5JdpN7>R-a3Ei|9X z$-?oFC13{2SBvcKoid`cpN`TRgqy*jkN=tOA|34kvjMaLHnX^iV6@d`T^KVp5qx#j z@(Wk0iFTT|QOh|cdu1q#(?rd$?U|p<@t?ETqOMjJy6WvaSs}F&SMGTmwG=Xov28jt zJ3H-Oms3^|Ti2aa>$sH{)kH16>5|FUm$piUKwp$>xON6h00XgsFkvn45fsnt|oe;CIYq@)kHhZghrwOieBVN zkaX|_NtCZ4z*i%c&_*q1wz_ae;WV95OQCX>O%`RPGz~zoX(Akzxf<1wiF(TEGL^6j%PYJ86Zli{;rZZ|bvrTN4O|@+`zrwUa=`4##msa`&ez?~ z&^5NwS^nCR)pY(S*#GYRPu=^U`s)f;ez6;$mEP(_`wsqJXIT-tD&FfN%?|5V=b7aI z#@p}ar_oLd-P7tiS~+x2|6SV*WhJZYot^ixQx3+YRLVJBv8F~6?KD!hVw*J5sF6sb zA2kxb8cAo=NSCRRL_KPx6h>hcNvDlkDzjNCAPW@m)O_q2y`g)h^*uNn;xUJl1lq6~ z@WNqzuj_;Jy*t85VU_T`2_>u*!uYj92y6Y<5K38Vj9qJ;(`VO8@5?W%?s~J!H(z(3 zgwDz;o#VG~o$b*7T}t^>dQXuQ#rkx9eUj1{oxX194&9GE`p_Z+N(7G)F!=Go;=~$! z=wP|w1P5ZcX#xx}LJJbMuwluL@+tM$-Ngi1smlvoEEz*e18y9U!IL8JmLGtiVnr7Y zWbpE-A=E{R9`3->0}JtaN9pj%ezVrvwLVyB%=YP#lhi!!JpM;M3Pydfj$`Z;lkrPg7 z`$dSU1+$CPR`>KlC!w9Ly7k&@sf#d1Sa0>J`IcK>optARej#)<QYqs2ByjvP5+vCSs| z1uV8Xa>QaA0s3g75FU;M@P-#S?1=3T#@Zd@^Y{VgMI0R~?f353(&Ojz`FuVUK7E(( z{kelcAP@-j{i2NTaf&m#kRlYS@eG1NXq2UnHbOKt9y2^1k7o-Mj|X-?*>pnMRLcGd zWfq6r@c#Z{eE8CbBt$|;e1U<1fsq0OGr$)t3Mm5yfG=Ed@Zx#7`S4# z&q*ni@Ok(Am+c|^;3SG}pK*Hh_*}kwsUg6PDOBj8(;g?BF+O+L?uRI+4HzPjD(`bb z@A>-S&**=E4>^R0R8Rv0Mu?`ylVuAa9*@TZ#p8LhqxXA^u}=8k1zSP~*|n$qH_fQB zferC%*Ba;l=kxh|K2pA%v?-kL{vZ$t1OhpH{thRFkv{)lzlpoDELy29(8&AE2RSQ;0nc7)@C|$H|;+$=bM26kbi@o)|_`8%@vsDwv#b zCZQH!-O&A5R7wY{V)`$3d;Qkq@ii&~rhymT%zronY!wbzIAR_5kFTQeIG_)X;^&dV+JI)j54g!p5XU(IM;em( ziH4vsL=laoUrJ>d&>XM$e*;2f9LoU+#(|9kprx#c|iN=L;z>`wn}N=0ECOS zcL4tOCZI@q#+(No7T+9cq6Stv*ZW4>85sP5Ohhx?S7Kxs&>TLeY(R9J0pqE20JWw> z@d1X>o~*72;JoZ<-vEtdtUF-n8n6{M+8Uq61xdU2{bM5ERF$eFsY`F3S&?zo3*2 zFm7!#8c^Z^mDvG7PQ18)-k}dtP`zmnu(**>ejm`axNm^=6Mp@2NHCzy&re~X1^_M$ zR3HUaAFnZf>G+=5#~X?Sae5@?+<;b$Dbi}9a~qV@aLCGC#$CX&$lhx$_6E3`0|1rX z0E8gU%sya-VFP#^;8Zos0d!!p6_xo01ONtgV2{r-96!K_SqJeAFcz%gX#mx-e2G&t zHreIk{&07r@_HA+FV_Bm0WV&*gVdt|v|5(mIJ@k)0Q^3_UTu{sA;GVz|ID)G8_@AU zd^F$|PV*y9b!@`R{r*E&y59h~t=eizKq|EVVaH_yT3BR=0B{(P;#W`v5?tJf#;h%d z95ChV0cpQE07M_&dmm`;z4{et!=A+rfE0)UsXCrN=?gr><~M(y9cc;_$nE{il6|qI z7YFltW<64007kaZvB7=)2^i!*AltqHp%K$Ye>c<{Ao%c|1H^vrFJmJd2Z)Y-MF1cN zw($K1fbB_sd?M5 zfa->cbNZm#=H3qa9(^r?x%1S0vzBFk#PP2Jp*82!L5IW8P9#a7Wa=>osJYYL>049aJO1!|&Q0onFQ341+D`u-ceFI7x#P66YR#9tC z$iID?iqIEpz(OBFPf75Vn_z%Sakn812&K*eRK)XtoOd<=0@wn#Rp%R^(j|ToO<}GX zuoXj6lvJ+2Oq0Go&b6)2CdusdCCdu|=8y~BM|%uFbS06NmH|(oIAH4g9DoTAG{NiY z1pxpgE>hV5BM5v4pgspE)~hO^b}}vT0Zj@DP6Mvw&p0pR`{F(d*OKrJ7-d8}e@H*T zY!0Bcc9yo80cRgGP6Hz@9dZKv2t$P&P(lc~xIg0EXjUI~%|C$JEG&05|Rd{~`?l&Tjy5WO~g3o-tt28nCq12e{1w+zs@Sazg-) zW%GOkd|hdG^(!#ov!t)JIyS9rRV3lWJdzaV(m%qe*mSkkkL8X*d%zt~nDvtM@*q2z zgs!U7!~%I5G#F5iGoV5rXrtel^acoWUUPt~dHK88K;!^f(W3}}RooW-@(l>skah1%xuBd9{JVO z6(1lW%EKSz%5_$~_NVGgSs`;CSAw>M9m5L5%bJ#{9 zhe*~T$V@Y;#i5+N*sFCN8(5cQ(1pV8dB^AZC2OyMfTD$DYw_Xc)eh(Z_$*tC3nM@@ z2;m*mLQ(iUdixXasSjJMW3beDge`oUltMH(^zhVt1`zRg#$b8Fu)KGboJ>&hipd0f z)`s~mfy3xVJ);=j zl!}f9U1xJze`Fs;N)?RkG3DqeXPW!*qxNSV8Z_94cc>Ri*SMFoj^QA+fObjdGoWCD zYycrx5Drj9g$Mb;nERFoEC~$gI2A5{f<_oDZv%1_>VV&P#Ww&$Kn%#VeYXp2dR1`u zw=NL!!!lr4fFNuK`s5|d=g*?ram_?Q$IGEGXLE)m?(?4U?Hq_~rZPO?NN{YM>)e|O z%eIHTv*#3v8X{j&Q4Jztvqh9SWldW4B)&dXjjrMiK`%@^H#qY!$9>qN3$Q?*j3G=-mhDVUYpswgp@3~2-(3h)!>p1#Fdhr|C12z-WkEAlxULavkleCvm+tJ|1Jgt>134d9qA;QSix ztWT!*d~dwKNyD((^bNSm&;T-!10;JCG$(+Lj|+C=Yg*(Lu`u!W0cVjOu0Xb2S50Mr zh)+$rkVra;7iI&F`W`?7@NodnvV8ztL%<{?7VbA-J3Jug<^bO90&J~&FknbR4ZsLV zb{7vg7Q}$mfakOmXl9@}z|Leqk&m!+CatGU3DRY4zoK?Vzb9XAQXr24d`xcn8V1sU z42lEtT+#p>fp37%NI>TRYP@*y0Y8)8EUO6MybRF30XU7>XCUJS0IpW`^r>n!+tucm z4Uj7sm isD{b_r%eMHjR90SKtJFvs>K1+X#vFqlEr<2Mgp$hG|L|V literal 0 HcmV?d00001 diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index a5a1debff..1d0c8406b 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -24,18 +24,7 @@ use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; mod common; -use common::to_arrow; - -/// unpack the test data from test_table.tar.zst into a temp dir, and return the dir it was -/// unpacked into -fn load_test_data(test_name: &str) -> Result> { - let path = format!("tests/golden_data/{}.tar.zst", test_name); - let tar = zstd::Decoder::new(std::fs::File::open(path)?)?; - let mut archive = tar::Archive::new(tar); - let temp_dir = tempfile::tempdir()?; - archive.unpack(temp_dir.path())?; - Ok(temp_dir) -} +use common::{load_test_data, to_arrow}; // NB adapated from DAT: read all parquet files in the directory and concatenate them async fn read_expected(path: &Path) -> DeltaResult { @@ -218,7 +207,7 @@ fn setup_golden_table( Option, tempfile::TempDir, ) { - let test_dir = load_test_data(test_name).unwrap(); + let test_dir = load_test_data("tests/golden_data", test_name).unwrap(); let test_path = test_dir.path().join(test_name); let table_path = test_path.join("delta"); let table = Table::try_from_uri(table_path.to_str().expect("table path to string")) diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index a0a8160c1..7a674ce57 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -318,33 +318,6 @@ async fn stats() -> Result<(), Box> { Ok(()) } -macro_rules! sort_lines { - ($lines: expr) => {{ - // sort except for header + footer - let num_lines = $lines.len(); - if num_lines > 3 { - $lines.as_mut_slice()[2..num_lines - 1].sort_unstable() - } - }}; -} - -// NB: expected_lines_sorted MUST be pre-sorted (via sort_lines!()) -macro_rules! assert_batches_sorted_eq { - ($expected_lines_sorted: expr, $CHUNKS: expr) => { - let formatted = arrow::util::pretty::pretty_format_batches($CHUNKS) - .unwrap() - .to_string(); - // fix for windows: \r\n --> - let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); - sort_lines!(actual_lines); - assert_eq!( - $expected_lines_sorted, actual_lines, - "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", - $expected_lines_sorted, actual_lines - ); - }; -} - fn read_with_execute( engine: Arc, scan: &Scan,