Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: Introduce DFField for DFSchema tuple #9735

Closed
wants to merge 80 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
190497c
Start setting up SchemaRef
matthewmturner Jan 13, 2024
b9fd992
Start updating DFSchema
matthewmturner Jan 14, 2024
93c4a1c
More updates to df schema
matthewmturner Jan 15, 2024
92a2a45
More updates
matthewmturner Jan 17, 2024
23c6634
More updates
matthewmturner Jan 18, 2024
dd88233
Start working on columns
matthewmturner Jan 19, 2024
ee2bd43
Start cleaning up columns
matthewmturner Jan 20, 2024
a10a854
Remove DFField from dfschema tests
matthewmturner Jan 25, 2024
25b4e42
More cleanup
matthewmturner Jan 29, 2024
80f1181
datafusion common is building
matthewmturner Jan 30, 2024
20694ea
More cleanup
matthewmturner Jan 31, 2024
cba88e7
Start updating expr
matthewmturner Feb 1, 2024
3210675
More cleanup
matthewmturner Feb 4, 2024
6b465ce
Update build_join_schema
matthewmturner Feb 5, 2024
5a595e6
Cleanup expr to_field
matthewmturner Feb 6, 2024
f49644f
Builder updates
matthewmturner Feb 8, 2024
51f9c07
Update expr utils
matthewmturner Feb 9, 2024
03b133a
Work on logical plan
matthewmturner Feb 12, 2024
a519a7f
Update expr rewriter
matthewmturner Feb 14, 2024
b748981
Cleanup up logical plan
matthewmturner Feb 15, 2024
9b5a870
More cleanup
matthewmturner Feb 16, 2024
fe60a45
More cleanup
matthewmturner Feb 23, 2024
9600fdc
Cleanup
matthewmturner Feb 28, 2024
4524fae
Fix unnest
matthewmturner Mar 7, 2024
3fe9078
make datafusion-expr build
haohuaijin Mar 13, 2024
4819586
make datafusion-optimizer build
haohuaijin Mar 14, 2024
e55354d
can build some datafusion-sql
haohuaijin Mar 14, 2024
c4b3429
clean up
haohuaijin Mar 14, 2024
b86c66b
make datafusion-sql build
haohuaijin Mar 14, 2024
efe29b5
make core build
haohuaijin Mar 14, 2024
f13fe94
make datafusion-substrait build
haohuaijin Mar 14, 2024
ac4629f
clean up
haohuaijin Mar 14, 2024
21630d5
clean up
haohuaijin Mar 14, 2024
121aa5d
fix plan.rs
haohuaijin Mar 14, 2024
966b49d
fix clean up
haohuaijin Mar 15, 2024
618656d
fix to_field
haohuaijin Mar 15, 2024
c039b24
fix select * from file
haohuaijin Mar 15, 2024
038c172
remove DFField in tests
haohuaijin Mar 15, 2024
0af52ff
fix some tests
haohuaijin Mar 15, 2024
796e248
fix unnest and dfschema
haohuaijin Mar 16, 2024
1c4045f
fix dfschema test
haohuaijin Mar 16, 2024
64684c3
make datafusion-proto build
haohuaijin Mar 16, 2024
6223de3
fix some optimizer test
haohuaijin Mar 16, 2024
8e95847
fix dfschema merge
haohuaijin Mar 16, 2024
630456f
fix with_column_renamed
haohuaijin Mar 16, 2024
9eff49d
fix compound identifier tests
haohuaijin Mar 17, 2024
d8987f2
fix unnest plan
haohuaijin Mar 17, 2024
321d2e7
fix except
haohuaijin Mar 17, 2024
3564640
Merge remote-tracking branch 'upstream/main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 17, 2024
1833fb9
fix test and conflicts
haohuaijin Mar 18, 2024
f87362d
remove clone in dfschema
haohuaijin Mar 18, 2024
c2aefe5
clean up dfschema
haohuaijin Mar 18, 2024
b754477
optimizer dfschema merge
haohuaijin Mar 18, 2024
4ec056a
retrigger ci
haohuaijin Mar 18, 2024
51ae585
Merge branch 'main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 19, 2024
84843de
fmt
haohuaijin Mar 19, 2024
a6ce4fb
apply suggestion
haohuaijin Mar 19, 2024
0c5d037
fmt
haohuaijin Mar 19, 2024
668d112
find field return refer
haohuaijin Mar 19, 2024
e7576f6
add some tests
haohuaijin Mar 19, 2024
fce7341
improve build_join_schema
haohuaijin Mar 19, 2024
eb6e21a
remove some clone
haohuaijin Mar 19, 2024
8bd19a5
remove ignore
haohuaijin Mar 21, 2024
9a17cdf
introduce dffield
jayzhan211 Mar 22, 2024
3b9366b
remove comment
jayzhan211 Mar 22, 2024
364a1b2
Merge remote-tracking branch 'upstream/main' into dfschema
jayzhan211 Mar 22, 2024
2d0c470
Merge remote-tracking branch 'upstream/main' into dfschema
jayzhan211 Mar 22, 2024
9a7b929
fmt
jayzhan211 Mar 22, 2024
1d4cd1c
clippy
jayzhan211 Mar 22, 2024
0402c6b
dffield to col
jayzhan211 Mar 22, 2024
da3cbf7
cleanup
jayzhan211 Mar 22, 2024
3db6cb6
fmt
jayzhan211 Mar 22, 2024
5a75fde
clone in unnest_with_options
jayzhan211 Mar 23, 2024
5f33311
arc in unnest
jayzhan211 Mar 23, 2024
ebbd577
use fieldref in build_join_schema
jayzhan211 Mar 23, 2024
43b4b6e
clippy
jayzhan211 Mar 23, 2024
c0295e0
rm with_new_fields
jayzhan211 Mar 23, 2024
936240d
rm clone
jayzhan211 Mar 23, 2024
9d641fa
cleanup
jayzhan211 Mar 23, 2024
c787c49
fix unnest
jayzhan211 Mar 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More updates
matthewmturner committed Jan 17, 2024
commit 92a2a457ee62c32c38514374152b1b1ff1382964
139 changes: 75 additions & 64 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ use crate::{

use arrow::compute::can_cast_types;
use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema, SchemaRef};
use arrow_schema::SchemaBuilder;

/// A reference-counted reference to a [DFSchema].
pub type DFSchemaRef = Arc<DFSchema>;
@@ -218,14 +219,12 @@ impl DFSchema {
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
Self::new_with_metadata(
schema
.fields()
.iter()
.map(|f| DFField::from_qualified(qualifier.clone(), f.clone()))
.collect(),
schema.metadata().clone(),
)
let schema = DFSchema {
inner: schema.clone().into(),
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
Ok(schema)
}

/// Assigns functional dependencies.
@@ -269,21 +268,27 @@ impl DFSchema {
/// Modify this schema by appending the fields from the supplied schema, ignoring any
/// duplicate fields.
pub fn merge(&mut self, other_schema: &DFSchema) {
if other_schema.fields.is_empty() {
if other_schema.inner.fields.is_empty() {
return;
}
let mut schema_builder = SchemaBuilder::from(self.inner.fields);
for (qualifier, field) in other_schema.iter() {
// skip duplicate columns
let duplicated_field = match qualifier {
Some(q) => self.has_column_with_qualified_name(qualifier, field.name()),
Some(q) => self.has_column_with_qualified_name(q, field.name()),
// for unqualified columns, check as unqualified name
None => self.has_column_with_unqualified_name(field.name()),
};
if !duplicated_field {
self.fields.push(field.clone());
// self.inner.fields.push(field.clone());
schema_builder.push(field.clone())
}
}
self.metadata.extend(other_schema.metadata.clone())
let finished = schema_builder.finish();
self.inner = finished.into();
self.inner
.metadata
.extend(other_schema.inner.metadata.clone());
}

/// Get a list of fields
@@ -297,53 +302,50 @@ impl DFSchema {
&self.inner.fields[i]
}

#[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")]
// #[deprecated(since = "8.0.0", note = "please use `index_of_column_by_name` instead")]
/// Find the index of the column with the given unqualified name
pub fn index_of(&self, name: &str) -> Result<usize> {
for i in 0..self.fields.len() {
if self.fields[i].name() == name {
return Ok(i);
} else {
// Now that `index_of` is deprecated an error is thrown if
// a fully qualified field name is provided.
match &self.fields[i].qualifier {
Some(qualifier) => {
if (qualifier.to_string() + "." + self.fields[i].name()) == name {
return _plan_err!(
"Fully qualified field name '{name}' was supplied to `index_of` \
which is deprecated. Please use `index_of_column_by_name` instead"
);
}
}
None => (),
}
}
}

Err(unqualified_field_not_found(name, self))
}
// pub fn index_of(&self, name: &str) -> Result<usize> {
// for i in 0..self.fields.len() {
// if self.fields[i].name() == name {
// return Ok(i);
// } else {
// // Now that `index_of` is deprecated an error is thrown if
// // a fully qualified field name is provided.
// match &self.fields[i].qualifier {
// Some(qualifier) => {
// if (qualifier.to_string() + "." + self.fields[i].name()) == name {
// return _plan_err!(
// "Fully qualified field name '{name}' was supplied to `index_of` \
// which is deprecated. Please use `index_of_column_by_name` instead"
// );
// }
// }
// None => (),
// }
// }
// }
//
// Err(unqualified_field_not_found(name, self))
// }

pub fn index_of_column_by_name(
&self,
qualifier: Option<&TableReference>,
name: &str,
) -> Result<Option<usize>> {
let mut matches = self
.fields
.iter()
.enumerate()
.filter(|(_, field)| match (qualifier, &field.qualifier) {
.filter(|(i, (q, f))| match (qualifier, q) {
// field to lookup is qualified.
// current field is qualified and not shared between relations, compare both
// qualifier and name.
(Some(q), Some(field_q)) => {
q.resolved_eq(field_q) && field.name() == name
}
(Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
// field to lookup is qualified but current field is unqualified.
(Some(qq), None) => {
// the original field may now be aliased with a name that matches the
// original qualified name
let column = Column::from_qualified_name(field.name());
let column = Column::from_qualified_name(f.name());
match column {
Column {
relation: Some(r),
@@ -353,7 +355,7 @@ impl DFSchema {
}
}
// field to lookup is unqualified, no need to compare qualifier
(None, Some(_)) | (None, None) => field.name() == name,
(None, Some(_)) | (None, None) => f.name() == name,
})
.map(|(idx, _)| idx);
Ok(matches.next())
@@ -386,7 +388,12 @@ impl DFSchema {

/// Find all fields having the given qualifier
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
self.iter().filter(|(q, f)| q == qualifier).collect()
let fields = self
.iter()
.filter(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false))
.map(|(_, f)| f.into())
.collect();
fields
}

/// Find all field indices having the given qualifier
@@ -446,10 +453,8 @@ impl DFSchema {
qualifier: &TableReference,
name: &str,
) -> bool {
self.fields().iter().any(|field| {
field.qualifier().map(|q| q.eq(qualifier)).unwrap_or(false)
&& field.name() == name
})
self.iter()
.any(|(q, f)| q.map(|q| q == qualifier).unwrap_or(false) && f.name() == name)
}

/// Find if the field exists with the given qualified column
@@ -631,21 +636,29 @@ impl DFSchema {

/// Strip all field qualifiers in schema
pub fn strip_qualifiers(self) -> Self {
self.field_qualifiers = vec![None; self.inner.fields.len()];
self
let stripped_schema = DFSchema {
inner: self.inner.clone(),
field_qualifiers: vec![None; self.inner.fields.len()],
functional_dependencies: self.functional_dependencies.clone(),
};
stripped_schema
}

/// Replace all field qualifiers with a new value in schema
pub fn replace_qualifier(self, qualifier: impl Into<OwnedTableReference>) -> Self {
let qualifier = qualifier.into();
self.field_qualifiers = vec![qualifier; self.inner.fields().len()];
self
let replaced_schema = DFSchema {
inner: self.inner.clone(),
field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
functional_dependencies: self.functional_dependencies.clone(),
};
replaced_schema
}

/// Get list of fully-qualified field names in this schema
pub fn field_names(&self) -> Vec<String> {
self.iter()
.map(|(qualifier, field)| qualified_name(&qualifier, field))
.map(|(qualifier, field)| qualified_name(qualifier, field.name()))
.collect::<Vec<_>>()
}

@@ -672,16 +685,16 @@ impl DFSchema {
impl From<DFSchema> for Schema {
/// Convert DFSchema into a Schema
fn from(df_schema: DFSchema) -> Self {
let fields: Fields = df_schema.fields.into_iter().map(|f| f.field).collect();
Schema::new_with_metadata(fields, df_schema.metadata)
let fields: Fields = df_schema.inner.fields.clone();
Schema::new_with_metadata(fields, df_schema.inner.metadata)
}
}

impl From<&DFSchema> for Schema {
/// Convert DFSchema reference into a Schema
fn from(df_schema: &DFSchema) -> Self {
let fields: Fields = df_schema.fields.iter().map(|f| f.field.clone()).collect();
Schema::new_with_metadata(fields, df_schema.metadata.clone())
let fields: Fields = df_schema.inner.fields.clone();
Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
}
}

@@ -690,8 +703,8 @@ impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
fn try_from(schema: Schema) -> Result<Self, Self::Error> {
let dfschema = Self {
inner: schema,
field_qualifiers: vec![None, schema.fields.len()],
inner: schema.into(),
field_qualifiers: vec![None; schema.fields.len()],
functional_dependencies: FunctionalDependencies::empty(),
};
Ok(dfschema)
@@ -763,10 +776,8 @@ impl Display for DFSchema {
write!(
f,
"fields:[{}], metadata:{:?}",
self.inner
.fields
.iter()
.map(|field| field.qualified_name())
self.iter()
.map(|(q, f)| qualified_name(q, f.name()))
.collect::<Vec<String>>()
.join(", "),
self.inner.metadata
@@ -871,7 +882,7 @@ impl SchemaExt for Schema {
}
}

fn qualified_name(qualifier: &Option<OwnedTableReference>, name: &str) -> String {
fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
match qualifier {
Some(q) => format!("{}.{}", q, name),
None => name.to_string(),