diff --git a/kernel/examples/read-table-changes/Cargo.toml b/kernel/examples/read-table-changes/Cargo.toml new file mode 100644 index 000000000..f9f980dc2 --- /dev/null +++ b/kernel/examples/read-table-changes/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "read-table-changes" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +clap = { version = "4.5", features = ["derive"] } +delta_kernel = { path = "../../../kernel", features = [ + "cloud", + "default-engine", +] } +env_logger = "0.11.3" +url = "2" +itertools = "0.13" +arrow = { workspace = true, features = ["prettyprint"] } diff --git a/kernel/examples/read-table-changes/src/main.rs b/kernel/examples/read-table-changes/src/main.rs new file mode 100644 index 000000000..3360a06cf --- /dev/null +++ b/kernel/examples/read-table-changes/src/main.rs @@ -0,0 +1,58 @@ +use std::{collections::HashMap, sync::Arc}; + +use arrow::{compute::filter_record_batch, util::pretty::print_batches}; +use arrow_array::RecordBatch; +use clap::Parser; +use delta_kernel::engine::arrow_data::ArrowEngineData; +use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; +use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::{DeltaResult, Table}; +use itertools::Itertools; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +#[command(propagate_version = true)] +struct Cli { + /// Path to the table to inspect + path: String, + /// The start version of the table changes + #[arg(short, long, default_value_t = 0)] + start_version: u64, + /// The end version of the table changes + #[arg(short, long)] + end_version: Option<u64>, +} + +fn main() -> DeltaResult<()> { + let cli = Cli::parse(); + let table = Table::try_from_uri(cli.path)?; + let options = HashMap::from([("skip_signature", "true".to_string())]); + let engine = Arc::new(DefaultEngine::try_new( + table.location(), + options, 
Arc::new(TokioBackgroundExecutor::new()), + )?); + let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?; + + let table_changes_scan = table_changes.into_scan_builder().build()?; + let batches: Vec<RecordBatch> = table_changes_scan + .execute(engine.clone())? + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch: RecordBatch = data + .into_any() + .downcast::<ArrowEngineData>() + .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? + .into(); + if let Some(mask) = mask { + Ok(filter_record_batch(&record_batch, &mask.into())?) + } else { + Ok(record_batch) + } + }) + .try_collect()?; + print_batches(&batches)?; + Ok(()) +} diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index e771379d1..1d6902d86 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -10,12 +10,16 @@ //! //! A full `rust` example for reading table data using the default engine can be found in the //! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the -//! [read-table-multi-threaded] example). +//! [read-table-multi-threaded] example). An example for reading the table changes for a table +//! using the default engine can be found in the [read-table-changes] example. +//! //! //! [read-table-single-threaded]: //! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded //! [read-table-multi-threaded]: //! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded +//! [read-table-changes]: +//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes //! //! Simple write examples can be found in the [`write.rs`] integration tests. Standalone write //! examples are coming soon! 
diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 2c15bd537..b74f65b7a 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -1,4 +1,36 @@ //! Provides an API to read the table's change data feed between two versions. +//! +//! # Example +//! ```rust +//! # use std::sync::Arc; +//! # use delta_kernel::engine::sync::SyncEngine; +//! # use delta_kernel::expressions::{column_expr, Scalar}; +//! # use delta_kernel::{Expression, Table, Error}; +//! # let path = "./tests/data/table-with-cdf"; +//! # let engine = Arc::new(SyncEngine::new()); +//! // Construct a table from a path +//! let table = Table::try_from_uri(path)?; +//! +//! // Get the table changes (change data feed) between version 0 and 1 +//! let table_changes = table.table_changes(engine.as_ref(), 0, 1)?; +//! +//! // Optionally specify a schema and predicate to apply to the table changes scan +//! let schema = table_changes +//! .schema() +//! .project(&["id", "_commit_version"])?; +//! let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); +//! +//! // Construct the table changes scan +//! let table_changes_scan = table_changes +//! .into_scan_builder() +//! .with_schema(schema) +//! .with_predicate(predicate.clone()) +//! .build()?; +//! +//! // Execute the table changes scan to get a fallible iterator of `ScanResult`s +//! let table_change_batches = table_changes_scan.execute(engine.clone())?; +//! # Ok::<(), Error>(()) +//! ``` use std::collections::HashSet; use std::sync::{Arc, LazyLock}; @@ -39,21 +71,28 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// - `_change_type`: String representing the type of change that for that commit. This may be one /// of `delete`, `insert`, `update_preimage`, or `update_postimage`. /// - `_commit_version`: Long representing the commit the change occurred in. -/// - `_commit_timestamp`: Time at which the commit occurred. 
If In-commit timestamps is enabled, -/// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the -/// commit file's modification timestamp. +/// - `_commit_timestamp`: Time at which the commit occurred. The timestamp is retrieved from the +/// file modification time of the log file. No timezone is associated with the timestamp. +/// +/// Currently, in-commit timestamps (ICT) is not supported. In the future when ICT is enabled, the +/// timestamp will be retrieved from the `inCommitTimestamp` field of the `CommitInfo` action. +/// See issue [#559](https://github.com/delta-io/delta-kernel-rs/issues/559) +/// For details on In-Commit Timestamps, see the [Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps). +/// /// /// Three properties must hold for the entire CDF range: -/// - Reading must be supported for every commit in the range. This is determined using [`ensure_read_supported`] +/// - Reading must be supported for every commit in the range. Currently the only read feature allowed +/// is deletion vectors. This will be expanded in the future to support more delta table features. +/// Because only deletion vectors are supported, reader version 2 will not be allowed. That is +/// because version 2 requires that column mapping is enabled. Reader versions 1 and 3 are allowed. /// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed` -/// table property set to 'true'. +/// table property set to `true`. Performing change data feed on tables with column mapping is +/// currently disallowed. We check that column mapping is disabled, or the column mapping mode is `None`. /// - The schema for each commit must be compatible with the end schema. This means that all the /// same fields and their nullability are the same. Schema compatibility will be expanded in the /// future to allow compatible schemas that are not the exact same. 
/// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) /// -/// [`CommitInfo`]: crate::actions::CommitInfo -/// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported /// # Examples /// Get `TableChanges` for versions 0 to 1 (inclusive) /// ```rust /// # use std::sync::Arc; @@ -193,11 +232,8 @@ impl TableChanges { } } -/// Ensures that change data feed is enabled in `table_properties`. -/// -/// Performing change data feed on tables with column mapping is currently disallowed. -/// This will be less restrictive in the future. Because column mapping is disallowed, we also -/// check that column mapping is disabled, or the column mapping mode is `None`. +/// Ensures that change data feed is enabled in `table_properties`. See the documentation +/// of [`TableChanges`] for more details. fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> { require!( table_properties.enable_change_data_feed.unwrap_or(false), @@ -214,12 +250,7 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> { } /// Ensures that Change Data Feed is supported for a table with this [`Protocol`] . -// -// Currently the only read feature allowed is deletion vectors. This will be expanded in the -// future to support more delta table features. -// -// Because only deletion vectors are supported, reader version 2 will not be allowed. That is -// because version 2 requires that column mapping is enabled. Reader versions 1 and 3 are allowed. +/// See the documentation of [`TableChanges`] for more details. fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> { static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> = LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors])); diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 0902bd2d0..9b0ba3067 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -1,3 +1,5 @@ +//! 
Functionality to create and execute table changes scans over the data in the delta table + use std::sync::Arc; use itertools::Itertools; @@ -16,8 +18,8 @@ use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile}; use super::scan_file::scan_data_to_scan_file; use super::{TableChanges, CDF_FIELDS}; -/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change -/// data feed from the table +/// The result of building a [`TableChanges`] scan over a table. This can be used to get the change +/// data feed from the table. #[derive(Debug)] pub struct TableChangesScan { // The [`TableChanges`] that specifies this scan's start and end versions @@ -37,7 +39,7 @@ pub struct TableChangesScan { /// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`] /// of a table. [`TableChangesScanBuilder`] allows you to specify a schema to project the columns -/// or specify a predicate to filter rows in the Change Data Feed. Note that predicates over Change +/// or specify a predicate to filter rows in the Change Data Feed. Note that predicates containing Change /// Data Feed columns `_change_type`, `_commit_version`, and `_commit_timestamp` are not currently /// allowed. See issue [#525](https://github.com/delta-io/delta-kernel-rs/issues/525). /// @@ -45,7 +47,7 @@ pub struct TableChangesScan { /// [`ScanBuilder`]. 
/// /// [`ScanBuilder`]: crate::scan::ScanBuilder -/// #Examples +/// # Example /// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate /// ```rust /// # use std::sync::Arc; diff --git a/kernel/tests/cdf.rs b/kernel/tests/cdf.rs index 5263df960..2be5324fc 100644 --- a/kernel/tests/cdf.rs +++ b/kernel/tests/cdf.rs @@ -172,3 +172,60 @@ fn cdf_non_partitioned() -> Result<(), Box<dyn std::error::Error>> { assert_batches_sorted_eq!(expected, &batches); Ok(()) } + +#[test] +fn cdf_with_cdc_and_dvs() -> Result<(), Box<dyn std::error::Error>> { + let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None)?; + let mut expected = vec![ + "+----+--------------------+------------------+-----------------+", + "| id | comment | _change_type | _commit_version |", + "+----+--------------------+------------------+-----------------+", + "| 1 | initial | insert | 0 |", + "| 2 | insert1 | insert | 1 |", + "| 3 | insert1-delete1 | insert | 1 |", + "| 4 | insert1-delete2 | insert | 1 |", + "| 5 | insert1-delete2 | insert | 1 |", + "| 3 | insert1-delete1 | delete | 2 |", + "| 3 | insert1-delete1 | insert | 4 |", + "| 4 | insert1-delete2 | delete | 5 |", + "| 5 | insert1-delete2 | delete | 5 |", + "| 4 | insert1-delete2 | insert | 7 |", + "| 5 | insert2 | insert | 8 |", + "| 1 | initial | update_preimage | 9 |", + "| 1 | update1 | update_postimage | 9 |", + "| 2 | insert1 | update_preimage | 9 |", + "| 2 | update1 | update_postimage | 9 |", + "| 3 | insert1-delete1 | update_preimage | 9 |", + "| 3 | update1 | update_postimage | 9 |", + "| 1 | update1 | delete | 10 |", + "| 2 | update1 | update_preimage | 12 |", + "| 2 | update2 | update_postimage | 12 |", + "| 6 | insert3 | insert | 14 |", + "| 7 | insert3 | insert | 14 |", + "| 8 | insert4 | insert | 15 |", + "| 9 | insert4 | insert | 15 |", + "| 8 | insert4 | delete | 16 |", + "| 7 | insert3 | delete | 16 |", + "| 10 | merge1-insert | insert | 18 |", + "| 11 | merge1-insert | insert | 18 |", + "| 9 | merge1-update | update_postimage | 
18 |", + "| 9 | insert4 | update_preimage | 18 |", + "| 11 | merge1-insert | update_preimage | 20 |", + "| 11 | | update_postimage | 20 |", + "| 12 | merge2-insert | insert | 22 |", + "| 11 | | delete | 22 |", + "| 3 | update1 | delete | 24 |", + "| 4 | insert1-delete2 | delete | 24 |", + "| 5 | insert2 | delete | 24 |", + "| 2 | update2 | delete | 24 |", + "| 6 | insert3 | delete | 24 |", + "| 9 | merge1-update | delete | 24 |", + "| 0 | new | insert | 25 |", + "| 1 | after-large-delete | insert | 25 |", + "| 2 | | insert | 25 |", + "+----+--------------------+------------------+-----------------+", + ]; + sort_lines!(expected); + assert_batches_sorted_eq!(expected, &batches); + Ok(()) +} diff --git a/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst b/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst new file mode 100644 index 000000000..6304ccb06 Binary files /dev/null and b/kernel/tests/data/cdf-table-with-cdc-and-dvs.tar.zst differ