diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs
index c62486873..84e5c2e48 100644
--- a/kernel/src/actions/mod.rs
+++ b/kernel/src/actions/mod.rs
@@ -10,7 +10,7 @@ use std::sync::LazyLock;
 
 use self::deletion_vector::DeletionVectorDescriptor;
 use crate::actions::schemas::GetStructField;
-use crate::schema::{SchemaRef, StructType};
+use crate::schema::{DataType, SchemaRef, StructField, StructType};
 use crate::table_features::{
     ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
 };
@@ -84,6 +84,21 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
     &LOG_COMMIT_INFO_SCHEMA
 }
 
+/// Schema exposing only the `timestamp` and `operation` fields of a commit info action,
+/// i.e. without the in-commit timestamp.
+pub(crate) fn get_log_commit_info_schema_no_ict() -> &'static SchemaRef {
+    // The signature hands out a `'static` reference, which a freshly constructed `StructType`
+    // value cannot satisfy; build the schema once and cache it in a static instead.
+    static LOG_COMMIT_INFO_SCHEMA_NO_ICT: LazyLock<SchemaRef> = LazyLock::new(|| {
+        StructType::new([
+            StructField::new("timestamp", DataType::LONG, true),
+            StructField::new("operation", DataType::STRING, true),
+        ])
+        .into()
+    });
+    &LOG_COMMIT_INFO_SCHEMA_NO_ICT
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Schema)]
 #[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
 pub struct Format {
@@ -331,8 +346,11 @@ where
 struct CommitInfo {
     /// The time this logical file was created, as milliseconds since the epoch.
     /// Read: optional, write: required (that is, kernel always writes).
-    /// If in-commit timestamps are enabled, this is always required.
     pub(crate) timestamp: Option<i64>,
+    /// The time this logical file was created, as milliseconds since the epoch. Unlike
+    /// `timestamp`, this field is guaranteed to monotonically increase with each commit.
+    /// If in-commit timestamps are enabled, this is always required.
+    pub(crate) in_commit_timestamp: Option<i64>,
     /// An arbitrary string that identifies the operation associated with this commit. This is
     /// specified by the engine. Read: optional, write: required (that is, kernel alwarys writes).
pub(crate) operation: Option, @@ -694,6 +704,7 @@ mod tests { "commitInfo", StructType::new(vec![ StructField::new("timestamp", DataType::LONG, true), + StructField::new("inCommitTimestamp", DataType::LONG, true), StructField::new("operation", DataType::STRING, true), StructField::new( "operationParameters", diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index 9c6cfe872..33f71cc03 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -7,8 +7,8 @@ use std::sync::{Arc, LazyLock}; use crate::actions::schemas::GetStructField; use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor}; use crate::actions::{ - get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME, - PROTOCOL_NAME, REMOVE_NAME, + get_log_add_schema, Add, Cdc, CommitInfo, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, + COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, }; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; @@ -79,6 +79,11 @@ pub(crate) fn table_changes_action_iter( /// Deletion vector resolution affects whether a remove action is selected in the second /// phase, so we must perform it ahead of time in phase 1. /// - Ensure that reading is supported on any protocol updates. +/// - Extract the timestamp from [`CommitInfo`] actions if they are present. These are +/// generated when in-commit timestamps is enabled. This must be done in the first phase +/// because the second phase lazily transforms engine data with an extra timestamp column, +/// so the timestamp must be known ahead of time. +/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps /// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`] /// - Ensure that any schema update is compatible with the provided `schema`. 
Currently, schema /// compatibility is checked through schema equality. This will be expanded in the future to @@ -94,12 +99,6 @@ pub(crate) fn table_changes_action_iter( /// /// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors /// -/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo -/// actions to find the timestamp. These are generated when incommit timestamps is enabled. -/// This must be done in the first phase because the second phase lazily transforms engine data with -/// an extra timestamp column. Thus, the timestamp must be known ahead of time. -/// See https://github.com/delta-io/delta-kernel-rs/issues/559 -/// /// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every /// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the /// actions using [`add_transform_expr`], and generating selection vectors with the following rules: @@ -138,6 +137,7 @@ impl LogReplayScanner { /// 2. Construct a map from path to deletion vector of remove actions that share the same path /// as an add action. /// 3. Perform validation on each protocol and metadata action in the commit. + /// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present. /// /// For more details, see the documentation for [`LogReplayScanner`]. 
fn try_new( @@ -165,6 +165,7 @@ impl LogReplayScanner { let mut remove_dvs = HashMap::default(); let mut add_paths = HashSet::default(); let mut has_cdc_action = false; + let mut timestamp = commit_file.location.last_modified; for actions in action_iter { let actions = actions?; @@ -172,6 +173,7 @@ impl LogReplayScanner { add_paths: &mut add_paths, remove_dvs: &mut remove_dvs, has_cdc_action: &mut has_cdc_action, + timestamp: &mut timestamp, protocol: None, metadata_info: None, }; @@ -204,7 +206,7 @@ impl LogReplayScanner { remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path)); } Ok(LogReplayScanner { - timestamp: commit_file.location.last_modified, + timestamp, commit_file, has_cdc_action, remove_dvs, @@ -222,7 +224,6 @@ impl LogReplayScanner { has_cdc_action, remove_dvs, commit_file, - // TODO: Add the timestamp as a column with an expression timestamp, } = self; let remove_dvs = Arc::new(remove_dvs); @@ -276,6 +277,7 @@ struct PreparePhaseVisitor<'a> { has_cdc_action: &'a mut bool, add_paths: &'a mut HashSet, remove_dvs: &'a mut HashMap, + timestamp: &'a mut i64, } impl PreparePhaseVisitor<'_> { fn schema() -> Arc { @@ -285,6 +287,7 @@ impl PreparePhaseVisitor<'_> { Option::::get_struct_field(CDC_NAME), Option::::get_struct_field(METADATA_NAME), Option::::get_struct_field(PROTOCOL_NAME), + Option::::get_struct_field(COMMIT_INFO_NAME), ])) } } @@ -316,6 +319,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { (INTEGER, column_name!("protocol.minWriterVersion")), (string_list.clone(), column_name!("protocol.readerFeatures")), (string_list, column_name!("protocol.writerFeatures")), + (LONG, column_name!("commitInfo.inCommitTimestamp")), ]; let (types, names) = types_and_names.into_iter().unzip(); (names, types).into() @@ -325,7 +329,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> { fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> { require!( - getters.len() == 16, + getters.len() == 17, 
Error::InternalError(format!( "Wrong number of PreparePhaseVisitor getters: {}", getters.len() @@ -356,6 +360,10 @@ impl RowVisitor for PreparePhaseVisitor<'_> { let protocol = ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?; self.protocol = Some(protocol); + } else if let Some(timestamp) = + getters[16].get_long(i, "commitInfo.inCommitTimestamp")? + { + *self.timestamp = timestamp; } } Ok(()) diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 9953dd464..ae7cbe85e 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -1,6 +1,7 @@ use super::table_changes_action_iter; use super::TableChangesScanData; use crate::actions::deletion_vector::DeletionVectorDescriptor; +use crate::actions::CommitInfo; use crate::actions::{Add, Cdc, Metadata, Protocol, Remove}; use crate::engine::sync::SyncEngine; use crate::expressions::Scalar; @@ -603,3 +604,37 @@ async fn file_meta_timestamp() { let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); assert_eq!(scanner.timestamp, file_meta_ts); } + +#[tokio::test] +async fn table_changes_in_commit_timestamp() { + let engine = Arc::new(SyncEngine::new()); + let mut mock_table = LocalMockTable::new(); + + let timestamp = 12345678; + + mock_table + .commit([ + Action::CommitInfo(CommitInfo { + in_commit_timestamp: Some(timestamp), + ..Default::default() + }), + Action::Add(Add { + path: "fake_path_1".into(), + data_change: true, + ..Default::default() + }), + ]) + .await; + + let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + .unwrap() + .into_iter(); + + let commit = commits.next().unwrap(); + let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap(); + assert_eq!(scanner.timestamp, timestamp); + + let iter = scanner.into_scan_batches(engine, None).unwrap(); + let sv = result_to_sv(iter); 
+ assert_eq!(sv, vec![false, true]); +} diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 8003ec08f..3e5000afe 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -250,7 +250,7 @@ mod tests { use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType}; use crate::actions::deletion_vector::DeletionVectorDescriptor; - use crate::actions::{Add, Cdc, Remove}; + use crate::actions::{Add, Cdc, CommitInfo, Remove}; use crate::engine::sync::SyncEngine; use crate::log_segment::LogSegment; use crate::scan::state::DvInfo; @@ -318,6 +318,12 @@ mod tests { ..Default::default() }; + let cdc_timestamp = 12345678; + let commit_info = CommitInfo { + in_commit_timestamp: Some(cdc_timestamp), + ..Default::default() + }; + mock_table .commit([ Action::Remove(remove_paired.clone()), @@ -325,7 +331,12 @@ mod tests { Action::Remove(remove.clone()), ]) .await; - mock_table.commit([Action::Cdc(cdc.clone())]).await; + mock_table + .commit([ + Action::Cdc(cdc.clone()), + Action::CommitInfo(commit_info.clone()), + ]) + .await; mock_table .commit([Action::Remove(remove_no_partition.clone())]) .await; @@ -392,7 +403,7 @@ mod tests { }, partition_values: cdc.partition_values, commit_version: 1, - commit_timestamp: timestamps[1], + commit_timestamp: cdc_timestamp, remove_dv: None, }, CdfScanFile { diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index c6e93ea7b..05b1d00a8 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -590,6 +590,7 @@ mod tests { serde_json::json!({ "commitInfo": { "timestamp": 0, + "inCommitTimestamp": 0, "operation": "test operation", "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), "operationParameters": {}, @@ -600,6 +601,7 @@ mod tests { serde_json::json!({ "commitInfo": { "timestamp": 0, + "inCommitTimestamp": 0, "operation": "test operation", "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), 
"operationParameters": {},