Introduce INFORMATION_SCHEMA.ROUTINES table (apache#13255)
* tmp

* introduce routines table

* add is_deterministic field

* cargo fmt

* rollback the session_state changes
goldmedal authored Nov 7, 2024
1 parent ddee471 commit b0b6e44
Showing 8 changed files with 415 additions and 20 deletions.
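For context, the new table is queried like any other information_schema relation. A minimal sketch (assuming a DataFusion build containing this commit, with tokio as the async runtime; the information_schema option must be enabled on the session):

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema tables are only registered when this option is enabled
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // One row per (function, distinct return type) for all registered routines
    ctx.sql(
        "SELECT routine_name, routine_type, function_type, is_deterministic \
         FROM information_schema.routines",
    )
    .await?
    .show()
    .await?;
    Ok(())
}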
303 changes: 291 additions & 12 deletions datafusion/core/src/catalog_common/information_schema.rs
@@ -19,37 +19,41 @@
//!
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
-use arrow::{
-    array::{StringBuilder, UInt64Builder},
-    datatypes::{DataType, Field, Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
-use async_trait::async_trait;
-use datafusion_common::DataFusionError;
-use std::fmt::Debug;
-use std::{any::Any, sync::Arc};

use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
use crate::datasource::streaming::StreamingTable;
use crate::execution::context::TaskContext;
-use crate::logical_expr::TableType;
+use crate::logical_expr::{TableType, Volatility};
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;
use crate::{
config::{ConfigEntry, ConfigOptions},
physical_plan::streaming::PartitionStream,
};
+use arrow::{
+    array::{StringBuilder, UInt64Builder},
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    record_batch::RecordBatch,
+};
+use arrow_array::builder::BooleanBuilder;
+use async_trait::async_trait;
+use datafusion_common::error::Result;
+use datafusion_common::DataFusionError;
+use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF};
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::{any::Any, sync::Arc};

pub(crate) const INFORMATION_SCHEMA: &str = "information_schema";
pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";
pub(crate) const ROUTINES: &str = "routines";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
-    &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];
+    &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES];

/// Implements the `information_schema` virtual schema and tables
///
@@ -208,6 +212,151 @@ impl InformationSchemaConfig {
            builder.add_setting(entry);
        }
    }

    fn make_routines(
        &self,
        udfs: &HashMap<String, Arc<ScalarUDF>>,
        udafs: &HashMap<String, Arc<AggregateUDF>>,
        udwfs: &HashMap<String, Arc<WindowUDF>>,
        config_options: &ConfigOptions,
        builder: &mut InformationSchemaRoutinesBuilder,
    ) -> Result<()> {
        let catalog_name = &config_options.catalog.default_catalog;
        let schema_name = &config_options.catalog.default_schema;

        for (name, udf) in udfs {
            let return_types = get_udf_args_and_return_types(udf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udf.signature()),
                    return_type,
                    "SCALAR",
                    udf.documentation().map(|d| d.description.to_string()),
                )
            }
        }

        for (name, udaf) in udafs {
            let return_types = get_udaf_args_and_return_types(udaf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udaf.signature()),
                    return_type,
                    "AGGREGATE",
                    udaf.documentation().map(|d| d.description.to_string()),
                )
            }
        }

        for (name, udwf) in udwfs {
            let return_types = get_udwf_args_and_return_types(udwf)?
                .into_iter()
                .map(|(_, return_type)| return_type)
                .collect::<HashSet<_>>();
            for return_type in return_types {
                builder.add_routine(
                    catalog_name,
                    schema_name,
                    name,
                    "FUNCTION",
                    Self::is_deterministic(udwf.signature()),
                    return_type,
                    "WINDOW",
                    udwf.documentation().map(|d| d.description.to_string()),
                )
            }
        }
        Ok(())
    }

    fn is_deterministic(signature: &Signature) -> bool {
        signature.volatility == Volatility::Immutable
    }
}

/// Get the argument types and return types of a UDF.
/// Returns a list of `(arg_types, return_type)` pairs.
fn get_udf_args_and_return_types(
    udf: &Arc<ScalarUDF>,
) -> Result<Vec<(Vec<String>, Option<String>)>> {
    let signature = udf.signature();
    let arg_types = signature.type_signature.get_possible_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)])
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // only report a return type for functions that implement the
                // [`ScalarUDFImpl::return_type`] method
                let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string());
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| t.to_string())
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<Vec<_>>())
    }
}

fn get_udaf_args_and_return_types(
    udaf: &Arc<AggregateUDF>,
) -> Result<Vec<(Vec<String>, Option<String>)>> {
    let signature = udaf.signature();
    let arg_types = signature.type_signature.get_possible_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)])
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // only report a return type for functions that implement the
                // [`AggregateUDFImpl::return_type`] method
                let return_type =
                    udaf.return_type(&arg_types).ok().map(|t| t.to_string());
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| t.to_string())
                    .collect::<Vec<_>>();
                (arg_types, return_type)
            })
            .collect::<Vec<_>>())
    }
}

fn get_udwf_args_and_return_types(
    udwf: &Arc<WindowUDF>,
) -> Result<Vec<(Vec<String>, Option<String>)>> {
    let signature = udwf.signature();
    let arg_types = signature.type_signature.get_possible_types();
    if arg_types.is_empty() {
        Ok(vec![(vec![], None)])
    } else {
        Ok(arg_types
            .into_iter()
            .map(|arg_types| {
                // a window function's return type depends on its input,
                // so no return type is reported here
                let arg_types = arg_types
                    .into_iter()
                    .map(|t| t.to_string())
                    .collect::<Vec<_>>();
                (arg_types, None)
            })
            .collect::<Vec<_>>())
    }
}

#[async_trait]
@@ -234,6 +383,7 @@ impl SchemaProvider for InformationSchemaProvider {
            VIEWS => Arc::new(InformationSchemaViews::new(config)),
            DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)),
            SCHEMATA => Arc::new(InformationSchemata::new(config)),
            ROUTINES => Arc::new(InformationSchemaRoutines::new(config)),
            _ => return Ok(None),
        };

@@ -819,3 +969,132 @@ impl InformationSchemaDfSettingsBuilder {
        .unwrap()
    }
}

#[derive(Debug)]
struct InformationSchemaRoutines {
    schema: SchemaRef,
    config: InformationSchemaConfig,
}

impl InformationSchemaRoutines {
    fn new(config: InformationSchemaConfig) -> Self {
        let schema = Arc::new(Schema::new(vec![
            Field::new("specific_catalog", DataType::Utf8, false),
            Field::new("specific_schema", DataType::Utf8, false),
            Field::new("specific_name", DataType::Utf8, false),
            Field::new("routine_catalog", DataType::Utf8, false),
            Field::new("routine_schema", DataType::Utf8, false),
            Field::new("routine_name", DataType::Utf8, false),
            Field::new("routine_type", DataType::Utf8, false),
            Field::new("is_deterministic", DataType::Boolean, true),
            Field::new("data_type", DataType::Utf8, true),
            Field::new("function_type", DataType::Utf8, true),
            Field::new("description", DataType::Utf8, true),
        ]));

        Self { schema, config }
    }

    fn builder(&self) -> InformationSchemaRoutinesBuilder {
        InformationSchemaRoutinesBuilder {
            schema: self.schema.clone(),
            specific_catalog: StringBuilder::new(),
            specific_schema: StringBuilder::new(),
            specific_name: StringBuilder::new(),
            routine_catalog: StringBuilder::new(),
            routine_schema: StringBuilder::new(),
            routine_name: StringBuilder::new(),
            routine_type: StringBuilder::new(),
            is_deterministic: BooleanBuilder::new(),
            data_type: StringBuilder::new(),
            function_type: StringBuilder::new(),
            description: StringBuilder::new(),
        }
    }
}

struct InformationSchemaRoutinesBuilder {
    schema: SchemaRef,
    specific_catalog: StringBuilder,
    specific_schema: StringBuilder,
    specific_name: StringBuilder,
    routine_catalog: StringBuilder,
    routine_schema: StringBuilder,
    routine_name: StringBuilder,
    routine_type: StringBuilder,
    is_deterministic: BooleanBuilder,
    data_type: StringBuilder,
    function_type: StringBuilder,
    description: StringBuilder,
}

impl InformationSchemaRoutinesBuilder {
    #[allow(clippy::too_many_arguments)]
    fn add_routine(
        &mut self,
        catalog_name: impl AsRef<str>,
        schema_name: impl AsRef<str>,
        routine_name: impl AsRef<str>,
        routine_type: impl AsRef<str>,
        is_deterministic: bool,
        data_type: Option<impl AsRef<str>>,
        function_type: impl AsRef<str>,
        description: Option<impl AsRef<str>>,
    ) {
        self.specific_catalog.append_value(catalog_name.as_ref());
        self.specific_schema.append_value(schema_name.as_ref());
        self.specific_name.append_value(routine_name.as_ref());
        self.routine_catalog.append_value(catalog_name.as_ref());
        self.routine_schema.append_value(schema_name.as_ref());
        self.routine_name.append_value(routine_name.as_ref());
        self.routine_type.append_value(routine_type.as_ref());
        self.is_deterministic.append_value(is_deterministic);
        self.data_type.append_option(data_type.as_ref());
        self.function_type.append_value(function_type.as_ref());
        self.description.append_option(description);
    }

    fn finish(&mut self) -> RecordBatch {
        RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(self.specific_catalog.finish()),
                Arc::new(self.specific_schema.finish()),
                Arc::new(self.specific_name.finish()),
                Arc::new(self.routine_catalog.finish()),
                Arc::new(self.routine_schema.finish()),
                Arc::new(self.routine_name.finish()),
                Arc::new(self.routine_type.finish()),
                Arc::new(self.is_deterministic.finish()),
                Arc::new(self.data_type.finish()),
                Arc::new(self.function_type.finish()),
                Arc::new(self.description.finish()),
            ],
        )
        .unwrap()
    }
}

impl PartitionStream for InformationSchemaRoutines {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let config = self.config.clone();
        let mut builder = self.builder();
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::once(async move {
                config.make_routines(
                    ctx.scalar_functions(),
                    ctx.aggregate_functions(),
                    ctx.window_functions(),
                    ctx.session_config().options(),
                    &mut builder,
                )?;
                Ok(builder.finish())
            }),
        ))
    }
}
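To see why make_routines can emit several rows per function, note that get_possible_types expands a signature into every concrete argument list it accepts, and the HashSet then deduplicates the resolved return types. A standalone sketch of that expansion (the signature here is hypothetical; it relies on the public type_signature field and get_possible_types from datafusion_expr):

use arrow::datatypes::DataType;
use datafusion_expr::{Signature, Volatility};

fn main() {
    // A hypothetical unary function accepting either Int32 or Float64
    let sig = Signature::uniform(
        1,
        vec![DataType::Int32, DataType::Float64],
        Volatility::Immutable,
    );

    // Expands to the concrete argument lists [[Int32], [Float64]];
    // make_routines resolves a return type for each list and keeps
    // only the distinct ones
    for args in sig.type_signature.get_possible_types() {
        println!("{args:?}");
    }

    // This signature is Immutable, so is_deterministic would report true
    assert_eq!(sig.volatility, Volatility::Immutable);
}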
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/session_state.rs
@@ -231,15 +231,15 @@ impl Session for SessionState {
    }

    fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
-        self.scalar_functions()
+        &self.scalar_functions
    }

    fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
-        self.aggregate_functions()
+        &self.aggregate_functions
    }

    fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
-        self.window_functions()
+        &self.window_functions
    }

    fn runtime_env(&self) -> &Arc<RuntimeEnv> {
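End to end, a user-registered UDF surfaces in the new table. A sketch under the same assumptions as the first example (create_udf's exact signature has varied across DataFusion versions; shown here taking DataType directly):

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array};
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new_with_config(
        SessionConfig::new().with_information_schema(true),
    );

    // An Immutable UDF: the routines table reports is_deterministic = true
    let double = create_udf(
        "double",
        vec![DataType::Float64],
        DataType::Float64,
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| {
            let arrays = ColumnarValue::values_to_arrays(args)?;
            let input = arrays[0].as_any().downcast_ref::<Float64Array>().unwrap();
            let doubled: Float64Array =
                input.iter().map(|v| v.map(|x| x * 2.0)).collect();
            Ok(ColumnarValue::Array(Arc::new(doubled) as ArrayRef))
        }),
    );
    ctx.register_udf(double);

    // Expect one row: double, Float64, deterministic
    ctx.sql(
        "SELECT routine_name, data_type, is_deterministic \
         FROM information_schema.routines WHERE routine_name = 'double'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}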