Skip to content

Commit c627b46

Browse files
OussamaSaoudi-db authored and OussamaSaoudi committed
Initial schema compat
- Add schema compatibility tests
- Add error-based schema compatibility check
- Remove old schema compat
- change naming, remove redundant test
- Improve comments, add check that field names aren't case-insensitive duplicates
- Change ordering, remove unnecessary comment
- Add comments to can_read_as methods
- Add schema compatibility check to CDF, update cdf tests
- remove todo comments
- Add clarifying comments
1 parent b53b68f commit c627b46

File tree

3 files changed

+41
-33
lines changed

3 files changed

+41
-33
lines changed

kernel/src/table_changes/log_replay.rs

+4-7
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ use crate::expressions::{column_name, ColumnName};
1515
use crate::path::ParsedLogPath;
1616
use crate::scan::data_skipping::DataSkippingFilter;
1717
use crate::scan::state::DvInfo;
18+
use crate::schema::compare::SchemaComparison;
1819
use crate::schema::{ArrayType, ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructType};
1920
use crate::table_changes::scan_file::{cdf_scan_row_expression, cdf_scan_row_schema};
2021
use crate::table_changes::{check_cdf_table_properties, ensure_cdf_read_supported};
@@ -79,9 +80,8 @@ pub(crate) fn table_changes_action_iter(
7980
/// phase, so we must perform it ahead of time in phase 1.
8081
/// - Ensure that reading is supported on any protocol updates.
8182
/// - Ensure that Change Data Feed is enabled for any metadata update. See [`TableProperties`]
82-
/// - Ensure that any schema update is compatible with the provided `schema`. Currently, schema
83-
/// compatibility is checked through schema equality. This will be expanded in the future to
84-
/// allow limited schema evolution.
83+
/// - Ensure that any schema update is compatible with the provided `schema`. To see how schema
84+
/// compatibility is defined, see [`StructType::can_read_as`].
8585
///
8686
/// Note: We check the protocol, change data feed enablement, and schema compatibility in phase 1
8787
/// in order to detect errors and fail early.
@@ -181,11 +181,8 @@ impl LogReplayScanner {
181181
}
182182
if let Some((schema, configuration)) = visitor.metadata_info {
183183
let schema: StructType = serde_json::from_str(&schema)?;
184-
// Currently, schema compatibility is defined as having equal schema types. In the
185-
// future, more permissive schema evolution will be supported.
186-
// See: https://github.com/delta-io/delta-kernel-rs/issues/523
187184
require!(
188-
table_schema.as_ref() == &schema,
185+
schema.can_read_as(table_schema).is_ok(),
189186
Error::change_data_feed_incompatible_schema(table_schema, &schema)
190187
);
191188
let table_properties = TableProperties::from(configuration);

kernel/src/table_changes/log_replay/tests.rs

+33-21
Original file line number | Diff line number | Diff line change
@@ -185,7 +185,7 @@ async fn column_mapping_should_fail() {
185185
// Note: This should be removed once type widening support is added for CDF
186186
#[tokio::test]
187187
async fn incompatible_schemas_fail() {
188-
async fn assert_incompatible_schema(commit_schema: StructType, cdf_schema: StructType) {
188+
async fn assert_schema_check(commit_schema: StructType, cdf_schema: StructType, passes: bool) {
189189
let engine = Arc::new(SyncEngine::new());
190190
let mut mock_table = LocalMockTable::new();
191191

@@ -210,29 +210,38 @@ async fn incompatible_schemas_fail() {
210210
.unwrap()
211211
.try_collect();
212212

213-
assert!(matches!(
214-
res,
215-
Err(Error::ChangeDataFeedIncompatibleSchema(_, _))
216-
));
213+
if passes {
214+
assert!(res.is_ok());
215+
} else {
216+
assert!(matches!(
217+
res,
218+
Err(Error::ChangeDataFeedIncompatibleSchema(_, _))
219+
));
220+
}
217221
}
218222

223+
// Column `year` exists in commit schema, but not in the read schema.
224+
//
219225
// The CDF schema has fields: `id: int` and `value: string`.
220-
// This commit has schema with fields: `id: long`, `value: string` and `year: int` (nullable).
226+
// This commit has schema with fields: `id: int`, `value: string` and `year: int` (nullable).
221227
let schema = StructType::new([
222228
StructField::nullable("id", DataType::LONG),
223229
StructField::nullable("value", DataType::STRING),
224230
StructField::nullable("year", DataType::INTEGER),
225231
]);
226-
assert_incompatible_schema(schema, get_schema()).await;
232+
assert_schema_check(schema, get_schema(), false).await;
227233

234+
// Commit schema's `id` column has wider type than in the read schema.
235+
//
228236
// The CDF schema has fields: `id: int` and `value: string`.
229237
// This commit has schema with fields: `id: long` and `value: string`.
230238
let schema = StructType::new([
231239
StructField::nullable("id", DataType::LONG),
232240
StructField::nullable("value", DataType::STRING),
233241
]);
234-
assert_incompatible_schema(schema, get_schema()).await;
242+
assert_schema_check(schema, get_schema(), false).await;
235243

244+
// Commit schema's `id` column is narrower than in the read schema.
236245
// NOTE: Once type widening is supported, this should not return an error.
237246
//
238247
// The CDF schema has fields: `id: long` and `value: string`.
@@ -245,31 +254,34 @@ async fn incompatible_schemas_fail() {
245254
StructField::nullable("id", DataType::INTEGER),
246255
StructField::nullable("value", DataType::STRING),
247256
]);
248-
assert_incompatible_schema(cdf_schema, commit_schema).await;
257+
assert_schema_check(commit_schema, cdf_schema, false).await;
249258

250-
// Note: Once schema evolution is supported, this should not return an error.
259+
// Commit schema's `id` column has an incompatible type with the read schema's.
251260
//
252-
// The CDF schema has fields: nullable `id` and nullable `value`.
253-
// This commit has schema with fields: non-nullable `id` and nullable `value`.
261+
// The CDF schema has fields: `id: int` and `value: string`.
262+
// This commit has schema with fields: `id: string` and `value: string`.
254263
let schema = StructType::new([
255-
StructField::not_null("id", DataType::LONG),
264+
StructField::nullable("id", DataType::STRING),
256265
StructField::nullable("value", DataType::STRING),
257266
]);
258-
assert_incompatible_schema(schema, get_schema()).await;
267+
assert_schema_check(schema, get_schema(), false).await;
259268

260-
// The CDF schema has fields: `id: int` and `value: string`.
261-
// This commit has schema with fields:`id: string` and `value: string`.
269+
// Commit schema's `id` column is non-nullable, but is nullable in the read schema.
270+
//
271+
// The CDF schema has fields: nullable `id` and nullable `value`.
272+
// This commit has schema with fields: non-nullable `id` and nullable `value`.
262273
let schema = StructType::new([
263-
StructField::nullable("id", DataType::STRING),
264-
StructField::nullable("value", DataType::STRING),
274+
StructField::new("id", DataType::INTEGER, false),
275+
StructField::new("value", DataType::STRING, true),
265276
]);
266-
assert_incompatible_schema(schema, get_schema()).await;
277+
assert_schema_check(schema, get_schema(), true).await;
267278

268-
// Note: Once schema evolution is supported, this should not return an error.
279+
// Commit schema is missing a nullable `value` column that's found in the read schema.
280+
//
269281
// The CDF schema has fields: `id` (nullable) and `value` (nullable).
270282
// This commit has schema with fields: `id` (nullable).
271283
let schema = get_schema().project_as_struct(&["id"]).unwrap();
272-
assert_incompatible_schema(schema, get_schema()).await;
284+
assert_schema_check(schema, get_schema(), true).await;
273285
}
274286

275287
#[tokio::test]

kernel/src/table_changes/mod.rs

+4-5
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ use url::Url;
4040
use crate::actions::{ensure_supported_features, Protocol};
4141
use crate::log_segment::LogSegment;
4242
use crate::path::AsUrl;
43+
use crate::schema::compare::SchemaComparison;
4344
use crate::schema::{DataType, Schema, StructField, StructType};
4445
use crate::snapshot::Snapshot;
4546
use crate::table_features::{ColumnMappingMode, ReaderFeatures};
@@ -173,11 +174,9 @@ impl TableChanges {
173174

174175
// Verify that the start and end schemas are compatible. We must still check schema
175176
// compatibility for each schema update in the CDF range.
176-
// Note: Schema compatibility check will be changed in the future to be more flexible.
177-
// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)
178-
if start_snapshot.schema() != end_snapshot.schema() {
177+
if let Err(err) = start_snapshot.schema().can_read_as(end_snapshot.schema()) {
179178
return Err(Error::generic(format!(
180-
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
179+
"Failed to build TableChanges: {}\n Found start version schema {:?} and end version schema {:?}", err, start_snapshot.schema(), end_snapshot.schema(),
181180
)));
182181
}
183182

@@ -303,7 +302,7 @@ mod tests {
303302
let path = "./tests/data/table-with-cdf";
304303
let engine = Box::new(SyncEngine::new());
305304
let table = Table::try_from_uri(path).unwrap();
306-
let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";
305+
let expected_msg = "Failed to build TableChanges: Generic delta kernel error: Read field is non-nullable while this field is nullable\n Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";
307306

308307
// A field in the schema goes from being nullable to non-nullable
309308
let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);

0 commit comments

Comments
 (0)