Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/make dfschema wrap schemaref #8905

Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove DFField from dfschema tests
matthewmturner committed Jan 25, 2024
commit a10a85419b55354b4bd81b740e82180a99119d2c
315 changes: 31 additions & 284 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
@@ -251,6 +251,11 @@ impl DFSchema {
.chain(schema.iter())
.map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone()))
.unzip();
// let (new_field_qualifiers, new_fields) = self
// .iter()
// .chain(schema.iter())
// .map(|(qualifier, field)| (qualifier.as_ref().clone(), field.clone()))
// .unzip();

let mut new_metadata = self.inner.metadata.clone();
new_metadata.extend(schema.inner.metadata.clone());
@@ -440,12 +445,12 @@ impl DFSchema {
/// Collect the qualified name of every field whose unqualified name equals `name`.
///
/// Fields are visited in `self.iter()` order and each match is rendered with
/// `qualified_name` from its qualifier and field name. This helper was introduced
/// while making DFSchema wrap SchemaRef, to ease the transition for `Column`.
pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<String> {
    let mut matching_names = Vec::new();
    for (qualifier, field) in self.iter() {
        if field.name() == name {
            matching_names.push(qualified_name(qualifier, field.name()));
        }
    }
    matching_names
}
// pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<String> {
// self.iter()
// .filter(|(_, field)| field.name() == name)
// .map(|(q, f)| qualified_name(q, f.name()))
// .collect()
// }

/// Find the field with the given name
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
@@ -970,22 +975,6 @@ mod tests {
Ok(())
}

#[test]
fn from_unqualified_field() {
    // A `DFField` built with `DFField::from` must report "c0" from both
    // `name()` and `qualified_name()`.
    let df_field = DFField::from(Field::new("c0", DataType::Boolean, true));
    assert_eq!("c0", df_field.name());
    assert_eq!("c0", df_field.qualified_name());
}

#[test]
fn from_qualified_field() {
    // Qualifying the field with "t1" leaves `name()` as "c0" while
    // `qualified_name()` becomes "t1.c0".
    let arrow_field = Field::new("c0", DataType::Boolean, true);
    let df_field = DFField::from_qualified("t1", arrow_field);
    assert_eq!("c0", df_field.name());
    assert_eq!("t1.c0", df_field.qualified_name());
}

#[test]
fn from_unqualified_schema() -> Result<()> {
let schema = DFSchema::try_from(test_schema_1())?;
@@ -1111,9 +1100,14 @@ mod tests {
.to_string(),
expected_help
);
assert_contains!(schema.index_of("y").unwrap_err().to_string(), expected_help);
let y_col = Column::new_unqualified("y");
assert_contains!(
schema.index_of_column(&y_col).unwrap_err().to_string(),
expected_help
);
let c0_column = Column::new(Some("t1"), "c0");
assert_contains!(
schema.index_of("t1.c0").unwrap_err().to_string(),
schema.index_of_column(&c0_column).unwrap_err().to_string(),
expected_err_msg
);
Ok(())
@@ -1133,252 +1127,6 @@ mod tests {
assert_eq!(err.strip_backtrace(), "Schema error: No field named c0.");
}

// Table-driven test for `DFSchema::equivalent_names_and_types`.
//
// Each `TestCase` builds two schemas from lists of `DFField`s and checks two
// things: the result of the DFSchema-level comparison, and the result of the
// same-named comparison on the `Schema` values produced by `Schema::from`.
//
// Fixture naming: `<field>_<type>_<t|f>`, where the trailing `t`/`f` is the
// nullable flag passed to `Field::new` (`true`/`false`).
#[test]
fn equivalent_names_and_types() {
let arrow_field1 = Field::new("f1", DataType::Int16, true);
// Same field as above, plus the metadata returned by `test_metadata_n(2)`.
let arrow_field1_meta = arrow_field1.clone().with_metadata(test_metadata_n(2));

let field1_i16_t = DFField::from(arrow_field1);
let field1_i16_t_meta = DFField::from(arrow_field1_meta);
// Same underlying field as `field1_i16_t`, but qualified with "foo".
let field1_i16_t_qualified =
DFField::from_qualified("foo", field1_i16_t.field().clone());
let field1_i16_f = DFField::from(Field::new("f1", DataType::Int16, false));
let field1_i32_t = DFField::from(Field::new("f1", DataType::Int32, true));
let field2_i16_t = DFField::from(Field::new("f2", DataType::Int16, true));
let field3_i16_t = DFField::from(Field::new("f3", DataType::Int16, true));

// Dictionary-typed fields differing only in the nullable flag.
let dict =
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
let field_dict_t = DFField::from(Field::new("f_dict", dict.clone(), true));
let field_dict_f = DFField::from(Field::new("f_dict", dict, false));

// List fields: `list_t` and `list_f` differ in the nullable flag of both the
// list and its element field ("f1").
let list_t = DFField::from(Field::new_list(
"f_list",
field1_i16_t.field().clone(),
true,
));
let list_f = DFField::from(Field::new_list(
"f_list",
field1_i16_f.field().clone(),
false,
));

// List whose element field is named "f2" instead of "f1".
let list_f_name = DFField::from(Field::new_list(
"f_list",
field2_i16_t.field().clone(),
false,
));

// Struct fields: `struct_t` and `struct_f` differ in the nullable flag of both
// the struct and its single child field.
let struct_t = DFField::from(Field::new_struct(
"f_struct",
vec![field1_i16_t.field().clone()],
true,
));
let struct_f = DFField::from(Field::new_struct(
"f_struct",
vec![field1_i16_f.field().clone()],
false,
));

// Struct whose child field carries metadata.
let struct_f_meta = DFField::from(Field::new_struct(
"f_struct",
vec![field1_i16_t_meta.field().clone()],
false,
));

// Struct whose child field is Int32 rather than Int16.
let struct_f_type = DFField::from(Field::new_struct(
"f_struct",
vec![field1_i32_t.field().clone()],
false,
));

// same
TestCase {
fields1: vec![&field1_i16_t],
fields2: vec![&field1_i16_t],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// same but metadata is different, should still be true
TestCase {
fields1: vec![&field1_i16_t_meta],
fields2: vec![&field1_i16_t],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// different name
TestCase {
fields1: vec![&field1_i16_t],
fields2: vec![&field2_i16_t],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// different type
TestCase {
fields1: vec![&field1_i16_t],
fields2: vec![&field1_i32_t],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// different nullability (expected to still compare as equivalent)
TestCase {
fields1: vec![&field1_i16_t],
fields2: vec![&field1_i16_f],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// different qualifier: the only case here where the two expected results
// differ (DFSchema comparison false, `Schema` comparison true)
TestCase {
fields1: vec![&field1_i16_t],
fields2: vec![&field1_i16_t_qualified],
expected_dfschema: false,
expected_arrow: true,
}
.run();

// different name after first
TestCase {
fields1: vec![&field2_i16_t, &field1_i16_t],
fields2: vec![&field2_i16_t, &field3_i16_t],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// different number
TestCase {
fields1: vec![&field1_i16_t, &field2_i16_t],
fields2: vec![&field1_i16_t],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// dictionary
TestCase {
fields1: vec![&field_dict_t],
fields2: vec![&field_dict_t],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// dictionary (different nullable)
TestCase {
fields1: vec![&field_dict_t],
fields2: vec![&field_dict_f],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// dictionary (wrong type)
TestCase {
fields1: vec![&field_dict_t],
fields2: vec![&field1_i16_t],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// list (different embedded nullability)
TestCase {
fields1: vec![&list_t],
fields2: vec![&list_f],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// list (different sub field names)
TestCase {
fields1: vec![&list_t],
fields2: vec![&list_f_name],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// struct
TestCase {
fields1: vec![&struct_t],
fields2: vec![&struct_f],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// struct (different embedded meta)
TestCase {
fields1: vec![&struct_t],
fields2: vec![&struct_f_meta],
expected_dfschema: true,
expected_arrow: true,
}
.run();

// struct (different field type)
TestCase {
fields1: vec![&struct_t],
fields2: vec![&struct_f_type],
expected_dfschema: false,
expected_arrow: false,
}
.run();

// The helper items below are declared after their uses; item declarations in
// a block are visible throughout that block, so the cases above can use them.

// One comparison scenario: two field lists plus the expected outcome of each
// `equivalent_names_and_types` call made in `run`.
#[derive(Debug)]
struct TestCase<'a> {
// Fields used to build the first schema.
fields1: Vec<&'a DFField>,
// Fields used to build the second schema.
fields2: Vec<&'a DFField>,
// Expected result of `equivalent_names_and_types` on the two `DFSchema`s.
expected_dfschema: bool,
// Expected result of `equivalent_names_and_types` on the two `Schema`s
// obtained from them via `Schema::from`.
expected_arrow: bool,
}

impl<'a> TestCase<'a> {
// Builds both schemas, then asserts the DFSchema comparison followed by the
// comparison of the converted `Schema` values. Consumes the test case.
fn run(self) {
println!("Running {self:#?}");
let schema1 = to_df_schema(self.fields1);
let schema2 = to_df_schema(self.fields2);
assert_eq!(
schema1.equivalent_names_and_types(&schema2),
self.expected_dfschema,
"Comparison did not match expected: {}\n\n\
schema1:\n\n{:#?}\n\nschema2:\n\n{:#?}",
self.expected_dfschema,
schema1,
schema2
);

// The DFSchemas are moved into the conversion; they are not used afterwards.
let arrow_schema1 = Schema::from(schema1);
let arrow_schema2 = Schema::from(schema2);
assert_eq!(
arrow_schema1.equivalent_names_and_types(&arrow_schema2),
self.expected_arrow,
"Comparison did not match expected: {}\n\n\
arrow schema1:\n\n{:#?}\n\n arrow schema2:\n\n{:#?}",
self.expected_arrow,
arrow_schema1,
arrow_schema2
);
}
}

// Clones the borrowed fields into a `DFSchema` with empty metadata; panics via
// `unwrap` if `DFSchema::new_with_metadata` returns an error.
fn to_df_schema(fields: Vec<&DFField>) -> DFSchema {
let fields = fields.into_iter().cloned().collect();
DFSchema::new_with_metadata(fields, HashMap::new()).unwrap()
}
}

#[test]
fn into() {
// Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
@@ -1389,11 +1137,11 @@ mod tests {
);
let arrow_schema_ref = Arc::new(arrow_schema.clone());

let df_schema = DFSchema::new_with_metadata(
vec![DFField::new_unqualified("c0", DataType::Int64, true)],
metadata,
)
.unwrap();
let df_schema = DFSchema {
inner: arrow_schema_ref,
field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
let df_schema_ref = Arc::new(df_schema.clone());

{
@@ -1433,16 +1181,15 @@ mod tests {
b_metadata.insert("key".to_string(), "value".to_string());
let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);

let a: DFField = DFField::from_qualified("table1", a_field);
let b: DFField = DFField::from_qualified("table1", b_field);
let schema = Arc::new(Schema::new(vec![a_field, b_field]));

let df_schema = Arc::new(
DFSchema::new_with_metadata([a, b].to_vec(), HashMap::new()).unwrap(),
);
let schema: Schema = df_schema.as_ref().clone().into();
let a_df = df_schema.fields.first().unwrap().field();
let a_arrow = schema.fields.first().unwrap();
assert_eq!(a_df.metadata(), a_arrow.metadata())
let df_schema = DFSchema {
inner: schema,
field_qualifiers: vec![None; schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};

assert_eq!(df_schema.inner.metadata(), schema.metadata())
}

#[test]
12 changes: 2 additions & 10 deletions datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
@@ -596,23 +596,15 @@ pub fn field_not_found<R: Into<OwnedTableReference>>(
) -> DataFusionError {
schema_datafusion_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(qualifier, name)),
valid_fields: schema
.fields()
.iter()
.map(|f| f.qualified_column())
.collect(),
valid_fields: schema.columns().iter().map(|c| c.clone()).collect(),
})
}

/// Unqualified counterpart of [`field_not_found`], for when there is no qualifier.
///
/// Builds a `SchemaError::FieldNotFound` (through `schema_datafusion_err!`) whose
/// `field` is the unqualified column `name` and whose `valid_fields` are the
/// columns reported by `schema.columns()`.
pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
    schema_datafusion_err!(SchemaError::FieldNotFound {
        field: Box::new(Column::new_unqualified(name)),
        // Candidate columns are cloned out of `schema.columns()`.
        valid_fields: schema.columns().iter().cloned().collect(),
    })
}

Loading