Skip to content

Commit fa86900

Browse files
Add schema compatibility check to CDF, update cdf tests
1 parent c6deb44 commit fa86900

File tree

3 files changed

+31
-33
lines changed

3 files changed

+31
-33
lines changed

kernel/src/table_changes/log_replay.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,8 @@ pub(crate) fn table_changes_action_iter(
7979
/// phase, so we must perform it ahead of time in phase 1.
8080
/// - Ensure that reading is supported on any protocol updates.
8181
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
82-
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
83-
/// compatibility is checked through schema equality. This will be expanded in the future to
84-
/// allow limited schema evolution.
82+
/// - Ensure that any schema update is compatible with the provided `schema`. To see how schema
83+
/// compatibility is defined, see [`StructType::can_read_as`].
8584
///
8685
/// Note: We check the protocol, change data feed enablement, and schema compatibility in phase 1
8786
/// in order to detect errors and fail early.
@@ -181,11 +180,8 @@ impl LogReplayScanner {
181180
}
182181
if let Some((schema, configuration)) = visitor.metadata_info {
183182
let schema: StructType = serde_json::from_str(&schema)?;
184-
// Currently, schema compatibility is defined as having equal schema types. In the
185-
// future, more permisive schema evolution will be supported.
186-
// See: https://github.com/delta-io/delta-kernel-rs/issues/523
187183
require!(
188-
table_schema.as_ref() == &schema,
184+
schema.can_read_as(table_schema).is_ok(),
189185
Error::change_data_feed_incompatible_schema(table_schema, &schema)
190186
);
191187
let table_properties = TableProperties::from(configuration);

kernel/src/table_changes/log_replay/tests.rs

+25-21
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ async fn column_mapping_should_fail() {
185185
// Note: This should be removed once type widening support is added for CDF
186186
#[tokio::test]
187187
async fn incompatible_schemas_fail() {
188-
async fn assert_incompatible_schema(commit_schema: StructType, cdf_schema: StructType) {
188+
async fn assert_schema_check(commit_schema: StructType, cdf_schema: StructType, passes: bool) {
189189
let engine = Arc::new(SyncEngine::new());
190190
let mut mock_table = LocalMockTable::new();
191191

@@ -210,28 +210,32 @@ async fn incompatible_schemas_fail() {
210210
.unwrap()
211211
.try_collect();
212212

213-
assert!(matches!(
214-
res,
215-
Err(Error::ChangeDataFeedIncompatibleSchema(_, _))
216-
));
213+
if passes {
214+
assert!(res.is_ok());
215+
} else {
216+
assert!(matches!(
217+
res,
218+
Err(Error::ChangeDataFeedIncompatibleSchema(_, _))
219+
));
220+
}
217221
}
218222

219223
// The CDF schema has fields: `id: int` and `value: string`.
220-
// This commit has schema with fields: `id: long`, `value: string` and `year: int` (nullable).
224+
// This commit has schema with fields: `id: int`, `value: string` and `year: int` (nullable).
221225
let schema = StructType::new([
222-
StructField::new("id", DataType::LONG, true),
226+
StructField::new("id", DataType::INTEGER, true),
223227
StructField::new("value", DataType::STRING, true),
224228
StructField::new("year", DataType::INTEGER, true),
225229
]);
226-
assert_incompatible_schema(schema, get_schema()).await;
230+
assert_schema_check(schema, get_schema(), false).await;
227231

228232
// The CDF schema has fields: `id: int` and `value: string`.
229233
// This commit has schema with fields: `id: long` and `value: string`.
230234
let schema = StructType::new([
231235
StructField::new("id", DataType::LONG, true),
232236
StructField::new("value", DataType::STRING, true),
233237
]);
234-
assert_incompatible_schema(schema, get_schema()).await;
238+
assert_schema_check(schema, get_schema(), false).await;
235239

236240
// NOTE: Once type widening is supported, this should not return an error.
237241
//
@@ -245,31 +249,31 @@ async fn incompatible_schemas_fail() {
245249
StructField::new("id", DataType::INTEGER, true),
246250
StructField::new("value", DataType::STRING, true),
247251
]);
248-
assert_incompatible_schema(cdf_schema, commit_schema).await;
252+
assert_schema_check(commit_schema, cdf_schema, false).await;
249253

250-
// Note: Once schema evolution is supported, this should not return an error.
251-
//
252-
// The CDF schema has fields: nullable `id` and nullable `value`.
253-
// This commit has schema with fields: non-nullable `id` and nullable `value`.
254+
// The CDF schema has fields: `id: int` and `value: string`.
255+
// This commit has schema with fields:`id: string` and `value: string`.
254256
let schema = StructType::new([
255-
StructField::new("id", DataType::LONG, false),
257+
StructField::new("id", DataType::STRING, true),
256258
StructField::new("value", DataType::STRING, true),
257259
]);
258-
assert_incompatible_schema(schema, get_schema()).await;
260+
assert_schema_check(schema, get_schema(), false).await;
259261

260-
// The CDF schema has fields: `id: int` and `value: string`.
261-
// This commit has schema with fields:`id: string` and `value: string`.
262+
// Note: Once schema evolution is supported, this should not return an error.
263+
//
264+
// The CDF schema has fields: nullable `id` and nullable `value`.
265+
// This commit has schema with fields: non-nullable `id` and nullable `value`.
262266
let schema = StructType::new([
263-
StructField::new("id", DataType::STRING, true),
267+
StructField::new("id", DataType::INTEGER, false),
264268
StructField::new("value", DataType::STRING, true),
265269
]);
266-
assert_incompatible_schema(schema, get_schema()).await;
270+
assert_schema_check(schema, get_schema(), true).await;
267271

268272
// Note: Once schema evolution is supported, this should not return an error.
269273
// The CDF schema has fields: `id` (nullable) and `value` (nullable).
270274
// This commit has schema with fields: `id` (nullable).
271275
let schema = get_schema().project_as_struct(&["id"]).unwrap();
272-
assert_incompatible_schema(schema, get_schema()).await;
276+
assert_schema_check(schema, get_schema(), true).await;
273277
}
274278

275279
#[tokio::test]

kernel/src/table_changes/mod.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,9 @@ impl TableChanges {
165165

166166
// Verify that the start and end schemas are compatible. We must still check schema
167167
// compatibility for each schema update in the CDF range.
168-
// Note: Schema compatibility check will be changed in the future to be more flexible.
169-
// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)
170-
if start_snapshot.schema() != end_snapshot.schema() {
168+
if let Err(err) = start_snapshot.schema().can_read_as(end_snapshot.schema()) {
171169
return Err(Error::generic(format!(
172-
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
170+
"Failed to build TableChanges: {}\n Found start version schema {:?} and end version schema {:?}", err, start_snapshot.schema(), end_snapshot.schema(),
173171
)));
174172
}
175173

@@ -303,7 +301,7 @@ mod tests {
303301
let path = "./tests/data/table-with-cdf";
304302
let engine = Box::new(SyncEngine::new());
305303
let table = Table::try_from_uri(path).unwrap();
306-
let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";
304+
let expected_msg = "Failed to build TableChanges: Generic delta kernel error: Read field is non-nullable while this field is nullable\n Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";
307305

308306
// A field in the schema goes from being nullable to non-nullable
309307
let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);

0 commit comments

Comments
 (0)