diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 84354c8c0e9a..c9b0045a6ec9 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1236,7 +1236,6 @@ dependencies = [ "bytes", "bzip2 0.5.0", "chrono", - "dashmap", "datafusion-catalog", "datafusion-common", "datafusion-common-runtime", @@ -1280,10 +1279,12 @@ version = "44.0.0" dependencies = [ "arrow-schema", "async-trait", + "dashmap", "datafusion-common", "datafusion-execution", "datafusion-expr", "datafusion-physical-plan", + "itertools 0.14.0", "parking_lot", ] diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml index 32a87cc7611c..bcc258c7a7f4 100644 --- a/datafusion/catalog/Cargo.toml +++ b/datafusion/catalog/Cargo.toml @@ -30,10 +30,12 @@ version.workspace = true [dependencies] arrow-schema = { workspace = true } async-trait = { workspace = true } +dashmap = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +itertools = { workspace = true } parking_lot = { workspace = true } [dev-dependencies] diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs index 3cf2a3b3cd33..28410eb76fab 100644 --- a/datafusion/catalog/src/lib.rs +++ b/datafusion/catalog/src/lib.rs @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! Interfaces and default implementations of catalogs and schemas. +//! +//! Implementations +//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`] + +pub mod memory; +pub use memory::{ + MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider, +}; + mod r#async; mod catalog; mod dynamic_file; diff --git a/datafusion/core/src/catalog_common/memory.rs b/datafusion/catalog/src/memory.rs similarity index 51% rename from datafusion/core/src/catalog_common/memory.rs rename to datafusion/catalog/src/memory.rs index 6cdefc31f18c..d22a98d3d064 100644 --- a/datafusion/core/src/catalog_common/memory.rs +++ b/datafusion/catalog/src/memory.rs @@ -18,9 +18,7 @@ //! [`MemoryCatalogProvider`], [`MemoryCatalogProviderList`]: In-memory //! implementations of [`CatalogProviderList`] and [`CatalogProvider`]. -use crate::catalog::{ - CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider, -}; +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_common::{exec_err, DataFusionError}; @@ -200,156 +198,3 @@ impl SchemaProvider for MemorySchemaProvider { self.tables.contains_key(name) } } - -#[cfg(test)] -mod test { - use super::*; - use crate::catalog::CatalogProvider; - use crate::catalog_common::memory::MemorySchemaProvider; - use crate::datasource::empty::EmptyTable; - use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl}; - use crate::prelude::SessionContext; - use arrow_schema::Schema; - use datafusion_common::assert_batches_eq; - use std::any::Any; - use std::sync::Arc; - - #[test] - fn memory_catalog_dereg_nonempty_schema() { - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; - - let schema = Arc::new(MemorySchemaProvider::new()) as Arc; - let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) - as Arc; - schema.register_table("t".into(), test_table).unwrap(); - - cat.register_schema("foo", schema.clone()).unwrap(); - - assert!( - cat.deregister_schema("foo", false).is_err(), - "dropping empty schema without cascade should error" - ); - assert!(cat.deregister_schema("foo", true).unwrap().is_some()); - } - - #[test] - fn memory_catalog_dereg_empty_schema() { - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; - - let schema = Arc::new(MemorySchemaProvider::new()) as Arc; - cat.register_schema("foo", schema).unwrap(); - - assert!(cat.deregister_schema("foo", false).unwrap().is_some()); - } - - #[test] - fn memory_catalog_dereg_missing() { - let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; - assert!(cat.deregister_schema("foo", false).unwrap().is_none()); - } - - #[test] - fn default_register_schema_not_supported() { - // mimic a new CatalogProvider and ensure it does not support registering schemas - #[derive(Debug)] - struct TestProvider {} - impl CatalogProvider for TestProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - unimplemented!() - } - - fn schema(&self, _name: &str) -> Option> { - unimplemented!() - } - } - - let schema = Arc::new(MemorySchemaProvider::new()) as Arc; - let catalog = Arc::new(TestProvider {}); - - match catalog.register_schema("foo", schema) { - Ok(_) => panic!("unexpected OK"), - Err(e) => assert_eq!(e.strip_backtrace(), "This feature is not implemented: Registering new schemas is not supported"), - }; - } - - #[tokio::test] - async fn test_mem_provider() { - let provider = MemorySchemaProvider::new(); - let table_name = "test_table_exist"; - assert!(!provider.table_exist(table_name)); - assert!(provider.deregister_table(table_name).unwrap().is_none()); - let test_table = EmptyTable::new(Arc::new(Schema::empty())); - // register table successfully - assert!(provider - .register_table(table_name.to_string(), Arc::new(test_table)) - .unwrap() - .is_none()); - assert!(provider.table_exist(table_name)); - let other_table = EmptyTable::new(Arc::new(Schema::empty())); - let result = - provider.register_table(table_name.to_string(), Arc::new(other_table)); - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_schema_register_listing_table() { - let testdata = crate::test_util::parquet_test_data(); - let testdir = if testdata.starts_with('/') { - format!("file://{testdata}") - } else { - format!("file:///{testdata}") - }; - let filename = if testdir.ends_with('/') { - format!("{}{}", testdir, "alltypes_plain.parquet") - } else { - format!("{}/{}", testdir, "alltypes_plain.parquet") - }; - - let table_path = ListingTableUrl::parse(filename).unwrap(); - - let catalog = MemoryCatalogProvider::new(); - let schema = MemorySchemaProvider::new(); - - let ctx = SessionContext::new(); - - let config = ListingTableConfig::new(table_path) - .infer(&ctx.state()) - .await - .unwrap(); - let table = ListingTable::try_new(config).unwrap(); - - schema - .register_table("alltypes_plain".to_string(), Arc::new(table)) - .unwrap(); - - catalog.register_schema("active", Arc::new(schema)).unwrap(); - ctx.register_catalog("cat", Arc::new(catalog)); - - let df = ctx - .sql("SELECT id, bool_col FROM cat.active.alltypes_plain") - .await - .unwrap(); - - let actual = df.collect().await.unwrap(); - - let expected = [ - "+----+----------+", - "| id | bool_col |", - "+----+----------+", - "| 4 | true |", - "| 5 | false |", - "| 6 | true |", - "| 7 | false |", - "| 2 | true |", - "| 3 | false |", - "| 0 | true |", - "| 1 | false |", - "+----+----------+", - ]; - assert_batches_eq!(expected, &actual); - } -} diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 149bf8beb96e..c9b059ad0f40 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -99,7 +99,6 @@ async-trait = { workspace = true } bytes = { workspace = true } bzip2 = { version = "0.5.0", optional = true } chrono = { workspace = true } -dashmap = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true, features = ["object_store"] } datafusion-common-runtime = { workspace = true } diff --git a/datafusion/core/src/catalog_common/mod.rs b/datafusion/core/src/catalog_common/mod.rs index 68c78dda4899..45fb6ddae1d6 100644 --- a/datafusion/core/src/catalog_common/mod.rs +++ b/datafusion/core/src/catalog_common/mod.rs @@ -18,18 +18,12 @@ //! Interfaces and default implementations of catalogs and schemas. //! //! Implementations -//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`] //! * Information schema: [`information_schema`] //! * Listing schema: [`listing_schema`] pub mod information_schema; pub mod listing_schema; -pub mod memory; - pub use crate::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; -pub use memory::{ - MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider, -}; pub use datafusion_sql::{ResolvedTableReference, TableReference}; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index e377dd6297ce..a0aa6447871c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -17,6 +17,8 @@ //! [`SessionContext`] API for registering data sources and executing queries +use datafusion_catalog::memory::MemorySchemaProvider; +use datafusion_catalog::MemoryCatalogProvider; use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Weak}; @@ -27,8 +29,6 @@ use crate::{ CatalogProvider, CatalogProviderList, TableProvider, TableProviderFactory, }, catalog_common::listing_schema::ListingSchemaProvider, - catalog_common::memory::MemorySchemaProvider, - catalog_common::MemoryCatalogProvider, dataframe::DataFrame, datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 54d505e1b4b9..6c3349625f04 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -27,13 +27,13 @@ use crate::catalog::{CatalogProviderList, SchemaProvider, TableProviderFactory}; use crate::catalog_common::information_schema::{ InformationSchemaProvider, INFORMATION_SCHEMA, }; -use crate::catalog_common::MemoryCatalogProviderList; use crate::datasource::cte_worktable::CteWorkTable; use crate::datasource::file_format::{format_as_file_type, FileFormatFactory}; use crate::datasource::provider_as_source; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::execution::SessionStateDefaults; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use datafusion_catalog::MemoryCatalogProviderList; use arrow_schema::{DataType, SchemaRef}; use datafusion_catalog::{Session, TableFunction, TableFunctionImpl}; @@ -1987,11 +1987,11 @@ pub(crate) struct PreparedPlan { #[cfg(test)] mod tests { use super::{SessionContextProvider, SessionStateBuilder}; - use crate::catalog_common::MemoryCatalogProviderList; use crate::datasource::MemTable; use crate::execution::context::SessionState; use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; + use datafusion_catalog::MemoryCatalogProviderList; use datafusion_common::DFSchema; use datafusion_common::Result; use datafusion_execution::config::SessionConfig; diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 106082bc7b3b..92f649781cfd 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -17,7 +17,6 @@ use crate::catalog::{CatalogProvider, TableProviderFactory}; use crate::catalog_common::listing_schema::ListingSchemaProvider; -use crate::catalog_common::{MemoryCatalogProvider, MemorySchemaProvider}; use crate::datasource::file_format::arrow::ArrowFormatFactory; use crate::datasource::file_format::avro::AvroFormatFactory; use crate::datasource::file_format::csv::CsvFormatFactory; @@ -31,6 +30,7 @@ use crate::execution::context::SessionState; use crate::functions_nested; use crate::{functions, functions_aggregate, functions_table, functions_window}; use datafusion_catalog::TableFunction; +use datafusion_catalog::{MemoryCatalogProvider, MemorySchemaProvider}; use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; diff --git a/datafusion/core/tests/catalog/memory.rs b/datafusion/core/tests/catalog/memory.rs new file mode 100644 index 000000000000..bef23fff3e96 --- /dev/null +++ b/datafusion/core/tests/catalog/memory.rs @@ -0,0 +1,170 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::Schema; +use datafusion::catalog::CatalogProvider; +use datafusion::datasource::empty::EmptyTable; +use datafusion::datasource::listing::{ + ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::prelude::SessionContext; +use datafusion_catalog::memory::*; +use datafusion_catalog::{SchemaProvider, TableProvider}; +use datafusion_common::assert_batches_eq; +use std::any::Any; +use std::sync::Arc; + +#[test] +fn memory_catalog_dereg_nonempty_schema() { + let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; + + let schema = Arc::new(MemorySchemaProvider::new()) as Arc; + let test_table = + Arc::new(EmptyTable::new(Arc::new(Schema::empty()))) as Arc; + schema.register_table("t".into(), test_table).unwrap(); + + cat.register_schema("foo", schema.clone()).unwrap(); + + assert!( + cat.deregister_schema("foo", false).is_err(), + "dropping empty schema without cascade should error" + ); + assert!(cat.deregister_schema("foo", true).unwrap().is_some()); +} + +#[test] +fn memory_catalog_dereg_empty_schema() { + let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; + + let schema = Arc::new(MemorySchemaProvider::new()) as Arc; + cat.register_schema("foo", schema).unwrap(); + + assert!(cat.deregister_schema("foo", false).unwrap().is_some()); +} + +#[test] +fn memory_catalog_dereg_missing() { + let cat = Arc::new(MemoryCatalogProvider::new()) as Arc; + assert!(cat.deregister_schema("foo", false).unwrap().is_none()); +} + +#[test] +fn default_register_schema_not_supported() { + // mimic a new CatalogProvider and ensure it does not support registering schemas + #[derive(Debug)] + struct TestProvider {} + impl CatalogProvider for TestProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + unimplemented!() + } + + fn schema(&self, _name: &str) -> Option> { + unimplemented!() + } + } + + let schema = Arc::new(MemorySchemaProvider::new()) as Arc; + let catalog = Arc::new(TestProvider {}); + + match catalog.register_schema("foo", schema) { + Ok(_) => panic!("unexpected OK"), + Err(e) => assert_eq!( + e.strip_backtrace(), + "This feature is not implemented: Registering new schemas is not supported" + ), + }; +} + +#[tokio::test] +async fn test_mem_provider() { + let provider = MemorySchemaProvider::new(); + let table_name = "test_table_exist"; + assert!(!provider.table_exist(table_name)); + assert!(provider.deregister_table(table_name).unwrap().is_none()); + let test_table = EmptyTable::new(Arc::new(Schema::empty())); + // register table successfully + assert!(provider + .register_table(table_name.to_string(), Arc::new(test_table)) + .unwrap() + .is_none()); + assert!(provider.table_exist(table_name)); + let other_table = EmptyTable::new(Arc::new(Schema::empty())); + let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); + assert!(result.is_err()); +} + +#[tokio::test] +async fn test_schema_register_listing_table() { + let testdata = datafusion::test_util::parquet_test_data(); + let testdir = if testdata.starts_with('/') { + format!("file://{testdata}") + } else { + format!("file:///{testdata}") + }; + let filename = if testdir.ends_with('/') { + format!("{}{}", testdir, "alltypes_plain.parquet") + } else { + format!("{}/{}", testdir, "alltypes_plain.parquet") + }; + + let table_path = ListingTableUrl::parse(filename).unwrap(); + + let catalog = MemoryCatalogProvider::new(); + let schema = MemorySchemaProvider::new(); + + let ctx = SessionContext::new(); + + let config = ListingTableConfig::new(table_path) + .infer(&ctx.state()) + .await + .unwrap(); + let table = ListingTable::try_new(config).unwrap(); + + schema + .register_table("alltypes_plain".to_string(), Arc::new(table)) + .unwrap(); + + catalog.register_schema("active", Arc::new(schema)).unwrap(); + ctx.register_catalog("cat", Arc::new(catalog)); + + let df = ctx + .sql("SELECT id, bool_col FROM cat.active.alltypes_plain") + .await + .unwrap(); + + let actual = df.collect().await.unwrap(); + + let expected = [ + "+----+----------+", + "| id | bool_col |", + "+----+----------+", + "| 4 | true |", + "| 5 | false |", + "| 6 | true |", + "| 7 | false |", + "| 2 | true |", + "| 3 | false |", + "| 0 | true |", + "| 1 | false |", + "+----+----------+", + ]; + assert_batches_eq!(expected, &actual); +} diff --git a/datafusion/core/tests/catalog/mod.rs b/datafusion/core/tests/catalog/mod.rs new file mode 100644 index 000000000000..e86118afaecf --- /dev/null +++ b/datafusion/core/tests/catalog/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod memory; diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index 93f66282333d..66b4103160e7 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -44,6 +44,8 @@ mod optimizer; mod physical_optimizer; +mod catalog; + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 38fe14ab90b7..41b96e341074 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -42,6 +42,7 @@ bytes = { workspace = true, optional = true } chrono = { workspace = true, optional = true } clap = { version = "4.5.16", features = ["derive", "env"] } datafusion = { workspace = true, default-features = true, features = ["avro"] } +datafusion-catalog = { workspace = true, default-features = true } datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } futures = { workspace = true } diff --git a/datafusion/sqllogictest/src/test_context.rs b/datafusion/sqllogictest/src/test_context.rs index 2466303c32a9..f7c9346a8983 100644 --- a/datafusion/sqllogictest/src/test_context.rs +++ b/datafusion/sqllogictest/src/test_context.rs @@ -31,11 +31,11 @@ use datafusion::logical_expr::{create_udf, ColumnarValue, Expr, ScalarUDF, Volat use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionConfig; use datafusion::{ - catalog::CatalogProvider, - catalog_common::{memory::MemoryCatalogProvider, memory::MemorySchemaProvider}, datasource::{MemTable, TableProvider, TableType}, prelude::{CsvReadOptions, SessionContext}, }; +use datafusion_catalog::CatalogProvider; +use datafusion_catalog::{memory::MemoryCatalogProvider, memory::MemorySchemaProvider}; use datafusion_common::cast::as_float64_array; use datafusion_common::DataFusionError;