
Commit 717d51e

Change error type for change data feed
1 parent 29144bb commit 717d51e

4 files changed (+49, -34 lines)


kernel/src/error.rs

+6-6
@@ -6,7 +6,7 @@ use std::{
     str::Utf8Error,
 };
 
-use crate::schema::{DataType, StructType};
+use crate::schema::{compare::SchemaComparisonError, DataType};
 use crate::table_properties::ParseIntervalError;
 use crate::Version;
 
@@ -189,8 +189,8 @@ pub enum Error {
     #[error("Change data feed is unsupported for the table at version {0}")]
     ChangeDataFeedUnsupported(Version),
 
-    #[error("Change data feed encountered incompatible schema. Expected {0}, got {1}")]
-    ChangeDataFeedIncompatibleSchema(String, String),
+    #[error("Change data feed encountered incompatible schema at version {0}: {1}")]
+    ChangeDataFeedIncompatibleSchema(Version, #[source] SchemaComparisonError),
 
     /// Invalid checkpoint files
     #[error("Invalid Checkpoint: {0}")]
@@ -262,10 +262,10 @@ impl Error {
         Self::ChangeDataFeedUnsupported(version.into())
     }
     pub(crate) fn change_data_feed_incompatible_schema(
-        expected: &StructType,
-        actual: &StructType,
+        version: Version,
+        schema_error: SchemaComparisonError,
     ) -> Self {
-        Self::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
+        Self::ChangeDataFeedIncompatibleSchema(version, schema_error)
    }
 
    pub fn invalid_checkpoint(msg: impl ToString) -> Self {
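
For context, a minimal self-contained sketch of how the reworked variant behaves with thiserror. The enum shapes mirror the diff above, but the single-variant SchemaComparisonError and the main wiring are illustrative stand-ins, not kernel code:

use thiserror::Error;

type Version = u64;

// Stand-in for the kernel's schema comparison error (one variant shown).
#[derive(Debug, Error)]
enum SchemaComparisonError {
    #[error("Field names do not match")]
    FieldNameMismatch,
}

#[derive(Debug, Error)]
enum Error {
    // `{0}`/`{1}` interpolate the variant's fields into the message, and
    // `#[source]` links the inner error into the std::error::Error chain.
    #[error("Change data feed encountered incompatible schema at version {0}: {1}")]
    ChangeDataFeedIncompatibleSchema(Version, #[source] SchemaComparisonError),
}

fn main() {
    let err = Error::ChangeDataFeedIncompatibleSchema(3, SchemaComparisonError::FieldNameMismatch);
    // Prints: Change data feed encountered incompatible schema at version 3: Field names do not match
    println!("{err}");
    // The wrapped SchemaComparisonError stays reachable via source().
    assert!(std::error::Error::source(&err).is_some());
}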

kernel/src/schema/compare.rs

+35-20
@@ -33,7 +33,7 @@ pub(crate) struct Nullable(bool);
 
 /// Represents the ways a schema comparison can fail.
 #[derive(Debug, thiserror::Error)]
-pub(crate) enum Error {
+pub enum SchemaComparisonError {
     #[error("The nullability was tightened for a field")]
     NullabilityTightening,
     #[error("Field names do not match")]
@@ -48,8 +48,8 @@ pub(crate) enum Error {
     TypeMismatch,
 }
 
-/// A [`std::result::Result`] that has the schema comparison [`Error`] as the error variant.
-pub(crate) type SchemaComparisonResult = Result<(), Error>;
+/// A [`std::result::Result`] that has the schema comparison [`SchemaComparisonError`] as the error variant.
+pub(crate) type SchemaComparisonResult = Result<(), SchemaComparisonError>;
 
 /// Represents a schema compatibility check for the type. If `self` can be read as `read_type`,
 /// this function returns `Ok(())`. Otherwise, this function returns `Err`.
@@ -65,7 +65,10 @@ impl SchemaComparison for Nullable {
         // column as non-nullable. So we avoid the case where !read_nullable && nullable
         // Hence we check that !(!read_nullable && existing_nullable)
         // == read_nullable || !existing_nullable
-        require!(read_nullable.0 || !self.0, Error::NullabilityTightening);
+        require!(
+            read_nullable.0 || !self.0,
+            SchemaComparisonError::NullabilityTightening
+        );
         Ok(())
     }
 }
@@ -78,7 +81,10 @@ impl SchemaComparison for StructField {
     /// 3. You can read this data type as the `read_field`'s data type.
     fn can_read_as(&self, read_field: &Self) -> SchemaComparisonResult {
         Nullable(self.nullable).can_read_as(&Nullable(read_field.nullable))?;
-        require!(self.name() == read_field.name(), Error::FieldNameMismatch);
+        require!(
+            self.name() == read_field.name(),
+            SchemaComparisonError::FieldNameMismatch
+        );
         self.data_type().can_read_as(read_field.data_type())?;
         Ok(())
     }
@@ -100,30 +106,33 @@ impl SchemaComparison for StructType {
             .collect();
         require!(
             lowercase_field_map.len() == self.fields.len(),
-            Error::InvalidSchema
+            SchemaComparisonError::InvalidSchema
         );
 
         let lowercase_read_field_names: HashSet<String> =
             read_type.fields.keys().map(|x| x.to_lowercase()).collect();
         require!(
             lowercase_read_field_names.len() == read_type.fields.len(),
-            Error::InvalidSchema
+            SchemaComparisonError::InvalidSchema
         );
 
         // Check that the field names are a subset of the read fields.
         if lowercase_field_map
             .keys()
             .any(|name| !lowercase_read_field_names.contains(name))
         {
-            return Err(Error::MissingColumn);
+            return Err(SchemaComparisonError::MissingColumn);
         }
         for read_field in read_type.fields() {
            match lowercase_field_map.get(&read_field.name().to_lowercase()) {
                Some(existing_field) => existing_field.can_read_as(read_field)?,
                None => {
                    // Note: Delta spark does not perform the following check. Hence it ignores
                    // non-null fields that exist in the read schema that aren't in this schema.
-                    require!(read_field.is_nullable(), Error::NewNonNullableColumn);
+                    require!(
+                        read_field.is_nullable(),
+                        SchemaComparisonError::NewNonNullableColumn
+                    );
                }
            }
        }
@@ -161,7 +170,7 @@ impl SchemaComparison for DataType {
             (a, b) => {
                 // TODO: In the future, we will change this to support type widening.
                 // See: #623
-                require!(a == b, Error::TypeMismatch);
+                require!(a == b, SchemaComparisonError::TypeMismatch);
             }
         };
         Ok(())
@@ -170,7 +179,7 @@ impl SchemaComparison for DataType {
 
 #[cfg(test)]
 mod tests {
-    use crate::schema::compare::{Error, SchemaComparison};
+    use crate::schema::compare::{SchemaComparison, SchemaComparisonError};
     use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};
 
     #[test]
@@ -252,7 +261,7 @@ mod tests {
 
         assert!(matches!(
             existing_schema.can_read_as(&read_schema),
-            Err(Error::NullabilityTightening)
+            Err(SchemaComparisonError::NullabilityTightening)
         ));
     }
     #[test]
@@ -270,7 +279,7 @@
         ]);
         assert!(matches!(
             existing_schema.can_read_as(&read_schema),
-            Err(Error::FieldNameMismatch)
+            Err(SchemaComparisonError::FieldNameMismatch)
         ));
     }
     #[test]
@@ -287,7 +296,7 @@
         ]);
         assert!(matches!(
             existing_schema.can_read_as(&read_schema),
-            Err(Error::TypeMismatch)
+            Err(SchemaComparisonError::TypeMismatch)
        ));
    }
    #[test]
@@ -318,7 +327,7 @@
         ]);
         assert!(matches!(
             existing_schema.can_read_as(&read_schema),
-            Err(Error::NullabilityTightening)
+            Err(SchemaComparisonError::NullabilityTightening)
        ));
    }
    #[test]
@@ -340,7 +349,10 @@
         assert!(a.can_read_as(&b).is_ok());
 
         // Read `b` as `a`. `a` is missing a column that is present in `b`.
-        assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn)));
+        assert!(matches!(
+            b.can_read_as(&a),
+            Err(SchemaComparisonError::MissingColumn)
+        ));
     }
     #[test]
     fn differ_by_non_nullable_column() {
@@ -360,11 +372,14 @@
         // Read `a` as `b`. `b` has an extra non-nullable column.
         assert!(matches!(
             a.can_read_as(&b),
-            Err(Error::NewNonNullableColumn)
+            Err(SchemaComparisonError::NewNonNullableColumn)
         ));
 
         // Read `b` as `a`. `a` is missing a column that is present in `b`.
-        assert!(matches!(b.can_read_as(&a), Err(Error::MissingColumn)));
+        assert!(matches!(
+            b.can_read_as(&a),
+            Err(SchemaComparisonError::MissingColumn)
+        ));
     }
 
     #[test]
@@ -384,13 +399,13 @@
         ]);
         assert!(matches!(
             existing_schema.can_read_as(&read_schema),
-            Err(Error::InvalidSchema)
+            Err(SchemaComparisonError::InvalidSchema)
         ));
 
         // Checks in the inverse order
         assert!(matches!(
             read_schema.can_read_as(&existing_schema),
-            Err(Error::InvalidSchema)
+            Err(SchemaComparisonError::InvalidSchema)
         ));
     }
 }
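
The require! calls above bail out of the enclosing function with the given error when the condition is false. The kernel has its own macro for this; the approximation below (macro, single-variant error enum, and check function are simplified stand-ins, not the kernel's definitions) shows the nullability rule from this file in isolation:

// A stand-in for the kernel's `require!` helper: return early from the
// enclosing function with the supplied error when the condition does not hold.
macro_rules! require {
    ($cond:expr, $err:expr) => {
        if !($cond) {
            return Err($err);
        }
    };
}

#[derive(Debug)]
enum SchemaComparisonError {
    NullabilityTightening,
}

type SchemaComparisonResult = Result<(), SchemaComparisonError>;

// Mirrors the rule in the diff: reading a nullable column as non-nullable
// tightens nullability and must be rejected.
fn check_nullability(existing_nullable: bool, read_nullable: bool) -> SchemaComparisonResult {
    require!(
        read_nullable || !existing_nullable,
        SchemaComparisonError::NullabilityTightening
    );
    Ok(())
}

fn main() {
    assert!(check_nullability(true, true).is_ok());
    assert!(matches!(
        check_nullability(true, false),
        Err(SchemaComparisonError::NullabilityTightening)
    ));
}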

kernel/src/table_changes/log_replay.rs

+4-4
@@ -181,10 +181,10 @@ impl LogReplayScanner {
         }
         if let Some((schema, configuration)) = visitor.metadata_info {
             let schema: StructType = serde_json::from_str(&schema)?;
-            require!(
-                schema.can_read_as(table_schema).is_ok(),
-                Error::change_data_feed_incompatible_schema(table_schema, &schema)
-            );
+
+            schema.can_read_as(table_schema).map_err(|err| {
+                Error::change_data_feed_incompatible_schema(commit_file.version, err)
+            })?;
             let table_properties = TableProperties::from(configuration);
             check_cdf_table_properties(&table_properties)
                 .map_err(|_| Error::change_data_feed_unsupported(commit_file.version))?;
kernel/src/table_changes/mod.rs

+4-4
@@ -174,11 +174,11 @@ impl TableChanges {
 
         // Verify that the start and end schemas are compatible. We must still check schema
         // compatibility for each schema update in the CDF range.
-        if let Err(err) = start_snapshot.schema().can_read_as(end_snapshot.schema()) {
-            return Err(Error::generic(format!(
+        start_snapshot.schema().can_read_as(end_snapshot.schema()).map_err(|err|{
+            Error::generic(format!(
                 "Failed to build TableChanges: {}\n Found start version schema {:?} and end version schema {:?}", err, start_snapshot.schema(), end_snapshot.schema(),
-            )));
-        }
+            ))
+        })?;
 
         let schema = StructType::new(
             end_snapshot
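
Both table_changes call sites above swap a boolean is_ok()/if-let check for Result::map_err followed by ?, so the underlying comparison failure is carried along instead of being discarded. A standalone sketch of that pattern, with hypothetical types standing in for the kernel's:

#[derive(Debug)]
struct SchemaComparisonError(&'static str);

#[derive(Debug)]
enum Error {
    ChangeDataFeedIncompatibleSchema(u64, SchemaComparisonError),
}

// Hypothetical comparison standing in for `schema.can_read_as(table_schema)`.
fn can_read_as(compatible: bool) -> Result<(), SchemaComparisonError> {
    if compatible {
        Ok(())
    } else {
        Err(SchemaComparisonError("field names do not match"))
    }
}

fn process_commit(version: u64, compatible: bool) -> Result<(), Error> {
    // map_err wraps the source error together with the commit version;
    // `?` then propagates the wrapped error to the caller.
    can_read_as(compatible)
        .map_err(|err| Error::ChangeDataFeedIncompatibleSchema(version, err))?;
    Ok(())
}

fn main() {
    assert!(process_commit(42, true).is_ok());
    // Err(ChangeDataFeedIncompatibleSchema(42, SchemaComparisonError("field names do not match")))
    println!("{:?}", process_commit(42, false));
}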
