Skip to content

Commit

Permalink
move dynamic file catalog to datafusion-catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
goldmedal committed Aug 15, 2024
1 parent 51b1d41 commit 75b0b84
Show file tree
Hide file tree
Showing 14 changed files with 385 additions and 321 deletions.
6 changes: 6 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ async fn main() -> Result<()> {
.await?;
parquet_df.describe().await.unwrap().show().await?;

let dyn_ctx = ctx.enable_url_table();
let df = dyn_ctx
.sql(&format!("SELECT * FROM '{}'", file_path.to_str().unwrap()))
.await?;
df.show().await?;

Ok(())
}

Expand Down
5 changes: 5 additions & 0 deletions datafusion/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ repository.workspace = true
rust-version.workspace = true
version.workspace = true

[features]
home_dir= ["dep:dirs"]

[dependencies]
arrow-schema = { workspace = true }
async-trait = "0.1.41"
datafusion-common = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
dirs = { version = "4.0.0", optional = true }
parking_lot = { workspace = true }

[lints]
workspace = true
241 changes: 241 additions & 0 deletions datafusion/catalog/src/dynamic_file/catalog.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! dynamic_file_schema contains a SchemaProvider that creates tables from file paths
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
#[cfg(feature = "home_dir")]
use dirs::home_dir;
use std::any::Any;
use std::sync::Arc;

pub struct DynamicFileCatalog {
inner: Arc<dyn CatalogProviderList>,
factory: Arc<dyn UrlTableFactory>,
}

impl DynamicFileCatalog {
pub fn new(
inner: Arc<dyn CatalogProviderList>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}

impl CatalogProviderList for DynamicFileCatalog {
fn as_any(&self) -> &dyn Any {
self
}

fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.inner.register_catalog(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
self.inner.catalog_names()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicFileCatalogProvider::new(
catalog,
Arc::clone(&self.factory),
)) as _
})
}
}

/// Wraps another catalog provider
struct DynamicFileCatalogProvider {
inner: Arc<dyn CatalogProvider>,
factory: Arc<dyn UrlTableFactory>,
}

impl DynamicFileCatalogProvider {
pub fn new(
inner: Arc<dyn CatalogProvider>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}

impl CatalogProvider for DynamicFileCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema_names(&self) -> Vec<String> {
self.inner.schema_names()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.inner.schema(name).map(|schema| {
Arc::new(DynamicFileSchemaProvider::new(
schema,
Arc::clone(&self.factory),
)) as _
})
}

fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
self.inner.register_schema(name, schema)
}
}

/// Implements the [DynamicFileSchemaProvider] that can create tables provider from the file path.
///
/// The provider will try to create a table provider from the file path if the table provider
/// isn't exist in the inner schema provider. The required object store must be registered in the session context.
pub struct DynamicFileSchemaProvider {
inner: Arc<dyn SchemaProvider>,
factory: Arc<dyn UrlTableFactory>,
}

impl DynamicFileSchemaProvider {
/// Create a new [DynamicFileSchemaProvider] with the given inner schema provider.
pub fn new(
inner: Arc<dyn SchemaProvider>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}

#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn table_names(&self) -> Vec<String> {
self.inner.table_names()
}

async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if let Some(table) = self.inner.table(name).await? {
return Ok(Some(table));
};

self.factory.try_new(name).await
}

fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.register_table(name, table)
}

fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name)
}

fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name)
}
}

/// Substitute the tilde character in the file path with the user home directory.
#[cfg(feature = "home_dir")]
pub fn substitute_tilde(cur: String) -> String {
if let Some(usr_dir_path) = home_dir() {
if let Some(usr_dir) = usr_dir_path.to_str() {
if cur.starts_with('~') && !usr_dir.is_empty() {
return cur.replacen('~', usr_dir, 1);
}
}
}
cur
}

/// Do nothing if the feature "home_dir" is disabled.
#[cfg(not(feature = "home_dir"))]
pub fn substitute_tilde(cur: String) -> String {
cur
}

/// [UrlTableFactory] is a factory that can create a table provider from the given url.
#[async_trait]
pub trait UrlTableFactory: Sync + Send {
/// create a new table provider from the provided url
async fn try_new(
&self,
url: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
}

#[cfg(all(not(target_os = "windows"), not(feature = "home_dir")))]
#[cfg(test)]
mod tests {
use crate::dynamic_file::catalog::substitute_tilde;
use dirs::home_dir;

#[test]
fn test_substitute_tilde() {
use std::env;
use std::path::MAIN_SEPARATOR;
let original_home = home_dir();
let test_home_path = if cfg!(windows) {
"C:\\Users\\user"
} else {
"/home/user"
};
env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
test_home_path,
);
let input = "~/Code/datafusion/benchmarks/data/tpch_sf1/part/part-0.parquet";
let expected = format!(
"{}{}Code{}datafusion{}benchmarks{}data{}tpch_sf1{}part{}part-0.parquet",
test_home_path,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR,
MAIN_SEPARATOR
);
let actual = substitute_tilde(input.to_string());
assert_eq!(actual, expected);
match original_home {
Some(home_path) => env::set_var(
if cfg!(windows) { "USERPROFILE" } else { "HOME" },
home_path.to_str().unwrap(),
),
None => env::remove_var(if cfg!(windows) { "USERPROFILE" } else { "HOME" }),
}
}
}
2 changes: 1 addition & 1 deletion datafusion/catalog/src/dynamic_file/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
mod catalog;
pub(crate) mod catalog;
3 changes: 2 additions & 1 deletion datafusion/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
// under the License.

mod catalog;
mod dynamic_file;
mod schema;
mod session;
mod table;
mod dynamic_file;

pub use catalog::*;
pub use dynamic_file::catalog::*;
pub use schema::*;
pub use session::*;
pub use table::*;
34 changes: 33 additions & 1 deletion datafusion/catalog/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use datafusion_execution::TaskContext;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr};
use parking_lot::{Mutex, RwLock};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Weak};

/// Interface for accessing [`SessionState`] from the catalog.
///
Expand Down Expand Up @@ -136,3 +137,34 @@ impl From<&dyn Session> for TaskContext {
)
}
}

/// The state store that stores the reference of the runtime session state.
pub struct SessionStore {
session: Arc<Mutex<Option<Weak<RwLock<dyn Session>>>>>,
}

impl SessionStore {
/// Create a new [SessionStore]
pub fn new() -> Self {
Self {
session: Arc::new(Mutex::new(None)),
}
}

/// Set the session state of the store
pub fn with_state(&self, state: Weak<RwLock<dyn Session>>) {
let mut lock = self.session.lock();
*lock = Some(state);
}

/// Get the current session of the store
pub fn get_session(&self) -> Weak<RwLock<dyn Session>> {
self.session.lock().clone().unwrap()
}
}

impl Default for SessionStore {
fn default() -> Self {
Self::new()
}
}
4 changes: 1 addition & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ backtrace = ["datafusion-common/backtrace"]
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression", "tokio-util"]
crypto_expressions = ["datafusion-functions/crypto_expressions"]
datetime_expressions = ["datafusion-functions/datetime_expressions"]
dirs = ["dep:dirs"]
default = [
"nested_expressions",
"crypto_expressions",
Expand All @@ -60,7 +59,6 @@ default = [
"unicode_expressions",
"compression",
"parquet",
"dirs",
]
encoding_expressions = ["datafusion-functions/encoding_expressions"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
Expand All @@ -79,6 +77,7 @@ unicode_expressions = [
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]
home_dir = ["datafusion-catalog/home_dir"]

[dependencies]
ahash = { workspace = true }
Expand Down Expand Up @@ -115,7 +114,6 @@ datafusion-physical-expr-functions-aggregate = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
dirs = { version = "4.0.0", optional = true }
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
Expand Down
Loading

0 comments on commit 75b0b84

Please sign in to comment.