refactor: make dfschema wrap schemaref #9595

Merged · 74 commits · Apr 1, 2024
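The gist of the refactor: instead of owning its own vector of DFFields, DFSchema now wraps an Arrow SchemaRef and keeps per-column table qualifiers alongside it. A rough sketch of that shape as a hypothetical standalone struct (the name DfSchemaSketch and its fields are illustrative, not the merged definition, which also tracks functional dependencies):

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::TableReference;

/// Illustrative only: the real `DFSchema` lives in
/// datafusion/common/src/dfschema.rs.
struct DfSchemaSketch {
    /// The wrapped Arrow schema, shared via `Arc` instead of being rebuilt
    /// from a vector of `DFField`s.
    inner: SchemaRef,
    /// One optional table qualifier per field of `inner`, in field order.
    field_qualifiers: Vec<Option<TableReference>>,
}

fn main() {
    let inner: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let sketch = DfSchemaSketch {
        field_qualifiers: vec![Some(TableReference::bare("t")); inner.fields().len()],
        inner,
    };
    assert_eq!(sketch.inner.fields().len(), sketch.field_qualifiers.len());
}
```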
Commits (74 total; the file changes shown below reflect 63 of them)
190497c
Start setting up SchemaRef
matthewmturner Jan 13, 2024
b9fd992
Start updating DFSchema
matthewmturner Jan 14, 2024
93c4a1c
More updates to df schema
matthewmturner Jan 15, 2024
92a2a45
More updates
matthewmturner Jan 17, 2024
23c6634
More updates
matthewmturner Jan 18, 2024
dd88233
Start working on columns
matthewmturner Jan 19, 2024
ee2bd43
Start cleaning up columns
matthewmturner Jan 20, 2024
a10a854
Remove DFField from dfschema tests
matthewmturner Jan 25, 2024
25b4e42
More cleanup
matthewmturner Jan 29, 2024
80f1181
datafusion common is building
matthewmturner Jan 30, 2024
20694ea
More cleanup
matthewmturner Jan 31, 2024
cba88e7
Start updating expr
matthewmturner Feb 1, 2024
3210675
More cleanup
matthewmturner Feb 4, 2024
6b465ce
Update build_join_schema
matthewmturner Feb 5, 2024
5a595e6
Cleanup expr to_field
matthewmturner Feb 6, 2024
f49644f
Builder updates
matthewmturner Feb 8, 2024
51f9c07
Update expr utils
matthewmturner Feb 9, 2024
03b133a
Work on logical plan
matthewmturner Feb 12, 2024
a519a7f
Update expr rewriter
matthewmturner Feb 14, 2024
b748981
Cleanup up logical plan
matthewmturner Feb 15, 2024
9b5a870
More cleanup
matthewmturner Feb 16, 2024
fe60a45
More cleanup
matthewmturner Feb 23, 2024
9600fdc
Cleanup
matthewmturner Feb 28, 2024
4524fae
Fix unnest
matthewmturner Mar 7, 2024
3fe9078
make datafusion-expr build
haohuaijin Mar 13, 2024
4819586
make datafusion-optimizer build
haohuaijin Mar 14, 2024
e55354d
can build some datafusion-sql
haohuaijin Mar 14, 2024
c4b3429
clean up
haohuaijin Mar 14, 2024
b86c66b
make datafusion-sql build
haohuaijin Mar 14, 2024
efe29b5
make core build
haohuaijin Mar 14, 2024
f13fe94
make datafusion-substrait build
haohuaijin Mar 14, 2024
ac4629f
clean up
haohuaijin Mar 14, 2024
21630d5
clean up
haohuaijin Mar 14, 2024
121aa5d
fix plan.rs
haohuaijin Mar 14, 2024
966b49d
fix clean up
haohuaijin Mar 15, 2024
618656d
fix to_field
haohuaijin Mar 15, 2024
c039b24
fix select * from file
haohuaijin Mar 15, 2024
038c172
remove DFField in tests
haohuaijin Mar 15, 2024
0af52ff
fix some tests
haohuaijin Mar 15, 2024
796e248
fix unnest and dfschema
haohuaijin Mar 16, 2024
1c4045f
fix dfschema test
haohuaijin Mar 16, 2024
64684c3
make datafusion-proto build
haohuaijin Mar 16, 2024
6223de3
fix some optimizer test
haohuaijin Mar 16, 2024
8e95847
fix dfschema merge
haohuaijin Mar 16, 2024
630456f
fix with_column_renamed
haohuaijin Mar 16, 2024
9eff49d
fix compound identifier tests
haohuaijin Mar 17, 2024
d8987f2
fix unnest plan
haohuaijin Mar 17, 2024
321d2e7
fix except
haohuaijin Mar 17, 2024
3564640
Merge remote-tracking branch 'upstream/main' into feat/make-dfschema-…
haohuaijin Mar 17, 2024
1833fb9
fix test and conflicts
haohuaijin Mar 18, 2024
f87362d
remove clone in dfschema
haohuaijin Mar 18, 2024
c2aefe5
clean up dfschema
haohuaijin Mar 18, 2024
b754477
optimizer dfschema merge
haohuaijin Mar 18, 2024
4ec056a
retrigger ci
haohuaijin Mar 18, 2024
51ae585
Merge branch 'main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 19, 2024
84843de
fmt
haohuaijin Mar 19, 2024
a6ce4fb
apply suggestion
haohuaijin Mar 19, 2024
0c5d037
fmt
haohuaijin Mar 19, 2024
668d112
find field return refer
haohuaijin Mar 19, 2024
e7576f6
add some tests
haohuaijin Mar 19, 2024
fce7341
improve build_join_schema
haohuaijin Mar 19, 2024
eb6e21a
remove some clone
haohuaijin Mar 19, 2024
8bd19a5
remove ignore
haohuaijin Mar 21, 2024
31099cd
Merge branch 'main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 22, 2024
bde2a07
fmt
haohuaijin Mar 22, 2024
fb93dc2
Merge remote-tracking branch 'upstream/main' into feat/make-dfschema-…
haohuaijin Mar 22, 2024
8ac0993
remove dfschema create method
haohuaijin Mar 22, 2024
ad71328
add column from trait
haohuaijin Mar 22, 2024
5daebe6
from Vec<Field> to Fields
haohuaijin Mar 22, 2024
55806e4
fmt
haohuaijin Mar 22, 2024
b55d459
Merge branch 'apache:main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 24, 2024
b7d6989
Merge branch 'apache:main' into feat/make-dfschema-wrap-schemaref
haohuaijin Mar 25, 2024
a29bcc5
Merge remote-tracking branch 'apache/main' into feat/make-dfschema-wr…
alamb Apr 1, 2024
54de5cd
Add schema validation check for CREATE EXTERNAL TABLE
alamb Apr 1, 2024
5 changes: 3 additions & 2 deletions benchmarks/src/tpch/convert.rs
@@ -86,10 +86,11 @@ impl ConvertOpt {
// Select all apart from the padding column
let selection = csv
.schema()
.fields()
.iter()
.take(schema.fields.len() - 1)
.map(|d| Expr::Column(d.qualified_column()))
.map(|(qualifier, field)| {
Expr::Column(Column::new(qualifier.cloned(), field.name()))
})
.collect();

csv = csv.select(selection)?;
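For context, a minimal standalone sketch of the pattern used above, assuming a DataFusion build that includes this refactor: DFSchema::iter() yields (qualifier, field) pairs, and a qualified Column is rebuilt from the pair instead of being taken from a DFField. The table and field names below are made up for illustration:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{Column, DFSchema};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;

/// Build column expressions for every field except the trailing padding column.
fn select_all_but_last(df_schema: &DFSchema) -> Vec<Expr> {
    df_schema
        .iter()
        .take(df_schema.fields().len() - 1)
        .map(|(qualifier, field)| {
            Expr::Column(Column::new(qualifier.cloned(), field.name()))
        })
        .collect()
}

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("l_orderkey", DataType::Int64, false),
        Field::new("l_comment", DataType::Utf8, false),
        Field::new("padding", DataType::Utf8, true),
    ]));
    let df_schema = DFSchema::try_from_qualified_schema("lineitem", &schema)?;

    let exprs = select_all_but_last(&df_schema);
    assert_eq!(exprs.len(), 2); // the padding column is dropped
    Ok(())
}
```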
19 changes: 8 additions & 11 deletions datafusion-examples/examples/expr_api.rs
@@ -22,7 +22,7 @@ use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::{DFField, DFSchema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{
@@ -273,31 +273,28 @@ fn expression_type_demo() -> Result<()> {
// a schema. In this case we create a schema where the column `c` is of
// type Utf8 (a String / VARCHAR)
let schema = DFSchema::new_with_metadata(
vec![DFField::new_unqualified("c", DataType::Utf8, true)],
vec![Field::new("c", DataType::Utf8, true)],
HashMap::new(),
)
.unwrap();
);
assert_eq!("Utf8", format!("{}", expr.get_type(&schema).unwrap()));

// Using a schema where the column `foo` is of type Int32
let schema = DFSchema::new_with_metadata(
vec![DFField::new_unqualified("c", DataType::Int32, true)],
vec![Field::new("c", DataType::Int32, true)],
HashMap::new(),
)
.unwrap();
);
assert_eq!("Int32", format!("{}", expr.get_type(&schema).unwrap()));

// Get the type of an expression that adds 2 columns. Adding an Int32
// and Float32 results in Float32 type
let expr = col("c1") + col("c2");
let schema = DFSchema::new_with_metadata(
vec![
DFField::new_unqualified("c1", DataType::Int32, true),
DFField::new_unqualified("c2", DataType::Float32, true),
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Float32, true),
],
HashMap::new(),
)
.unwrap();
);
assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));

Ok(())
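The same type-inference flow also works against a DFSchema built from a plain Arrow Schema; a minimal sketch under that assumption, using DFSchema::try_from rather than the new_with_metadata constructor shown above:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::logical_expr::ExprSchemable;
use datafusion::prelude::col;

fn main() -> Result<()> {
    // Two unqualified columns: an Int32 and a Float32.
    let schema = Schema::new(vec![
        Field::new("c1", DataType::Int32, true),
        Field::new("c2", DataType::Float32, true),
    ]);
    let df_schema = DFSchema::try_from(schema)?;

    // Adding an Int32 column and a Float32 column yields Float32.
    let expr = col("c1") + col("c2");
    assert_eq!(expr.get_type(&df_schema)?, DataType::Float32);
    Ok(())
}
```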
86 changes: 41 additions & 45 deletions datafusion/common/src/column.rs
@@ -178,11 +178,15 @@ impl Column {
}

for schema in schemas {
let fields = schema.fields_with_unqualified_name(&self.name);
match fields.len() {
let qualified_fields =
schema.qualified_fields_with_unqualified_name(&self.name);
match qualified_fields.len() {
0 => continue,
1 => {
return Ok(fields[0].qualified_column());
return Ok(Column::new(
qualified_fields[0].0.cloned(),
qualified_fields[0].1.name(),
));
}
_ => {
// More than 1 fields in this schema have their names set to self.name.
@@ -198,14 +202,13 @@
// We will use the relation from the first matched field to normalize self.

// Compare matched fields with one USING JOIN clause at a time
let columns = schema.columns_with_unqualified_name(&self.name);
for using_col in using_columns {
let all_matched = fields
.iter()
.all(|f| using_col.contains(&f.qualified_column()));
let all_matched = columns.iter().all(|f| using_col.contains(f));
// All matched fields belong to the same using column set, in other words
// the same join clause. We simply pick the qualifier from the first match.
if all_matched {
return Ok(fields[0].qualified_column());
return Ok(columns[0].clone());
}
}
}
@@ -214,10 +217,7 @@

_schema_err!(SchemaError::FieldNotFound {
field: Box::new(Column::new(self.relation.clone(), self.name)),
valid_fields: schemas
.iter()
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.collect(),
valid_fields: schemas.iter().flat_map(|s| s.columns()).collect(),
})
}

@@ -267,13 +267,18 @@
}

for schema_level in schemas {
let fields = schema_level
let qualified_fields = schema_level
.iter()
.flat_map(|s| s.fields_with_unqualified_name(&self.name))
.flat_map(|s| s.qualified_fields_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
match fields.len() {
match qualified_fields.len() {
0 => continue,
1 => return Ok(fields[0].qualified_column()),
1 => {
return Ok(Column::new(
qualified_fields[0].0.cloned(),
qualified_fields[0].1.name(),
))
}
_ => {
// More than 1 fields in this schema have their names set to self.name.
//
@@ -288,14 +293,16 @@
// We will use the relation from the first matched field to normalize self.

// Compare matched fields with one USING JOIN clause at a time
let columns = schema_level
.iter()
.flat_map(|s| s.columns_with_unqualified_name(&self.name))
.collect::<Vec<_>>();
for using_col in using_columns {
let all_matched = fields
.iter()
.all(|f| using_col.contains(&f.qualified_column()));
let all_matched = columns.iter().all(|c| using_col.contains(c));
// All matched fields belong to the same using column set, in other words
// the same join clause. We simply pick the qualifier from the first match.
if all_matched {
return Ok(fields[0].qualified_column());
return Ok(columns[0].clone());
}
}

Expand All @@ -312,7 +319,7 @@ impl Column {
valid_fields: schemas
.iter()
.flat_map(|s| s.iter())
.flat_map(|s| s.fields().iter().map(|f| f.qualified_column()))
.flat_map(|s| s.columns())
.collect(),
})
}
@@ -355,36 +362,25 @@ impl fmt::Display for Column {
#[cfg(test)]
mod tests {
use super::*;
use crate::DFField;
use arrow::datatypes::DataType;
use std::collections::HashMap;

fn create_schema(names: &[(Option<&str>, &str)]) -> Result<DFSchema> {
let fields = names
.iter()
.map(|(qualifier, name)| {
DFField::new(
qualifier.to_owned().map(|s| s.to_string()),
name,
DataType::Boolean,
true,
)
})
.collect::<Vec<_>>();
DFSchema::new_with_metadata(fields, HashMap::new())
use arrow_schema::{Field, SchemaBuilder};

fn create_qualified_schema(qualifier: &str, names: Vec<&str>) -> Result<DFSchema> {
let mut schema_builder = SchemaBuilder::new();
schema_builder.extend(
names
.iter()
.map(|f| Field::new(*f, DataType::Boolean, true)),
);
let schema = Arc::new(schema_builder.finish());
DFSchema::try_from_qualified_schema(qualifier, &schema)
}

#[test]
fn test_normalize_with_schemas_and_ambiguity_check() -> Result<()> {
let schema1 = create_schema(&[(Some("t1"), "a"), (Some("t1"), "b")])?;
let schema2 = create_schema(&[(Some("t2"), "c"), (Some("t2"), "d")])?;
let schema3 = create_schema(&[
(Some("t3"), "a"),
(Some("t3"), "b"),
(Some("t3"), "c"),
(Some("t3"), "d"),
(Some("t3"), "e"),
])?;
let schema1 = create_qualified_schema("t1", vec!["a", "b"])?;
let schema2 = create_qualified_schema("t2", vec!["c", "d"])?;
let schema3 = create_qualified_schema("t3", vec!["a", "b", "c", "d", "e"])?;

// already normalized
let col = Column::new(Some("t1"), "a");
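To round out the test-helper change above, a small sketch of the lookup side once DFSchema hands back plain Columns instead of DFFields, using the accessors that appear in this diff (columns, columns_with_unqualified_name, try_from_qualified_schema); exact signatures may differ slightly in the merged version:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{Column, DFSchema};
use datafusion::error::Result;

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Boolean, true),
        Field::new("b", DataType::Boolean, true),
    ]));
    let t1 = DFSchema::try_from_qualified_schema("t1", &schema)?;

    // Every column, already qualified, without going through DFField.
    let all: Vec<Column> = t1.columns();
    assert_eq!(
        all,
        vec![Column::new(Some("t1"), "a"), Column::new(Some("t1"), "b")]
    );

    // Lookup by bare name returns the qualified matches.
    let matches = t1.columns_with_unqualified_name("a");
    assert_eq!(matches, vec![Column::new(Some("t1"), "a")]);
    Ok(())
}
```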