From 5bfc2979759b290a666c62597931fdb7593cc86c Mon Sep 17 00:00:00 2001
From: Chongchen Chen
Date: Thu, 9 Jan 2025 22:42:02 +0800
Subject: [PATCH] feat: metadata columns

---
 datafusion/catalog/src/table.rs               |   5 +
 datafusion/common/src/dfschema.rs             | 340 ++++++++++++-----
 datafusion/common/src/lib.rs                  |   3 +-
 .../src/datasource/default_table_source.rs    |   4 +
 datafusion/core/src/physical_planner.rs       |   7 +-
 datafusion/core/tests/sql/metadata_columns.rs | 341 ++++++++++++++++++
 datafusion/core/tests/sql/mod.rs              |   1 +
 datafusion/expr/src/logical_plan/builder.rs   |   9 +-
 datafusion/expr/src/logical_plan/plan.rs      |  44 ++-
 datafusion/expr/src/table_source.rs           |   5 +
 datafusion/ffi/src/table_provider.rs          |  12 +
 datafusion/optimizer/src/optimizer.rs         |  13 +-
 12 files changed, 686 insertions(+), 98 deletions(-)
 create mode 100644 datafusion/core/tests/sql/metadata_columns.rs

diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 3c8960495588..2339745fe41a 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -55,6 +55,11 @@ pub trait TableProvider: Debug + Sync + Send {
     /// Get a reference to the schema for this table
     fn schema(&self) -> SchemaRef;
 
+    /// Get metadata columns of this table.
+    fn metadata_columns(&self) -> Option<SchemaRef> {
+        None
+    }
+
     /// Get a reference to the constraints of the table.
     /// Returns:
     /// - `None` for tables that do not support constraints.
diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs
index ac4d8be8045f..2d5a27539eed 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -106,37 +106,169 @@ pub type DFSchemaRef = Arc<DFSchema>;
 /// ```
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct DFSchema {
+    inner: QualifiedSchema,
+    /// Stores functional dependencies in the schema.
+    functional_dependencies: FunctionalDependencies,
+    /// metadata columns
+    metadata: Option<QualifiedSchema>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct QualifiedSchema {
     /// Inner Arrow schema reference.
-    inner: SchemaRef,
+    schema: SchemaRef,
     /// Optional qualifiers for each column in this schema. In the same order as
     /// the `self.inner.fields()`
     field_qualifiers: Vec<Option<TableReference>>,
-    /// Stores functional dependencies in the schema.
-    functional_dependencies: FunctionalDependencies,
+}
+
+impl QualifiedSchema {
+    pub fn empty() -> Self {
+        Self {
+            schema: Arc::new(Schema::new([])),
+            field_qualifiers: vec![],
+        }
+    }
+
+    pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Self {
+        QualifiedSchema {
+            schema,
+            field_qualifiers,
+        }
+    }
+
+    pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self {
+        let field_qualifiers = schema
+            .fields()
+            .iter()
+            .map(|_| Some(table_name.clone()))
+            .collect();
+        Self::new(schema, field_qualifiers)
+    }
+
+    pub fn len(&self) -> usize {
+        self.schema.fields.len()
+    }
+
+    pub fn qualified_fields_with_unqualified_name(
+        &self,
+        name: &str,
+    ) -> Vec<(Option<&TableReference>, &Field)> {
+        self.iter()
+            .filter(|(_, field)| field.name() == name)
+            .map(|(qualifier, field)| (qualifier, field.as_ref()))
+            .collect()
+    }
+
+    /// Iterate over the qualifiers and fields in the DFSchema
+    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
+        self.field_qualifiers
+            .iter()
+            .zip(self.schema.fields().iter())
+            .map(|(qualifier, field)| (qualifier.as_ref(), field))
+    }
+
+    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
+        self.fields()
+            .iter()
+            .filter(|field| field.name() == name)
+            .map(|f| f.as_ref())
+            .collect()
+    }
+
+    /// Get a list of fields
+    pub fn fields(&self) -> &Fields {
+        &self.schema.fields
+    }
+
+    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// offset within the internal `fields` vector
+    pub fn field(&self, i: usize) -> &Field {
+        &self.schema.fields[i]
+    }
+
+    /// Returns an immutable reference of a specific `Field` instance selected using an
+    /// offset within the internal `fields` vector and its qualifier
+    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
+        (self.field_qualifiers[i].as_ref(), self.field(i))
+    }
+
+    pub fn field_with_qualified_name(
+        &self,
+        qualifier: &TableReference,
+        name: &str,
+    ) -> Option<&Field> {
+        let mut matches = self
+            .iter()
+            .filter(|(q, f)| match q {
+                Some(field_q) => qualifier.resolved_eq(field_q) && f.name() == name,
+                None => false,
+            })
+            .map(|(_, f)| f.as_ref());
+        matches.next()
+    }
+
+    pub fn index_of_column_by_name(
+        &self,
+        qualifier: Option<&TableReference>,
+        name: &str,
+    ) -> Option<usize> {
+        let mut matches = self
+            .iter()
+            .enumerate()
+            .filter(|(_, (q, f))| match (qualifier, q) {
+                // field to lookup is qualified.
+                // current field is qualified and not shared between relations, compare both
+                // qualifier and name.
+                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
+                // field to lookup is qualified but current field is unqualified.
+                (Some(_), None) => false,
+                // field to lookup is unqualified, no need to compare qualifier
+                (None, Some(_)) | (None, None) => f.name() == name,
+            })
+            .map(|(idx, _)| idx);
+        matches.next()
+    }
+
+    pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> {
+        self.field_qualifiers[i].as_ref()
+    }
 }
 
 impl DFSchema {
     /// Creates an empty `DFSchema`
     pub fn empty() -> Self {
         Self {
-            inner: Arc::new(Schema::new([])),
-            field_qualifiers: vec![],
+            inner: QualifiedSchema::empty(),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         }
     }
 
+    pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
+        &self.metadata
+    }
+
     /// Return a reference to the inner Arrow [`Schema`]
     ///
     /// Note this does not have the qualifier information
     pub fn as_arrow(&self) -> &Schema {
-        self.inner.as_ref()
+        self.inner.schema.as_ref()
     }
 
     /// Return a reference to the inner Arrow [`SchemaRef`]
     ///
     /// Note this does not have the qualifier information
     pub fn inner(&self) -> &SchemaRef {
-        &self.inner
+        &self.inner.schema
     }
 
+    pub fn with_metadata_schema(
+        mut self,
+        metadata_schema: Option<QualifiedSchema>,
+    ) -> Self {
+        self.metadata = metadata_schema;
+        self
+    }
+
     /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
@@ -150,9 +282,9 @@ impl DFSchema {
         let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
         let dfschema = Self {
-            inner: schema,
-            field_qualifiers: qualifiers,
+            inner: QualifiedSchema::new(schema, qualifiers),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         dfschema.check_names()?;
         Ok(dfschema)
@@ -179,9 +311,9 @@ impl DFSchema {
         let field_count = fields.len();
         let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
         let dfschema = Self {
-            inner: schema,
-            field_qualifiers: vec![None; field_count],
+            inner: QualifiedSchema::new(schema, vec![None; field_count]),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         dfschema.check_names()?;
         Ok(dfschema)
@@ -197,9 +329,12 @@ impl DFSchema {
     ) -> Result<Self> {
         let qualifier = qualifier.into();
         let schema = DFSchema {
-            inner: schema.clone().into(),
-            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
+            inner: QualifiedSchema::new(
+                schema.clone().into(),
+                vec![Some(qualifier); schema.fields.len()],
+            ),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         schema.check_names()?;
         Ok(schema)
@@ -211,9 +346,9 @@ impl DFSchema {
         schema: &SchemaRef,
     ) -> Result<Self> {
         let dfschema = Self {
-            inner: Arc::clone(schema),
-            field_qualifiers: qualifiers,
+            inner: QualifiedSchema::new(Arc::clone(schema), qualifiers),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         dfschema.check_names()?;
         Ok(dfschema)
@@ -224,7 +359,9 @@ impl DFSchema {
         let mut qualified_names = BTreeSet::new();
         let mut unqualified_names = BTreeSet::new();
 
-        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
+        for (field, qualifier) in
+            self.inner.fields().iter().zip(&self.inner.field_qualifiers)
+        {
             if let Some(qualifier) = qualifier {
                 if !qualified_names.insert((qualifier, field.name())) {
                     return _schema_err!(SchemaError::DuplicateQualifiedField {
@@ -254,7 +391,7 @@ impl DFSchema {
         mut self,
         functional_dependencies: FunctionalDependencies,
     ) -> Result<Self> {
-        if functional_dependencies.is_valid(self.inner.fields.len()) {
+        if functional_dependencies.is_valid(self.inner.schema.fields.len()) {
             self.functional_dependencies = functional_dependencies;
             Ok(self)
         } else {
@@ -273,17 +410,21 @@ impl DFSchema {
         schema_builder.extend(schema.fields().iter().cloned());
         let new_schema = schema_builder.finish();
 
-        let mut new_metadata = self.inner.metadata.clone();
-        new_metadata.extend(schema.inner.metadata.clone());
+        let mut new_metadata: HashMap<String, String> =
+            self.inner.schema.metadata.clone();
+        new_metadata.extend(schema.inner.schema.metadata.clone());
         let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
 
-        let mut new_qualifiers = self.field_qualifiers.clone();
-        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
+        let mut new_qualifiers = self.inner.field_qualifiers.clone();
+        new_qualifiers.extend_from_slice(schema.inner.field_qualifiers.as_slice());
 
         let new_self = Self {
-            inner: Arc::new(new_schema_with_metadata),
-            field_qualifiers: new_qualifiers,
+            inner: QualifiedSchema::new(
+                Arc::new(new_schema_with_metadata),
+                new_qualifiers,
+            ),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         new_self.check_names()?;
         Ok(new_self)
@@ -292,7 +433,7 @@ impl DFSchema {
     /// Modify this schema by appending the fields from the supplied schema, ignoring any
     /// duplicate fields.
     pub fn merge(&mut self, other_schema: &DFSchema) {
-        if other_schema.inner.fields.is_empty() {
+        if other_schema.inner.schema.fields.is_empty() {
             return;
         }
 
@@ -300,12 +441,13 @@ impl DFSchema {
             self.iter().collect();
         let self_unqualified_names: HashSet<&str> = self
             .inner
+            .schema
             .fields
             .iter()
             .map(|field| field.name().as_str())
             .collect();
 
-        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
+        let mut schema_builder = SchemaBuilder::from(self.inner.schema.fields.clone());
         let mut qualifiers = Vec::new();
         for (qualifier, field) in other_schema.iter() {
             // skip duplicate columns
@@ -319,30 +461,40 @@ impl DFSchema {
                 qualifiers.push(qualifier.cloned());
             }
         }
-        let mut metadata = self.inner.metadata.clone();
-        metadata.extend(other_schema.inner.metadata.clone());
+        let mut metadata = self.inner.schema.metadata.clone();
+        metadata.extend(other_schema.inner.schema.metadata.clone());
 
         let finished = schema_builder.finish();
         let finished_with_metadata = finished.with_metadata(metadata);
-        self.inner = finished_with_metadata.into();
-        self.field_qualifiers.extend(qualifiers);
+        self.inner.schema = finished_with_metadata.into();
+        self.inner.field_qualifiers.extend(qualifiers);
     }
 
     /// Get a list of fields
     pub fn fields(&self) -> &Fields {
-        &self.inner.fields
+        &self.inner.schema.fields
     }
 
     /// Returns an immutable reference of a specific `Field` instance selected using an
     /// offset within the internal `fields` vector
     pub fn field(&self, i: usize) -> &Field {
-        &self.inner.fields[i]
+        if i >= self.inner.len() {
+            if let Some(metadata) = &self.metadata {
+                return metadata.field(i - self.inner.len());
+            }
+        }
+        self.inner.field(i)
     }
 
     /// Returns an immutable reference of a specific `Field` instance selected using an
     /// offset within the internal `fields` vector and its qualifier
     pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
-        (self.field_qualifiers[i].as_ref(), self.field(i))
+        if i >= self.inner.len() {
+            if let Some(metadata) = &self.metadata {
+                return metadata.qualified_field(i - self.inner.len());
+            }
+        }
+        self.inner.qualified_field(i)
     }
 
     pub fn index_of_column_by_name(
@@ -350,21 +502,15 @@ impl DFSchema {
         qualifier: Option<&TableReference>,
         name: &str,
     ) -> Option<usize> {
-        let mut matches = self
-            .iter()
-            .enumerate()
-            .filter(|(_, (q, f))| match (qualifier, q) {
-                // field to lookup is qualified.
-                // current field is qualified and not shared between relations, compare both
-                // qualifier and name.
-                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
-                // field to lookup is qualified but current field is unqualified.
-                (Some(_), None) => false,
-                // field to lookup is unqualified, no need to compare qualifier
-                (None, Some(_)) | (None, None) => f.name() == name,
-            })
-            .map(|(idx, _)| idx);
-        matches.next()
+        if let Some(idx) = self.inner.index_of_column_by_name(qualifier, name) {
+            return Some(idx);
+        }
+        if let Some(metadata) = &self.metadata {
+            return metadata
+                .index_of_column_by_name(qualifier, name)
+                .map(|idx| idx + self.inner.len());
+        }
+        None
     }
 
     /// Find the index of the column with the given qualifier and name,
@@ -405,6 +551,15 @@ impl DFSchema {
         }
     }
 
+    pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> {
+        if i >= self.inner.len() {
+            if let Some(metadata) = &self.metadata {
+                return metadata.field_qualifier(i - self.inner.len());
+            }
+        }
+        self.inner.field_qualifier(i)
+    }
+
     /// Find the qualified field with the given name
     pub fn qualified_field_with_name(
         &self,
@@ -415,7 +570,7 @@ impl DFSchema {
             let idx = self
                 .index_of_column_by_name(Some(qualifier), name)
                 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
-            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
+            Ok((self.field_qualifier(idx), self.field(idx)))
         } else {
             self.qualified_field_with_unqualified_name(name)
         }
@@ -442,11 +597,11 @@ impl DFSchema {
 
     /// Find all fields that match the given name
     pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
-        self.fields()
-            .iter()
-            .filter(|field| field.name() == name)
-            .map(|f| f.as_ref())
-            .collect()
+        let mut fields: Vec<&Field> = self.inner.fields_with_unqualified_name(name);
+        if let Some(schema) = self.metadata_schema() {
+            fields.append(&mut schema.fields_with_unqualified_name(name));
+        }
+        fields
     }
 
     /// Find all fields that match the given name and return them with their qualifier
@@ -454,10 +609,12 @@ impl DFSchema {
         &self,
         name: &str,
     ) -> Vec<(Option<&TableReference>, &Field)> {
-        self.iter()
-            .filter(|(_, field)| field.name() == name)
-            .map(|(qualifier, field)| (qualifier, field.as_ref()))
-            .collect()
+        let mut fields: Vec<(Option<&TableReference>, &Field)> =
+            self.inner.qualified_fields_with_unqualified_name(name);
+        if let Some(schema) = self.metadata_schema() {
+            fields.append(&mut schema.qualified_fields_with_unqualified_name(name));
+        }
+        fields
     }
 
     /// Find all fields that match the given name and convert to column
@@ -524,11 +681,18 @@ impl DFSchema {
         qualifier: &TableReference,
         name: &str,
     ) -> Result<&Field> {
-        let idx = self
-            .index_of_column_by_name(Some(qualifier), name)
-            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
+        let idx = self.index_of_column_by_name(Some(qualifier), name);
+        if let Some(idx) = idx {
+            return Ok(self.field(idx));
+        }
 
-        Ok(self.field(idx))
+        if let Some(schema) = &self.metadata {
+            if let Some(f) = schema.field_with_qualified_name(qualifier, name) {
+                return Ok(f);
+            }
+        }
+
+        Err(field_not_found(Some(qualifier.clone()), name, self))
     }
 
     /// Find the field with the given qualified column
@@ -573,6 +737,7 @@ impl DFSchema {
     /// Check to see if unqualified field names matches field names in Arrow schema
     pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
         self.inner
+            .schema
             .fields
             .iter()
             .zip(arrow_schema.fields().iter())
@@ -775,20 +940,22 @@ impl DFSchema {
 
     /// Strip all field qualifier in schema
     pub fn strip_qualifiers(self) -> Self {
+        let len = self.inner.len();
         DFSchema {
-            field_qualifiers: vec![None; self.inner.fields.len()],
-            inner: self.inner,
+            inner: QualifiedSchema::new(self.inner.schema, vec![None; len]),
             functional_dependencies: self.functional_dependencies,
+            metadata: self.metadata,
         }
     }
 
     /// Replace all field qualifier with new value in schema
     pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
         let qualifier = qualifier.into();
+        let len = self.inner.len();
         DFSchema {
-            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
-            inner: self.inner,
+            inner: QualifiedSchema::new(self.inner.schema, vec![Some(qualifier); len]),
             functional_dependencies: self.functional_dependencies,
+            metadata: self.metadata,
         }
     }
 
@@ -801,7 +968,7 @@ impl DFSchema {
 
     /// Get metadata of this schema
     pub fn metadata(&self) -> &HashMap<String, String> {
-        &self.inner.metadata
+        &self.inner.schema.metadata
     }
 
     /// Get functional dependencies
@@ -811,7 +978,8 @@ impl DFSchema {
 
     /// Iterate over the qualifiers and fields in the DFSchema
     pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
-        self.field_qualifiers
+        self.inner
+            .field_qualifiers
             .iter()
             .zip(self.inner.fields().iter())
             .map(|(qualifier, field)| (qualifier.as_ref(), field))
@@ -821,16 +989,16 @@ impl DFSchema {
 impl From<DFSchema> for Schema {
     /// Convert DFSchema into a Schema
     fn from(df_schema: DFSchema) -> Self {
-        let fields: Fields = df_schema.inner.fields.clone();
-        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
+        let fields: Fields = df_schema.inner.schema.fields.clone();
+        Schema::new_with_metadata(fields, df_schema.inner.schema.metadata.clone())
     }
 }
 
 impl From<&DFSchema> for Schema {
     /// Convert DFSchema reference into a Schema
     fn from(df_schema: &DFSchema) -> Self {
-        let fields: Fields = df_schema.inner.fields.clone();
-        Schema::new_with_metadata(fields, df_schema.inner.metadata.clone())
+        let fields: Fields = df_schema.inner.schema.fields.clone();
+        Schema::new_with_metadata(fields, df_schema.inner.schema.metadata.clone())
     }
 }
 
@@ -862,9 +1030,9 @@ impl TryFrom<SchemaRef> for DFSchema {
     fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
         let field_count = schema.fields.len();
         let dfschema = Self {
-            inner: schema,
-            field_qualifiers: vec![None; field_count],
+            inner: QualifiedSchema::new(schema, vec![None; field_count]),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         Ok(dfschema)
     }
@@ -879,8 +1047,8 @@ impl From<DFSchema> for SchemaRef {
 
 // Hashing refers to a subset of fields considered in PartialEq.
 impl Hash for DFSchema {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.inner.fields.hash(state);
-        self.inner.metadata.len().hash(state); // HashMap is not hashable
+        self.inner.schema.fields.hash(state);
+        self.inner.schema.metadata.len().hash(state); // HashMap is not hashable
     }
 }
 
@@ -918,9 +1086,9 @@ impl ToDFSchema for Vec<Field> {
             metadata: HashMap::new(),
         };
         let dfschema = DFSchema {
-            inner: schema.into(),
-            field_qualifiers: vec![None; field_count],
+            inner: QualifiedSchema::new(schema.into(), vec![None; field_count]),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         Ok(dfschema)
     }
@@ -935,7 +1103,7 @@ impl Display for DFSchema {
                 .map(|(q, f)| qualified_name(q, f.name()))
                 .collect::<Vec<String>>()
                 .join(", "),
-            self.inner.metadata
+            self.inner.schema.metadata
         )
     }
 }
@@ -1279,9 +1447,12 @@ mod tests {
         let arrow_schema_ref = Arc::new(arrow_schema.clone());
 
         let df_schema = DFSchema {
-            inner: Arc::clone(&arrow_schema_ref),
-            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
+            inner: QualifiedSchema::new(
+                Arc::clone(&arrow_schema_ref),
+                vec![None; arrow_schema_ref.fields.len()],
+            ),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
         let df_schema_ref = Arc::new(df_schema.clone());
 
@@ -1325,12 +1496,15 @@ mod tests {
         let schema = Arc::new(Schema::new(vec![a_field, b_field]));
 
         let df_schema = DFSchema {
-            inner: Arc::clone(&schema),
-            field_qualifiers: vec![None; schema.fields.len()],
+            inner: QualifiedSchema::new(
+                Arc::clone(&schema),
+                vec![None; schema.fields.len()],
+            ),
             functional_dependencies: FunctionalDependencies::empty(),
+            metadata: None,
         };
 
-        assert_eq!(df_schema.inner.metadata(), schema.metadata())
+        assert_eq!(df_schema.inner.schema.metadata(), schema.metadata())
     }
 
     #[test]
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 77e8cd60ede2..7b9e640de6c9 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -52,7 +52,8 @@ pub mod utils;
 pub use arrow;
 pub use column::Column;
 pub use dfschema::{
-    qualified_name, DFSchema, DFSchemaRef, ExprSchema, SchemaExt, ToDFSchema,
+    qualified_name, DFSchema, DFSchemaRef, ExprSchema, QualifiedSchema, SchemaExt,
+    ToDFSchema,
 };
 pub use error::{
     field_not_found, unqualified_field_not_found, DataFusionError, Result, SchemaError,
diff --git a/datafusion/core/src/datasource/default_table_source.rs b/datafusion/core/src/datasource/default_table_source.rs
index 91c1e0ac97fc..ff411311f609 100644
--- a/datafusion/core/src/datasource/default_table_source.rs
+++ b/datafusion/core/src/datasource/default_table_source.rs
@@ -56,6 +56,10 @@ impl TableSource for DefaultTableSource {
         self.table_provider.schema()
     }
 
+    fn metadata_columns(&self) -> Option<SchemaRef> {
+        self.table_provider.metadata_columns()
+    }
+
     /// Get a reference to applicable constraints, if any exists.
     fn constraints(&self) -> Option<&Constraints> {
         self.table_provider.constraints()
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 47b31d2f4e2d..6b43e32611a9 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -2271,14 +2271,15 @@ mod tests {
         let expected_error: &str = "Error during planning: \
         Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
         LogicalPlan schema: \
-        DFSchema { inner: Schema { fields: \
+        DFSchema { inner: QualifiedSchema { schema: Schema { fields: \
         [Field { name: \"a\", \
         data_type: Int32, \
         nullable: false, \
         dict_id: 0, \
         dict_is_ordered: false, metadata: {} }], \
-        metadata: {} }, field_qualifiers: [None], \
-        functional_dependencies: FunctionalDependencies { deps: [] } }, \
+        metadata: {} }, \
+        field_qualifiers: [None] }, \
+        functional_dependencies: FunctionalDependencies { deps: [] }, metadata: None }, \
         ExecutionPlan schema: Schema { fields: \
         [Field { name: \"b\", \
         data_type: Int32, \
diff --git a/datafusion/core/tests/sql/metadata_columns.rs b/datafusion/core/tests/sql/metadata_columns.rs
new file mode 100644
index 000000000000..9cabb1ad11f7
--- /dev/null
+++ b/datafusion/core/tests/sql/metadata_columns.rs
@@ -0,0 +1,341 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{self, Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+use arrow::compute::concat_batches;
+use arrow_array::{ArrayRef, UInt64Array};
+use arrow_schema::SchemaBuilder;
+use async_trait::async_trait;
+use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::datasource::file_format::csv::CsvSerializer;
+use datafusion::datasource::file_format::write::BatchSerializer;
+use datafusion::datasource::{TableProvider, TableType};
+use datafusion::error::Result;
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::memory::MemoryStream;
+use datafusion::physical_plan::{
+    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream,
+};
+use datafusion::prelude::*;
+
+use datafusion::catalog::Session;
+
+/// A User, with an id and a bank account
+#[derive(Clone, Debug)]
+struct User {
+    id: u8,
+    bank_account: u64,
+}
+
+/// A custom datasource, used to represent a datastore with a single index
+#[derive(Clone)]
+pub struct CustomDataSource {
+    inner: Arc<Mutex<CustomDataSourceInner>>,
+    metadata_columns: SchemaRef,
+}
+
+struct CustomDataSourceInner {
+    data: Vec<User>,
+}
+
+impl Debug for CustomDataSource {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        f.write_str("custom_db")
+    }
+}
+
+impl CustomDataSource {
+    pub(crate) async fn create_physical_plan(
+        &self,
+        projections: Option<&Vec<usize>>,
+        schema: SchemaRef,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
+    }
+
+    pub(crate) fn populate_users(&self) {
+        self.add_user(User {
+            id: 1,
+            bank_account: 9_000,
+        });
+        self.add_user(User {
+            id: 2,
+            bank_account: 100,
+        });
+        self.add_user(User {
+            id: 3,
+            bank_account: 1_000,
+        });
+    }
+
+    fn add_user(&self, user: User) {
+        let mut inner = self.inner.lock().unwrap();
+        inner.data.push(user);
+    }
+}
+
+impl Default for CustomDataSource {
+    fn default() -> Self {
+        CustomDataSource {
+            inner: Arc::new(Mutex::new(CustomDataSourceInner {
+                data: Default::default(),
+            })),
+            metadata_columns: Arc::new(Schema::new(vec![Field::new(
+                "_rowid",
+                DataType::UInt64,
+                false,
+            )])),
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for CustomDataSource {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        SchemaRef::new(Schema::new(vec![
+            Field::new("id", DataType::UInt8, false),
+            Field::new("bank_account", DataType::UInt64, true),
+        ]))
+    }
+
+    fn metadata_columns(&self) -> Option<SchemaRef> {
+        Some(self.metadata_columns.clone())
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        // filters and limit can be used here to inject some push-down operations if needed
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let mut schema = self.schema();
+        if let Some(metadata) = self.metadata_columns() {
+            let mut builder = SchemaBuilder::from(schema.as_ref());
+            for f in metadata.fields.iter() {
+                builder.try_merge(f)?;
+            }
+            schema = Arc::new(builder.finish());
+        }
+        self.create_physical_plan(projection, schema).await
+    }
+}
+
+#[derive(Debug, Clone)]
+struct CustomExec {
+    db: CustomDataSource,
+    projected_schema: SchemaRef,
+    cache: PlanProperties,
+}
+
+impl CustomExec {
+    fn new(
+        projections: Option<&Vec<usize>>,
+        schema: SchemaRef,
+        db: CustomDataSource,
+    ) -> Self {
+        let projected_schema = project_schema(&schema, projections).unwrap();
+        let cache = Self::compute_properties(projected_schema.clone());
+        Self {
+            db,
+            projected_schema,
+            cache,
+        }
+    }
+
+    /// This function creates the cache object that stores the plan properties
+    /// such as schema, equivalence properties, ordering, partitioning, etc.
+    fn compute_properties(schema: SchemaRef) -> PlanProperties {
+        let eq_properties = EquivalenceProperties::new(schema);
+        PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        )
+    }
+}
+
+impl DisplayAs for CustomExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+        write!(f, "CustomExec")
+    }
+}
+
+impl ExecutionPlan for CustomExec {
+    fn name(&self) -> &'static str {
+        "CustomExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let users: Vec<User> = {
+            let db = self.db.inner.lock().unwrap();
+            db.data.clone()
+        };
+
+        let mut id_array = UInt8Builder::with_capacity(users.len());
+        let mut account_array = UInt64Builder::with_capacity(users.len());
+        let len = users.len() as u64;
+
+        for user in users {
+            id_array.append_value(user.id);
+            account_array.append_value(user.bank_account);
+        }
+
+        let id_array = id_array.finish();
+        let account_array = account_array.finish();
+        let rowid_array = UInt64Array::from_iter_values(0_u64..len);
+
+        let arrays = self
+            .projected_schema
+            .fields
+            .iter()
+            .map(|f| match f.name().as_str() {
+                "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef,
+                "id" => Arc::new(id_array.clone()) as ArrayRef,
+                "bank_account" => Arc::new(account_array.clone()) as ArrayRef,
+                _ => panic!("cannot reach here"),
+            })
+            .collect();
+
+        Ok(Box::pin(MemoryStream::try_new(
+            vec![RecordBatch::try_new(self.projected_schema.clone(), arrays)?],
+            self.schema(),
+            None,
+        )?))
+    }
+}
+
+#[tokio::test]
+async fn select_metadata_column() {
+    // Query a custom table provider that exposes a `_rowid` metadata column
+    let ctx = SessionContext::new_with_config(
+        SessionConfig::new().with_information_schema(true),
+    );
+    let db = CustomDataSource::default();
+    db.populate_users();
+    ctx.register_table("test", Arc::new(db)).unwrap();
+    // disallow ddl
+    let options = SQLOptions::new().with_allow_ddl(false);
+
+    let show_columns = "show columns from test;";
+    let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap();
+    let all_batches = df_columns
+        .select(vec![col("column_name"), col("data_type")])
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    assert_eq!(batch.num_rows(), 2);
+    let serializer = CsvSerializer::new().with_header(false);
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "id,UInt8\nbank_account,UInt64\n");
+
+    let select0 = "SELECT * FROM test order by id";
+    let df0 = ctx.sql_with_options(select0, options).await.unwrap();
+    assert!(!df0.schema().has_column_with_unqualified_name("_rowid"));
+
+    let all_batches = df0.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "1,9000\n2,100\n3,1000\n");
+
+    let select1 = "SELECT _rowid FROM test order by _rowid";
+    let df1 = ctx.sql_with_options(select1, options).await.unwrap();
+    assert_eq!(df1.schema().field_names(), vec!["test._rowid"]);
+
+    let all_batches = df1.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "0\n1\n2\n");
+
+    let select2 = "SELECT _rowid, id FROM test order by _rowid";
+    let df2 = ctx.sql_with_options(select2, options).await.unwrap();
+    assert_eq!(df2.schema().field_names(), vec!["test._rowid", "test.id"]);
+
+    let all_batches = df2.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "0,1\n1,2\n2,3\n");
+
+    let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0";
+    let df3 = ctx.sql_with_options(select3, options).await.unwrap();
+    assert_eq!(df3.schema().field_names(), vec!["test._rowid", "test.id"]);
+
+    let all_batches = df3.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "0,1\n");
+
+    let select4 = "SELECT _rowid FROM test LIMIT 1";
+    let df4 = ctx.sql_with_options(select4, options).await.unwrap();
+    assert_eq!(df4.schema().field_names(), vec!["test._rowid"]);
+
+    let all_batches = df4.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "0\n");
+
+    let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1";
+    let df5 = ctx.sql_with_options(select5, options).await.unwrap();
+    assert_eq!(df5.schema().field_names(), vec!["test._rowid", "test.id"]);
+
+    let all_batches = df5.collect().await.unwrap();
+    let batch = concat_batches(&all_batches[0].schema(), &all_batches).unwrap();
+    let bytes = serializer.serialize(batch, true).unwrap();
+    assert_eq!(bytes, "1,2\n");
+}
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 03c4ad7c013e..c4ec9c516d1e 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -61,6 +61,7 @@ pub mod aggregates;
 pub mod create_drop;
 pub mod explain_analyze;
 pub mod joins;
+mod metadata_columns;
 mod path_partition;
 pub mod select;
 mod sql_api;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index c7cff3ac26b1..a96f5d285e7a 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1444,7 +1444,14 @@ pub fn build_join_schema(
         .into_iter()
         .chain(right.metadata().clone())
         .collect();
-    let dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
+    let metadata_schema = match join_type {
+        JoinType::LeftMark => left.metadata_schema(),
+        _ => &None,
+    };
+    let mut dfschema = DFSchema::new_with_metadata(qualified_fields, metadata)?;
+    if let Some(metadata_schema) = metadata_schema {
+        dfschema = dfschema.with_metadata_schema(Some(metadata_schema.clone()));
+    }
     dfschema.with_functional_dependencies(func_dependencies)
 }
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 24fb0609b0fe..4cffccfdb195 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -55,8 +55,8 @@ use datafusion_common::tree_node::{
 use datafusion_common::{
     aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints,
     DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependence,
-    FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
-    UnnestOptions,
+    FunctionalDependencies, ParamValues, QualifiedSchema, Result, ScalarValue,
+    TableReference, UnnestOptions,
 };
 use indexmap::IndexSet;
 
@@ -370,6 +370,22 @@ impl LogicalPlan {
         }
     }
 
+    pub fn metadata_schema(&self) -> &Option<QualifiedSchema> {
+        match self {
+            LogicalPlan::TableScan(TableScan {
+                projected_schema, ..
+            }) => projected_schema.metadata_schema(),
+            LogicalPlan::Join(Join { schema, .. }) => schema.metadata_schema(),
+            LogicalPlan::Projection(Projection { schema, .. }) => {
+                schema.metadata_schema()
+            }
+            LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
+                schema.metadata_schema()
+            }
+            _ => &None,
+        }
+    }
+
     /// Returns the (fixed) output schema for explain plans
     pub fn explain_schema() -> SchemaRef {
         SchemaRef::new(Schema::new(vec![
@@ -2206,9 +2222,11 @@ impl SubqueryAlias {
         // Since schema is the same, other than qualifier, we can use existing
         // functional dependencies:
         let func_dependencies = plan.schema().functional_dependencies().clone();
+
         let schema = DFSchemaRef::new(
             DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
-                .with_functional_dependencies(func_dependencies)?,
+                .with_functional_dependencies(func_dependencies)?
+                .with_metadata_schema(plan.metadata_schema().clone()),
         );
         Ok(SubqueryAlias {
             input: plan,
@@ -2591,7 +2609,8 @@ impl TableScan {
                 table_source.constraints(),
                 schema.fields.len(),
             );
-        let projected_schema = projection
+        let metadata = table_source.metadata_columns();
+        let mut projected_schema = projection
             .as_ref()
             .map(|p| {
                 let projected_func_dependencies =
@@ -2600,6 +2619,18 @@ impl TableScan {
                 let df_schema = DFSchema::new_with_metadata(
                     p.iter()
                         .map(|i| {
+                            if *i >= schema.fields.len() {
+                                if let Some(metadata) = &metadata {
+                                    return (
+                                        Some(table_name.clone()),
+                                        Arc::new(
+                                            metadata
+                                                .field(*i - schema.fields.len())
+                                                .clone(),
+                                        ),
+                                    );
+                                }
+                            }
                             (Some(table_name.clone()), Arc::new(schema.field(*i).clone()))
                         })
                         .collect(),
@@ -2612,6 +2643,11 @@ impl TableScan {
                     DFSchema::try_from_qualified_schema(table_name.clone(), &schema)?;
                 df_schema.with_functional_dependencies(func_dependencies)
             })?;
+        if let Some(metadata) = metadata {
+            projected_schema = projected_schema.with_metadata_schema(Some(
+                QualifiedSchema::new_with_table(metadata, &table_name),
+            ));
+        }
         let projected_schema = Arc::new(projected_schema);
 
         Ok(Self {
diff --git a/datafusion/expr/src/table_source.rs b/datafusion/expr/src/table_source.rs
index d62484153f53..740eca12d334 100644
--- a/datafusion/expr/src/table_source.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -88,6 +88,11 @@ pub trait TableSource: Sync + Send {
     /// Get a reference to the schema for this table
     fn schema(&self) -> SchemaRef;
 
+    /// Get metadata columns of this table.
+    fn metadata_columns(&self) -> Option<SchemaRef> {
+        None
+    }
+
     /// Get primary key indices, if one exists.
     fn constraints(&self) -> Option<&Constraints> {
         None
diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs
index b229d908d10d..06dd326b5dfe 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -98,6 +98,7 @@ use datafusion::error::Result;
 pub struct FFI_TableProvider {
     /// Return the table schema
     pub schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,
+    pub metadata_columns: unsafe extern "C" fn(provider: &Self) -> ROption<WrappedSchema>,
 
     /// Perform a scan on the table.  See [`TableProvider`] for detailed usage information.
     ///
@@ -158,6 +159,15 @@ unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
     provider.schema().into()
 }
 
+unsafe extern "C" fn metadata_columns_fn_wrapper(
+    provider: &FFI_TableProvider,
+) -> ROption<WrappedSchema> {
+    let private_data = provider.private_data as *const ProviderPrivateData;
+    let provider = &(*private_data).provider;
+
+    provider.metadata_columns().map(|s| s.into()).into()
+}
+
 unsafe extern "C" fn table_type_fn_wrapper(
     provider: &FFI_TableProvider,
 ) -> FFI_TableType {
@@ -280,6 +290,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
 
     FFI_TableProvider {
         schema: schema_fn_wrapper,
+        metadata_columns: metadata_columns_fn_wrapper,
         scan: scan_fn_wrapper,
         table_type: table_type_fn_wrapper,
         supports_filters_pushdown: provider.supports_filters_pushdown,
@@ -305,6 +316,7 @@ impl FFI_TableProvider {
 
         Self {
             schema: schema_fn_wrapper,
+            metadata_columns: metadata_columns_fn_wrapper,
             scan: scan_fn_wrapper,
             table_type: table_type_fn_wrapper,
             supports_filters_pushdown: match can_support_pushdown_filters {
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 49bce3c1ce82..bfaf84d272b6 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -550,21 +550,22 @@ mod tests {
             Check optimizer-specific invariants after optimizer rule: get table_scan rule\n\
             caused by\n\
             Internal error: Failed due to a difference in schemas, \
-            original schema: DFSchema { inner: Schema { \
+            original schema: DFSchema { inner: QualifiedSchema { schema: Schema { \
             fields: [], \
             metadata: {} }, \
-            field_qualifiers: [], \
-            functional_dependencies: FunctionalDependencies { deps: [] } \
+            field_qualifiers: [] }, \
+            functional_dependencies: FunctionalDependencies { deps: [] }, \
+            metadata: None \
             }, \
-            new schema: DFSchema { inner: Schema { \
+            new schema: DFSchema { inner: QualifiedSchema { schema: Schema { \
             fields: [\
             Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
             Field { name: \"b\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, \
             Field { name: \"c\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }\
             ], \
             metadata: {} }, \
-            field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })], \
-            functional_dependencies: FunctionalDependencies { deps: [] } }",
+            field_qualifiers: [Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" }), Some(Bare { table: \"test\" })] }, \
+            functional_dependencies: FunctionalDependencies { deps: [] }, metadata: None }",
         ));
    }
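
Reviewer note: below is a minimal sketch of how the schema-side API added by this
patch composes, using only the new public pieces (`QualifiedSchema`,
`DFSchema::with_metadata_schema`, and the lookup fall-through into the metadata
schema). The `_rowid` column and the `test` table name are illustrative,
mirroring the test above; this snippet is not part of the patch itself.

use std::sync::Arc;

use datafusion_common::arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, QualifiedSchema, Result, TableReference};

fn main() -> Result<()> {
    let table = TableReference::from("test");

    // Ordinary data columns live in the regular qualified schema.
    let data = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt8, false)]));
    // Metadata columns are carried separately, qualified by the same table.
    let meta = Arc::new(Schema::new(vec![Field::new(
        "_rowid",
        DataType::UInt64,
        false,
    )]));

    let schema = DFSchema::try_from_qualified_schema(table.clone(), &data)?
        .with_metadata_schema(Some(QualifiedSchema::new_with_table(meta, &table)));

    // Name lookups fall through to the metadata schema, and metadata fields
    // are addressed by indexes offset past the data columns.
    assert_eq!(schema.index_of_column_by_name(Some(&table), "id"), Some(0));
    assert_eq!(schema.index_of_column_by_name(Some(&table), "_rowid"), Some(1));
    assert_eq!(schema.field(1).name(), "_rowid");
    Ok(())
}

As the test above demonstrates, metadata columns stay out of `SELECT *` and only
appear when referenced explicitly, so the extra `_rowid` field never changes the
default shape of a table's output.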