Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move information_schema to datafusion-catalog #14364

Merged
merged 7 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ rust-version.workspace = true
version.workspace = true

[dependencies]
arrow-schema = { workspace = true }
arrow = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
sqlparser = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ mod tests {
},
};

use arrow_schema::SchemaRef;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::{error::Result, Statistics, TableReference};
use datafusion_execution::config::SessionConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,29 @@
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
use crate::execution::context::TaskContext;
use crate::logical_expr::{TableType, Volatility};
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;
use crate::{
config::{ConfigEntry, ConfigOptions},
physical_plan::streaming::PartitionStream,
};
use crate::streaming::StreamingTable;
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow_array::builder::{BooleanBuilder, UInt8Builder};
use async_trait::async_trait;
use datafusion_common::config::{ConfigEntry, ConfigOptions};
use datafusion_common::error::Result;
use datafusion_common::DataFusionError;
use datafusion_execution::TaskContext;
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
use datafusion_expr::{TableType, Volatility};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::streaming::PartitionStream;
use datafusion_physical_plan::SendableRecordBatchStream;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::{any::Any, sync::Arc};

pub(crate) const INFORMATION_SCHEMA: &str = "information_schema";
pub const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
Expand Down
243 changes: 242 additions & 1 deletion datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,264 @@
//! Interfaces and default implementations of catalogs and schemas.
//!
//! Implementations
//! * Information schema: [`information_schema`]
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
pub mod memory;
pub use datafusion_sql::{ResolvedTableReference, TableReference};
pub use memory::{
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
};
use std::collections::BTreeSet;
use std::ops::ControlFlow;

mod r#async;
mod catalog;
mod dynamic_file;
pub mod information_schema;
mod schema;
mod session;
mod table;

pub use catalog::*;
pub use dynamic_file::catalog::*;
pub use r#async::*;
pub use schema::*;
pub use session::*;
pub use table::*;
pub mod streaming;

/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
///
/// # Returns
///
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
/// element contains any CTE aliases that were defined and possibly referenced.
///
/// ## Example
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 2);
/// assert_eq!(table_refs[0].to_string(), "bar");
/// assert_eq!(table_refs[1].to_string(), "foo");
/// assert_eq!(ctes.len(), 0);
/// ```
///
/// ## Example with CTEs
///
/// ```
/// # use datafusion_sql::parser::DFParser;
/// # use datafusion_catalog::resolve_table_references;
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
/// assert_eq!(table_refs.len(), 0);
/// assert_eq!(ctes.len(), 1);
/// assert_eq!(ctes[0].to_string(), "my_cte");
/// ```
pub fn resolve_table_references(
statement: &datafusion_sql::parser::Statement,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think since this function belongs in datafusion-sql as it is walking over the sql parse tree

perhaps we can put it in https://github.com/apache/datafusion/tree/main/datafusion/sql/src/resolve.rs or something and then remove the dependency of datafusion-catalog on datafusion-sql

I think we could do this in a follow on PR as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do that then since I might not be able to commit over the next few days due to some personal commitments.

enable_ident_normalization: bool,
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
use datafusion_sql::parser::{
CopyToSource, CopyToStatement, Statement as DFStatement,
};
use datafusion_sql::planner::object_name_to_table_reference;
use information_schema::INFORMATION_SCHEMA;
use information_schema::INFORMATION_SCHEMA_TABLES;
use sqlparser::ast::*;

struct RelationVisitor {
relations: BTreeSet<ObjectName>,
all_ctes: BTreeSet<ObjectName>,
ctes_in_scope: Vec<ObjectName>,
}

impl RelationVisitor {
/// Record the reference to `relation`, if it's not a CTE reference.
fn insert_relation(&mut self, relation: &ObjectName) {
if !self.relations.contains(relation)
&& !self.ctes_in_scope.contains(relation)
{
self.relations.insert(relation.clone());
}
}
}

impl Visitor for RelationVisitor {
type Break = ();

fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
self.insert_relation(relation);
ControlFlow::Continue(())
}

fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
if let Some(with) = &q.with {
for cte in &with.cte_tables {
// The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
// `WITH t AS (SELECT * FROM t) SELECT * FROM t`
// Where the first `t` refers to a predefined table. So we are careful here
// to visit the CTE first, before putting it in scope.
if !with.recursive {
// This is a bit hackish as the CTE will be visited again as part of visiting `q`,
// but thankfully `insert_relation` is idempotent.
cte.visit(self);
}
self.ctes_in_scope
.push(ObjectName(vec![cte.alias.name.clone()]));
}
}
ControlFlow::Continue(())
}

fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
if let Some(with) = &q.with {
for _ in &with.cte_tables {
// Unwrap: We just pushed these in `pre_visit_query`
self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
}
}
ControlFlow::Continue(())
}

fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
if let Statement::ShowCreate {
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
obj_name,
} = statement
{
self.insert_relation(obj_name)
}

// SHOW statements will later be rewritten into a SELECT from the information_schema
let requires_information_schema = matches!(
statement,
Statement::ShowFunctions { .. }
| Statement::ShowVariable { .. }
| Statement::ShowStatus { .. }
| Statement::ShowVariables { .. }
| Statement::ShowCreate { .. }
| Statement::ShowColumns { .. }
| Statement::ShowTables { .. }
| Statement::ShowCollation { .. }
);
if requires_information_schema {
for s in INFORMATION_SCHEMA_TABLES {
self.relations.insert(ObjectName(vec![
Ident::new(INFORMATION_SCHEMA),
Ident::new(*s),
]));
}
}
ControlFlow::Continue(())
}
}

let mut visitor = RelationVisitor {
relations: BTreeSet::new(),
all_ctes: BTreeSet::new(),
ctes_in_scope: vec![],
};

fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(visitor);
}
DFStatement::CreateExternalTable(table) => {
visitor.relations.insert(table.name.clone());
}
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert_relation(table_name);
}
CopyToSource::Query(query) => {
query.visit(visitor);
}
},
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
}
}

visit_statement(statement, &mut visitor);

let table_refs = visitor
.relations
.into_iter()
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
.collect::<datafusion_common::Result<_>>()?;
let ctes = visitor
.all_ctes
.into_iter()
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
.collect::<datafusion_common::Result<_>>()?;
Ok((table_refs, ctes))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn resolve_table_references_shadowed_cte() {
use datafusion_sql::parser::DFParser;

// An interesting edge case where the `t` name is used both as an ordinary table reference
// and as a CTE reference.
let query = "WITH t AS (SELECT * FROM t) SELECT * FROM t";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(table_refs[0].to_string(), "t");

// UNION is a special case where the CTE is not in scope for the second branch.
let query = "(with t as (select 1) select * from t) union (select * from t)";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(table_refs[0].to_string(), "t");

// Nested CTEs are also handled.
// Here the first `u` is a CTE, but the second `u` is a table reference.
// While `t` is always a CTE.
let query = "(with t as (with u as (select 1) select * from u) select * from u cross join t)";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 1);
assert_eq!(ctes.len(), 2);
assert_eq!(ctes[0].to_string(), "t");
assert_eq!(ctes[1].to_string(), "u");
assert_eq!(table_refs[0].to_string(), "u");
}

#[test]
fn resolve_table_references_recursive_cte() {
use datafusion_sql::parser::DFParser;

let query = "
WITH RECURSIVE nodes AS (
SELECT 1 as id
UNION ALL
SELECT id + 1 as id
FROM nodes
WHERE id < 10
)
SELECT * FROM nodes
";
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
assert_eq!(table_refs.len(), 0);
assert_eq!(ctes.len(), 1);
assert_eq!(ctes[0].to_string(), "nodes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;

use crate::datasource::TableProvider;
use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
use crate::physical_plan::ExecutionPlan;
use datafusion_catalog::Session;
use crate::Session;
use crate::TableProvider;
use datafusion_common::{plan_err, Result};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::ExecutionPlan;
use log::debug;

/// A [`TableProvider`] that streams a set of [`PartitionStream`]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::fmt::Debug;
use std::sync::Arc;

use crate::session::Session;
use arrow_schema::SchemaRef;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
Expand Down
Loading
Loading