Support in-commit timestamps in Change Data Feed #581

Draft · wants to merge 5 commits into base: main
15 changes: 13 additions & 2 deletions kernel/src/actions/mod.rs
@@ -10,7 +10,7 @@ use std::sync::LazyLock;

use self::deletion_vector::DeletionVectorDescriptor;
use crate::actions::schemas::GetStructField;
use crate::schema::{SchemaRef, StructType};
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_features::{
ReaderFeatures, WriterFeatures, SUPPORTED_READER_FEATURES, SUPPORTED_WRITER_FEATURES,
};
@@ -84,6 +84,13 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
&LOG_COMMIT_INFO_SCHEMA
}

pub(crate) fn get_log_commit_info_schema_no_ict() -> &'static SchemaRef {
// Cache the schema in a LazyLock, matching get_log_commit_info_schema above;
// the function signature promises a &'static SchemaRef, so the StructType
// must be built once and converted into a SchemaRef.
static LOG_COMMIT_INFO_SCHEMA_NO_ICT: LazyLock<SchemaRef> = LazyLock::new(|| {
StructType::new([
StructField::new("timestamp", DataType::LONG, true),
StructField::new("operation", DataType::STRING, true),
])
.into()
});
&LOG_COMMIT_INFO_SCHEMA_NO_ICT
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Format {
@@ -331,8 +338,11 @@
struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
/// If in-commit timestamps are enabled, this is always required.
pub(crate) timestamp: Option<i64>,
/// The time this logical file was created, as milliseconds since the epoch. Unlike
/// `timestamp`, this field is guaranteed to increase monotonically with each commit.
/// If in-commit timestamps are enabled, this is always required.
pub(crate) in_commit_timestamp: Option<i64>,
/// An arbitrary string that identifies the operation associated with this commit. This is
/// specified by the engine. Read: optional, write: required (that is, kernel always writes).
pub(crate) operation: Option<String>,
@@ -694,6 +704,7 @@ mod tests {
"commitInfo",
StructType::new(vec![
StructField::new("timestamp", DataType::LONG, true),
StructField::new("inCommitTimestamp", DataType::LONG, true),
StructField::new("operation", DataType::STRING, true),
StructField::new(
"operationParameters",
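For context on the `inCommitTimestamp` field this diff adds to `CommitInfo`: when the in-commit timestamps writer feature is enabled, the commit's `commitInfo` action carries the monotonic timestamp alongside the wall-clock `timestamp`. A minimal sketch of such a log line (all values illustrative, not taken from a real table):

```rust
fn main() {
    // Illustrative commitInfo action as it might appear on the first line of
    // a commit JSON when in-commit timestamps are enabled. Field values here
    // are made up for demonstration.
    let line = r#"{"commitInfo":{"timestamp":12345678,"inCommitTimestamp":12345678,"operation":"WRITE"}}"#;

    // The new schema field corresponds to this key in the action.
    assert!(line.contains(r#""inCommitTimestamp":12345678"#));
    println!("ok");
}
```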
30 changes: 19 additions & 11 deletions kernel/src/table_changes/log_replay.rs
@@ -7,8 +7,8 @@ use std::sync::{Arc, LazyLock};
use crate::actions::schemas::GetStructField;
use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor};
use crate::actions::{
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME,
PROTOCOL_NAME, REMOVE_NAME,
get_log_add_schema, Add, Cdc, CommitInfo, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME,
COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_name, ColumnName};
@@ -79,6 +79,11 @@ pub(crate) fn table_changes_action_iter(
/// Deletion vector resolution affects whether a remove action is selected in the second
/// phase, so we must perform it ahead of time in phase 1.
/// - Ensure that reading is supported on any protocol updates.
/// - Extract the timestamp from [`CommitInfo`] actions if they are present. These are
/// generated when the in-commit timestamps feature is enabled. This must be done in the first phase
/// because the second phase lazily transforms engine data with an extra timestamp column,
/// so the timestamp must be known ahead of time.
/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
/// compatibility is checked through schema equality. This will be expanded in the future to
@@ -94,12 +99,6 @@
///
/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
///
/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo
/// actions to find the timestamp. These are generated when incommit timestamps is enabled.
/// This must be done in the first phase because the second phase lazily transforms engine data with
/// an extra timestamp column. Thus, the timestamp must be known ahead of time.
/// See https://github.com/delta-io/delta-kernel-rs/issues/559
///
/// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every
/// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the
/// actions using [`add_transform_expr`], and generating selection vectors with the following rules:
@@ -138,6 +137,7 @@ impl LogReplayScanner {
/// 2. Construct a map from path to deletion vector of remove actions that share the same path
/// as an add action.
/// 3. Perform validation on each protocol and metadata action in the commit.
/// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present.
///
/// For more details, see the documentation for [`LogReplayScanner`].
fn try_new(
@@ -165,13 +165,15 @@
let mut remove_dvs = HashMap::default();
let mut add_paths = HashSet::default();
let mut has_cdc_action = false;
let mut timestamp = commit_file.location.last_modified;
for actions in action_iter {
let actions = actions?;

let mut visitor = PreparePhaseVisitor {
add_paths: &mut add_paths,
remove_dvs: &mut remove_dvs,
has_cdc_action: &mut has_cdc_action,
timestamp: &mut timestamp,
protocol: None,
metadata_info: None,
};
@@ -204,7 +206,7 @@ impl LogReplayScanner {
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
}
Ok(LogReplayScanner {
timestamp: commit_file.location.last_modified,
timestamp,
commit_file,
has_cdc_action,
remove_dvs,
@@ -222,7 +224,6 @@
has_cdc_action,
remove_dvs,
commit_file,
// TODO: Add the timestamp as a column with an expression
timestamp,
} = self;
let remove_dvs = Arc::new(remove_dvs);
@@ -276,6 +277,7 @@ struct PreparePhaseVisitor<'a> {
has_cdc_action: &'a mut bool,
add_paths: &'a mut HashSet<String>,
remove_dvs: &'a mut HashMap<String, DvInfo>,
timestamp: &'a mut i64,
}
impl PreparePhaseVisitor<'_> {
fn schema() -> Arc<StructType> {
@@ -285,6 +287,7 @@
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Metadata>::get_struct_field(METADATA_NAME),
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
]))
}
}
@@ -316,6 +319,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
(INTEGER, column_name!("protocol.minWriterVersion")),
(string_list.clone(), column_name!("protocol.readerFeatures")),
(string_list, column_name!("protocol.writerFeatures")),
(LONG, column_name!("commitInfo.inCommitTimestamp")),
];
let (types, names) = types_and_names.into_iter().unzip();
(names, types).into()
@@ -325,7 +329,7 @@

fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
require!(
getters.len() == 16,
getters.len() == 17,
Error::InternalError(format!(
"Wrong number of PreparePhaseVisitor getters: {}",
getters.len()
@@ -356,6 +360,10 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
let protocol =
ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
self.protocol = Some(protocol);
} else if let Some(timestamp) =
getters[16].get_long(i, "commitInfo.inCommitTimestamp")?
{
*self.timestamp = timestamp;
}
}
Ok(())
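The net effect of the changes above can be summarized as a small rule: the scanner's timestamp starts as the commit file's last-modified time and is overwritten if the visitor observes a `commitInfo.inCommitTimestamp`. A sketch of that rule, with an illustrative function name that is not part of the kernel's API:

```rust
/// Illustrative-only helper (not an actual delta-kernel-rs function): a
/// commit's CDF timestamp is commitInfo.inCommitTimestamp when present,
/// otherwise the commit file's last-modified time.
fn resolve_commit_timestamp(in_commit_timestamp: Option<i64>, last_modified: i64) -> i64 {
    in_commit_timestamp.unwrap_or(last_modified)
}

fn main() {
    // In-commit timestamps enabled: the monotonic timestamp wins.
    assert_eq!(resolve_commit_timestamp(Some(12345678), 999), 12345678);
    // Not enabled: fall back to file metadata, as before this change.
    assert_eq!(resolve_commit_timestamp(None, 999), 999);
    println!("ok");
}
```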
35 changes: 35 additions & 0 deletions kernel/src/table_changes/log_replay/tests.rs
@@ -1,6 +1,7 @@
use super::table_changes_action_iter;
use super::TableChangesScanData;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::CommitInfo;
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
use crate::engine::sync::SyncEngine;
use crate::expressions::Scalar;
@@ -603,3 +604,37 @@ async fn file_meta_timestamp() {
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, file_meta_ts);
}

#[tokio::test]
async fn table_changes_in_commit_timestamp() {
let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();

let timestamp = 12345678;

mock_table
.commit([
Action::CommitInfo(CommitInfo {
in_commit_timestamp: Some(timestamp),
..Default::default()
}),
Action::Add(Add {
path: "fake_path_1".into(),
data_change: true,
..Default::default()
}),
])
.await;

let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();

let commit = commits.next().unwrap();
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
assert_eq!(scanner.timestamp, timestamp);

let iter = scanner.into_scan_batches(engine, None).unwrap();
let sv = result_to_sv(iter);
assert_eq!(sv, vec![false, true]);
}
17 changes: 14 additions & 3 deletions kernel/src/table_changes/scan_file.rs
@@ -250,7 +250,7 @@ mod tests {

use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType};
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::{Add, Cdc, Remove};
use crate::actions::{Add, Cdc, CommitInfo, Remove};
use crate::engine::sync::SyncEngine;
use crate::log_segment::LogSegment;
use crate::scan::state::DvInfo;
@@ -318,14 +318,25 @@
..Default::default()
};

let cdc_timestamp = 12345678;
let commit_info = CommitInfo {
in_commit_timestamp: Some(cdc_timestamp),
..Default::default()
};

mock_table
.commit([
Action::Remove(remove_paired.clone()),
Action::Add(add_paired.clone()),
Action::Remove(remove.clone()),
])
.await;
mock_table.commit([Action::Cdc(cdc.clone())]).await;
mock_table
.commit([
Action::Cdc(cdc.clone()),
Action::CommitInfo(commit_info.clone()),
])
.await;
mock_table
.commit([Action::Remove(remove_no_partition.clone())])
.await;
@@ -392,7 +403,7 @@
},
partition_values: cdc.partition_values,
commit_version: 1,
commit_timestamp: timestamps[1],
commit_timestamp: cdc_timestamp,
remove_dv: None,
},
CdfScanFile {
2 changes: 2 additions & 0 deletions kernel/src/transaction.rs
@@ -590,6 +590,7 @@ mod tests {
serde_json::json!({
"commitInfo": {
"timestamp": 0,
"inCommitTimestamp": 0,
"operation": "test operation",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},
@@ -600,6 +601,7 @@
serde_json::json!({
"commitInfo": {
"timestamp": 0,
"inCommitTimestamp": 0,
"operation": "test operation",
"kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")),
"operationParameters": {},