Add table changes example, improve TableChanges documentation #597

Merged
merged 9 commits on Dec 14, 2024
18 changes: 18 additions & 0 deletions kernel/examples/read-table-changes/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "read-table-changes"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
clap = { version = "4.5", features = ["derive"] }
delta_kernel = { path = "../../../kernel", features = [
"cloud",
"default-engine",
] }
env_logger = "0.11.3"
url = "2"
itertools = "0.13"
arrow = { workspace = true, features = ["prettyprint"] }
58 changes: 58 additions & 0 deletions kernel/examples/read-table-changes/src/main.rs
@@ -0,0 +1,58 @@
use std::{collections::HashMap, sync::Arc};

use arrow::{compute::filter_record_batch, util::pretty::print_batches};
use arrow_array::RecordBatch;
use clap::Parser;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::{DeltaResult, Table};
use itertools::Itertools;

#[derive(Parser)]
#[command(author, version, about, long_about = None)]
#[command(propagate_version = true)]
struct Cli {
/// Path to the table to inspect
path: String,
/// The start version of the table changes
#[arg(short, long, default_value_t = 0)]
start_version: u64,
/// The end version of the table changes
#[arg(short, long)]
end_version: Option<u64>,
}

fn main() -> DeltaResult<()> {
let cli = Cli::parse();
let table = Table::try_from_uri(cli.path)?;
let options = HashMap::from([("skip_signature", "true".to_string())]);
let engine = Arc::new(DefaultEngine::try_new(
table.location(),
options,
Arc::new(TokioBackgroundExecutor::new()),
)?);
let table_changes = table.table_changes(engine.as_ref(), cli.start_version, cli.end_version)?;

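// Build a scan over the change data feed using the full CDF schema and no predicate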
let table_changes_scan = table_changes.into_scan_builder().build()?;
let batches: Vec<RecordBatch> = table_changes_scan
.execute(engine.clone())?
.map(|scan_result| -> DeltaResult<_> {
let scan_result = scan_result?;
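// full_mask() yields an optional selection vector; `None` means every row is selected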
let mask = scan_result.full_mask();
let data = scan_result.raw_data?;
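// The default engine produces Arrow data, so downcast to ArrowEngineData to recover a RecordBatch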
let record_batch: RecordBatch = data
.into_any()
.downcast::<ArrowEngineData>()
.map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))?
.into();
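// Apply the selection mask, if any, so unselected rows are dropped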
if let Some(mask) = mask {
Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
Ok(record_batch)
}
})
.try_collect()?;
print_batches(&batches)?;
Ok(())
}
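
This example prints every change in the requested range. To single out one kind of change, the collected batches can be filtered further with arrow's compute kernels. The helper below is a hypothetical sketch, not part of this PR: the name `filter_change_type` is illustrative, and it assumes `_change_type` is still part of the projected schema.

```rust
use arrow::array::{Array, StringArray};
use arrow::compute::{filter_record_batch, kernels::cmp::eq};
use arrow_array::RecordBatch;

/// Hypothetical helper: keep only the rows of a CDF batch whose `_change_type`
/// equals `kind` ("insert", "delete", "update_preimage", or "update_postimage").
fn filter_change_type(
    batch: &RecordBatch,
    kind: &str,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    // Look up the CDF column by name; it is present unless it was projected away.
    let column = batch
        .column_by_name("_change_type")
        .ok_or("batch has no _change_type column")?;
    let column = column
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or("_change_type is not a utf8 column")?;
    // Compare each row's change type against `kind`, then keep the matching rows.
    let mask = eq(column, &StringArray::new_scalar(kind))?;
    Ok(filter_record_batch(batch, &mask)?)
}
```

With a helper like this, the closure above could call `filter_change_type(&record_batch, "insert")` before collecting, keeping only inserted rows.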
6 changes: 5 additions & 1 deletion kernel/src/lib.rs
@@ -10,12 +10,16 @@
//!
//! A full `rust` example for reading table data using the default engine can be found in the
//! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the
//! [read-table-multi-threaded] example). An example for reading a table's change data feed
//! using the default engine can be found in the [read-table-changes] example.
//!
//!
//! [read-table-single-threaded]:
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
//! [read-table-multi-threaded]:
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
//! [read-table-changes]:
//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
//!
//! Simple write examples can be found in the [`write.rs`] integration tests. Standalone write
//! examples are coming soon!
Expand Down
67 changes: 49 additions & 18 deletions kernel/src/table_changes/mod.rs
@@ -1,4 +1,36 @@
//! Provides an API to read the table's change data feed between two versions.
//!
//! # Example
//! ```rust
//! # use std::sync::Arc;
//! # use delta_kernel::engine::sync::SyncEngine;
//! # use delta_kernel::expressions::{column_expr, Scalar};
//! # use delta_kernel::{Expression, Table, Error};
//! # let path = "./tests/data/table-with-cdf";
//! # let engine = Arc::new(SyncEngine::new());
//! // Construct a table from the given path
//! let table = Table::try_from_uri(path)?;
//!
//! // Get the table changes (change data feed) between version 0 and 1
//! let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
//!
//! // Optionally specify a schema and predicate to apply to the table changes scan
//! let schema = table_changes
//! .schema()
//! .project(&["id", "_commit_version"])?;
//! let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
//!
//! // Construct the table changes scan
//! let table_changes_scan = table_changes
//! .into_scan_builder()
//! .with_schema(schema)
//! .with_predicate(predicate.clone())
//! .build()?;
//!
//! // Execute the table changes scan to get a fallible iterator of `ScanResult`s
//! let table_change_batches = table_changes_scan.execute(engine.clone())?;
//! # Ok::<(), Error>(())
//! ```
use std::collections::HashSet;
use std::sync::{Arc, LazyLock};

@@ -39,21 +71,28 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
/// - `_change_type`: String representing the type of change for that commit. This may be one
/// of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
/// - `_commit_timestamp`: Time at which the commit occurred. The timestamp is retrieved from the
/// file modification time of the log file. No timezone is associated with the timestamp.
///
/// Currently, in-commit timestamps (ICT) are not supported. In the future when ICT is enabled, the
/// timestamp will be retrieved from the `inCommitTimestamp` field of the `CommitInfo` action.
/// See issue [#559](https://github.com/delta-io/delta-kernel-rs/issues/559)
/// For details on In-Commit Timestamps, see the [Protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps).
///
///
/// Three properties must hold for the entire CDF range:
/// - Reading must be supported for every commit in the range. Currently the only read feature allowed
/// is deletion vectors. This will be expanded in the future to support more delta table features.
/// Because only deletion vectors are supported, reader version 2 will not be allowed. That is
/// because version 2 requires that column mapping is enabled. Reader versions 1 and 3 are allowed.
/// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed`
/// table property set to `true`. Reading the change data feed of a table that uses column
/// mapping is currently disallowed: column mapping must be disabled, or the column mapping
/// mode must be `None`.
/// - The schema for each commit must be compatible with the end schema. This means that each
/// commit's schema must have the same fields, with the same nullability, as the end schema.
/// Schema compatibility will be expanded in the future to allow compatible schemas that are
/// not exactly the same.
/// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)
///
/// # Examples
/// Get `TableChanges` for versions 0 to 1 (inclusive)
/// ```rust
@@ -193,11 +232,8 @@ impl TableChanges {
}
}

/// Ensures that change data feed is enabled in `table_properties`. See the documentation
/// of [`TableChanges`] for more details.
fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult<()> {
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
@@ -214,12 +250,7 @@ fn check_cdf_table_properties(table_properties: &TableProperties) -> DeltaResult
}

/// Ensures that Change Data Feed is supported for a table with this [`Protocol`].
// See the documentation of [`TableChanges`] for more details.
fn ensure_cdf_read_supported(protocol: &Protocol) -> DeltaResult<()> {
static CDF_SUPPORTED_READER_FEATURES: LazyLock<HashSet<ReaderFeatures>> =
LazyLock::new(|| HashSet::from([ReaderFeatures::DeletionVectors]));
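
These protocol and table-property checks run eagerly when the change data feed is requested, so failures surface before any scan is built. Below is a minimal sketch of handling that error, assuming the same `SyncEngine` and test table as the doc example above; it is illustrative and not part of this PR.

```rust
use std::sync::Arc;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::Table;

fn main() {
    let engine = Arc::new(SyncEngine::new());
    let table = Table::try_from_uri("./tests/data/table-with-cdf").unwrap();
    // table_changes() validates every commit in the range against the rules
    // documented on `TableChanges`, and returns an error if any check fails.
    match table.table_changes(engine.as_ref(), 0, 1) {
        Ok(_table_changes) => println!("change data feed is readable for versions 0..=1"),
        // e.g. an unsupported reader feature, or `delta.enableChangeDataFeed` unset
        Err(err) => println!("change data feed not available: {err}"),
    }
}
```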
10 changes: 6 additions & 4 deletions kernel/src/table_changes/scan.rs
@@ -1,3 +1,5 @@
//! Functionality to create and execute table changes scans over the data in the delta table

use std::sync::Arc;

use itertools::Itertools;
@@ -16,8 +18,8 @@ use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
use super::scan_file::scan_data_to_scan_file;
use super::{TableChanges, CDF_FIELDS};

/// The result of building a [`TableChanges`] scan over a table. This can be used to get the change
/// data feed from the table.
#[derive(Debug)]
pub struct TableChangesScan {
// The [`TableChanges`] that specifies this scan's start and end versions
@@ -37,15 +39,15 @@ pub struct TableChangesScan {

/// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`]
/// of a table. [`TableChangesScanBuilder`] allows you to specify a schema to project the columns
/// or specify a predicate to filter rows in the Change Data Feed. Note that predicates containing Change
/// Data Feed columns `_change_type`, `_commit_version`, and `_commit_timestamp` are not currently
/// allowed. See issue [#525](https://github.com/delta-io/delta-kernel-rs/issues/525).
///
/// Note: There is a lot of shared functionality between [`TableChangesScanBuilder`] and
/// [`ScanBuilder`].
///
/// [`ScanBuilder`]: crate::scan::ScanBuilder
/// # Example
/// Construct a [`TableChangesScan`] from `table_changes` with a given schema and predicate
/// ```rust
/// # use std::sync::Arc;
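
Because predicates containing the CDF columns are rejected (see issue #525 above), one pattern is to push down predicates on data columns only and filter on `_commit_version` or `_change_type` after execution. A hedged sketch under that assumption, reusing the `SyncEngine` and test table from the doc examples; the variable names are illustrative.

```rust
use std::sync::Arc;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::expressions::{column_expr, Scalar};
use delta_kernel::{DeltaResult, Expression, Table};

fn main() -> DeltaResult<()> {
    let engine = Arc::new(SyncEngine::new());
    let table = Table::try_from_uri("./tests/data/table-with-cdf")?;
    let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
    // Predicates may reference data columns such as `id`; a predicate over
    // `_change_type`, `_commit_version`, or `_commit_timestamp` would error.
    let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10)));
    let _scan = table_changes
        .into_scan_builder()
        .with_predicate(predicate)
        .build()?;
    // Any filtering on the CDF columns themselves happens after execution, on
    // the returned batches.
    Ok(())
}
```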
57 changes: 57 additions & 0 deletions kernel/tests/cdf.rs
@@ -172,3 +172,60 @@ fn cdf_non_partitioned() -> Result<(), Box<dyn error::Error>> {
assert_batches_sorted_eq!(expected, &batches);
Ok(())
}

#[test]
fn cdf_with_cdc_and_dvs() -> Result<(), Box<dyn error::Error>> {
let batches = read_cdf_for_table("cdf-table-with-cdc-and-dvs", 0, None)?;
let mut expected = vec![
"+----+--------------------+------------------+-----------------+",
"| id | comment | _change_type | _commit_version |",
"+----+--------------------+------------------+-----------------+",
"| 1 | initial | insert | 0 |",
"| 2 | insert1 | insert | 1 |",
"| 3 | insert1-delete1 | insert | 1 |",
"| 4 | insert1-delete2 | insert | 1 |",
"| 5 | insert1-delete2 | insert | 1 |",
"| 3 | insert1-delete1 | delete | 2 |",
"| 3 | insert1-delete1 | insert | 4 |",
"| 4 | insert1-delete2 | delete | 5 |",
"| 5 | insert1-delete2 | delete | 5 |",
"| 4 | insert1-delete2 | insert | 7 |",
"| 5 | insert2 | insert | 8 |",
"| 1 | initial | update_preimage | 9 |",
"| 1 | update1 | update_postimage | 9 |",
"| 2 | insert1 | update_preimage | 9 |",
"| 2 | update1 | update_postimage | 9 |",
"| 3 | insert1-delete1 | update_preimage | 9 |",
"| 3 | update1 | update_postimage | 9 |",
"| 1 | update1 | delete | 10 |",
"| 2 | update1 | update_preimage | 12 |",
"| 2 | update2 | update_postimage | 12 |",
"| 6 | insert3 | insert | 14 |",
"| 7 | insert3 | insert | 14 |",
"| 8 | insert4 | insert | 15 |",
"| 9 | insert4 | insert | 15 |",
"| 8 | insert4 | delete | 16 |",
"| 7 | insert3 | delete | 16 |",
"| 10 | merge1-insert | insert | 18 |",
"| 11 | merge1-insert | insert | 18 |",
"| 9 | merge1-update | update_postimage | 18 |",
"| 9 | insert4 | update_preimage | 18 |",
"| 11 | merge1-insert | update_preimage | 20 |",
"| 11 | | update_postimage | 20 |",
"| 12 | merge2-insert | insert | 22 |",
"| 11 | | delete | 22 |",
"| 3 | update1 | delete | 24 |",
"| 4 | insert1-delete2 | delete | 24 |",
"| 5 | insert2 | delete | 24 |",
"| 2 | update2 | delete | 24 |",
"| 6 | insert3 | delete | 24 |",
"| 9 | merge1-update | delete | 24 |",
"| 0 | new | insert | 25 |",
"| 1 | after-large-delete | insert | 25 |",
"| 2 | | insert | 25 |",
"+----+--------------------+------------------+-----------------+",
];
sort_lines!(expected);
assert_batches_sorted_eq!(expected, &batches);
Ok(())
}