Skip to content

Commit d295ffc

Browse files
OussamaSaoudi-dbOussamaSaoudi
authored andcommitted
Add ict and tests
Update docs for ICT Assert selection vector wip ict impl add tmp ict fixup for writes remove non_ict schema fix commit Remove cdf changes for ict remove unused imports Add clarifying comment for inCommitTimestamp Add documentation for ICT Revert "Remove cdf changes for ict" This reverts commit e2e38cb. Fix ict reading Address nits make ICT only work if it is the first row in a commit Rename and patch comments Fix naming referring to CommitInfo Patch up docs
1 parent 06d8dbb commit d295ffc

File tree

3 files changed

+81
-27
lines changed

3 files changed

+81
-27
lines changed

kernel/src/table_changes/log_replay.rs

+32-24
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,17 @@ use std::sync::{Arc, LazyLock};
77
use crate::actions::schemas::GetStructField;
88
use crate::actions::visitors::{visit_deletion_vector_at, ProtocolVisitor};
99
use crate::actions::{
10-
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, METADATA_NAME,
11-
PROTOCOL_NAME, REMOVE_NAME,
10+
get_log_add_schema, Add, Cdc, Metadata, Protocol, Remove, ADD_NAME, CDC_NAME, COMMIT_INFO_NAME,
11+
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
1212
};
1313
use crate::engine_data::{GetData, TypedGetData};
1414
use crate::expressions::{column_name, ColumnName};
1515
use crate::path::ParsedLogPath;
1616
use crate::scan::data_skipping::DataSkippingFilter;
1717
use crate::scan::state::DvInfo;
18-
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
18+
use crate::schema::{
19+
ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType,
20+
};
1921
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
2022
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
2123
use crate::table_properties::TableProperties;
@@ -78,6 +80,12 @@ pub(crate) fn table_changes_action_iter(
7880
/// Deletion vector resolution affects whether a remove action is selected in the second
7981
/// phase, so we must perform it ahead of time in phase 1.
8082
/// - Ensure that reading is supported on any protocol updates.
83+
/// - Extract the in-commit timestamps from [`CommitInfo`] actions if they are present. These are
84+
/// generated when in-commit timestamps (ICT) table feature is enabled. This must be done in the
85+
/// first phase because the second phase lazily transforms engine data with an extra timestamp
86+
/// column, so the timestamp must be known ahead of time. Note that when ICT is enabled, CommitInfo
87+
/// should be the first action in every commit.
88+
/// See: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#in-commit-timestamps
8189
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
8290
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
8391
/// compatibility is checked through schema equality. This will be expanded in the future to
@@ -93,12 +101,6 @@ pub(crate) fn table_changes_action_iter(
93101
///
94102
/// See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vectors
95103
///
96-
/// TODO: When the kernel supports in-commit timestamps, we will also have to inspect CommitInfo
97-
/// actions to find the timestamp. These are generated when incommit timestamps is enabled.
98-
/// This must be done in the first phase because the second phase lazily transforms engine data with
99-
/// an extra timestamp column. Thus, the timestamp must be known ahead of time.
100-
/// See https://github.com/delta-io/delta-kernel-rs/issues/559
101-
///
102104
/// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every
103105
/// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the
104106
/// actions using [`add_transform_expr`], and generating selection vectors with the following rules:
@@ -118,14 +120,8 @@ struct LogReplayScanner {
118120
// The commit file that this replay scanner will operate on.
119121
commit_file: ParsedLogPath,
120122
// The timestamp associated with this commit. This is the file modification time
121-
// from the commit's [`FileMeta`].
122-
//
123-
//
124-
// TODO when incommit timestamps are supported: If there is a [`CommitInfo`] with a timestamp
125-
// generated by in-commit timestamps, that timestamp will be used instead.
126-
//
127-
// Note: This will be used once an expression is introduced to transform the engine data in
128-
// [`TableChangesScanData`]
123+
// from the commit's [`FileMeta`]. If in-commit timestamps feature is enabled, this will be the
124+
// in-commit timestamp from the [`CommitInfo`] action.
129125
timestamp: i64,
130126
}
131127

@@ -136,15 +132,14 @@ impl LogReplayScanner {
136132
/// 2. Construct a map from path to deletion vector of remove actions that share the same path
137133
/// as an add action.
138134
/// 3. Perform validation on each protocol and metadata action in the commit.
135+
/// 4. Extract the in-commit timestamp from [`CommitInfo`] if it is present.
139136
///
140137
/// For more details, see the documentation for [`LogReplayScanner`].
141138
fn try_new(
142139
engine: &dyn Engine,
143140
commit_file: ParsedLogPath,
144141
table_schema: &SchemaRef,
145142
) -> DeltaResult<Self> {
146-
let visitor_schema = PreparePhaseVisitor::schema();
147-
148143
// Note: We do not perform data skipping yet because we need to visit all add and
149144
// remove actions for deletion vector resolution to be correct.
150145
//
@@ -156,22 +151,25 @@ impl LogReplayScanner {
156151
// vectors are resolved so that we can skip both actions in the pair.
157152
let action_iter = engine.get_json_handler().read_json_files(
158153
&[commit_file.location.clone()],
159-
visitor_schema,
154+
PreparePhaseVisitor::schema(),
160155
None, // not safe to apply data skipping yet
161156
)?;
162157

163158
let mut remove_dvs = HashMap::default();
164159
let mut add_paths = HashSet::default();
165160
let mut has_cdc_action = false;
166-
for actions in action_iter {
161+
let mut timestamp = commit_file.location.last_modified;
162+
for (i, actions) in action_iter.enumerate() {
167163
let actions = actions?;
168164

169165
let mut visitor = PreparePhaseVisitor {
170166
add_paths: &mut add_paths,
171167
remove_dvs: &mut remove_dvs,
172168
has_cdc_action: &mut has_cdc_action,
169+
commit_timestamp: &mut timestamp,
173170
protocol: None,
174171
metadata_info: None,
172+
is_first_batch: i == 0,
175173
};
176174
visitor.visit_rows_of(actions.as_ref())?;
177175

@@ -202,7 +200,7 @@ impl LogReplayScanner {
202200
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
203201
}
204202
Ok(LogReplayScanner {
205-
timestamp: commit_file.location.last_modified,
203+
timestamp,
206204
commit_file,
207205
has_cdc_action,
208206
remove_dvs,
@@ -220,7 +218,6 @@ impl LogReplayScanner {
220218
has_cdc_action,
221219
remove_dvs,
222220
commit_file,
223-
// TODO: Add the timestamp as a column with an expression
224221
timestamp,
225222
} = self;
226223
let remove_dvs = Arc::new(remove_dvs);
@@ -274,15 +271,19 @@ struct PreparePhaseVisitor<'a> {
274271
has_cdc_action: &'a mut bool,
275272
add_paths: &'a mut HashSet<String>,
276273
remove_dvs: &'a mut HashMap<String, DvInfo>,
274+
commit_timestamp: &'a mut i64,
275+
is_first_batch: bool,
277276
}
278277
impl PreparePhaseVisitor<'_> {
279278
fn schema() -> Arc<StructType> {
279+
let ict_type = StructField::new("inCommitTimestamp", DataType::LONG, true);
280280
Arc::new(StructType::new(vec![
281281
Option::<Add>::get_struct_field(ADD_NAME),
282282
Option::<Remove>::get_struct_field(REMOVE_NAME),
283283
Option::<Cdc>::get_struct_field(CDC_NAME),
284284
Option::<Metadata>::get_struct_field(METADATA_NAME),
285285
Option::<Protocol>::get_struct_field(PROTOCOL_NAME),
286+
StructField::new(COMMIT_INFO_NAME, StructType::new([ict_type]), true),
286287
]))
287288
}
288289
}
@@ -314,6 +315,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
314315
(INTEGER, column_name!("protocol.minWriterVersion")),
315316
(string_list.clone(), column_name!("protocol.readerFeatures")),
316317
(string_list, column_name!("protocol.writerFeatures")),
318+
(LONG, column_name!("commitInfo.inCommitTimestamp")),
317319
];
318320
let (types, names) = types_and_names.into_iter().unzip();
319321
(names, types).into()
@@ -323,7 +325,7 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
323325

324326
fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
325327
require!(
326-
getters.len() == 16,
328+
getters.len() == 17,
327329
Error::InternalError(format!(
328330
"Wrong number of PreparePhaseVisitor getters: {}",
329331
getters.len()
@@ -354,6 +356,12 @@ impl RowVisitor for PreparePhaseVisitor<'_> {
354356
let protocol =
355357
ProtocolVisitor::visit_protocol(i, min_reader_version, &getters[12..=15])?;
356358
self.protocol = Some(protocol);
359+
} else if let Some(in_commit_timestamp) =
360+
getters[16].get_long(i, "commitInfo.inCommitTimestamp")?
361+
{
362+
if self.is_first_batch && i == 0 {
363+
*self.commit_timestamp = in_commit_timestamp;
364+
}
357365
}
358366
}
359367
Ok(())

kernel/src/table_changes/log_replay/tests.rs

+35
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::table_changes_action_iter;
22
use super::TableChangesScanData;
33
use crate::actions::deletion_vector::DeletionVectorDescriptor;
4+
use crate::actions::CommitInfo;
45
use crate::actions::{Add, Cdc, Metadata, Protocol, Remove};
56
use crate::engine::sync::SyncEngine;
67
use crate::expressions::Scalar;
@@ -609,3 +610,37 @@ async fn file_meta_timestamp() {
609610
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
610611
assert_eq!(scanner.timestamp, file_meta_ts);
611612
}
613+
614+
#[tokio::test]
615+
async fn table_changes_in_commit_timestamp() {
616+
let engine = Arc::new(SyncEngine::new());
617+
let mut mock_table = LocalMockTable::new();
618+
619+
let timestamp = 12345678;
620+
621+
mock_table
622+
.commit([
623+
Action::CommitInfo(CommitInfo {
624+
in_commit_timestamp: Some(timestamp),
625+
..Default::default()
626+
}),
627+
Action::Add(Add {
628+
path: "fake_path_1".into(),
629+
data_change: true,
630+
..Default::default()
631+
}),
632+
])
633+
.await;
634+
635+
let mut commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
636+
.unwrap()
637+
.into_iter();
638+
639+
let commit = commits.next().unwrap();
640+
let scanner = LogReplayScanner::try_new(engine.as_ref(), commit, &get_schema().into()).unwrap();
641+
assert_eq!(scanner.timestamp, timestamp);
642+
643+
let iter = scanner.into_scan_batches(engine, None).unwrap();
644+
let sv = result_to_sv(iter);
645+
assert_eq!(sv, vec![false, true]);
646+
}

kernel/src/table_changes/scan_file.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ mod tests {
244244

245245
use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType};
246246
use crate::actions::deletion_vector::DeletionVectorDescriptor;
247-
use crate::actions::{Add, Cdc, Remove};
247+
use crate::actions::{Add, Cdc, CommitInfo, Remove};
248248
use crate::engine::sync::SyncEngine;
249249
use crate::log_segment::LogSegment;
250250
use crate::scan::state::DvInfo;
@@ -312,14 +312,25 @@ mod tests {
312312
..Default::default()
313313
};
314314

315+
let cdc_timestamp = 12345678;
316+
let commit_info = CommitInfo {
317+
in_commit_timestamp: Some(cdc_timestamp),
318+
..Default::default()
319+
};
320+
315321
mock_table
316322
.commit([
317323
Action::Remove(remove_paired.clone()),
318324
Action::Add(add_paired.clone()),
319325
Action::Remove(remove.clone()),
320326
])
321327
.await;
322-
mock_table.commit([Action::Cdc(cdc.clone())]).await;
328+
mock_table
329+
.commit([
330+
Action::CommitInfo(commit_info.clone()),
331+
Action::Cdc(cdc.clone()),
332+
])
333+
.await;
323334
mock_table
324335
.commit([Action::Remove(remove_no_partition.clone())])
325336
.await;
@@ -386,7 +397,7 @@ mod tests {
386397
},
387398
partition_values: cdc.partition_values,
388399
commit_version: 1,
389-
commit_timestamp: timestamps[1],
400+
commit_timestamp: cdc_timestamp,
390401
remove_dv: None,
391402
},
392403
CdfScanFile {

0 commit comments

Comments
 (0)