Skip to content

Commit

Permalink
Merge pull request #489 from splitgraph/search-path
Browse files Browse the repository at this point in the history
Implement a crude replacement for SET search_path in Flight SQL frontend
  • Loading branch information
gruuya authored Jan 17, 2024
2 parents 03799a9 + 90b3cd3 commit 4907a35
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl SeafowlCli {
rl.load_history(SEAFOWL_CLI_HISTORY).ok();

loop {
match rl.readline(format!("{}> ", self.ctx.database).as_str()) {
match rl.readline(format!("{}> ", self.ctx.default_catalog).as_str()) {
Ok(line) if line.starts_with('\\') => {
rl.add_history_entry(line.trim_end())?;
let command = line.split_whitespace().collect::<Vec<_>>().join(" ");
Expand Down
3 changes: 2 additions & 1 deletion src/config/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ pub async fn build_context(cfg: &schema::SeafowlConfig) -> Result<SeafowlContext
inner: context,
metastore: Arc::new(metastore),
internal_object_store,
database: DEFAULT_DB.to_string(),
default_catalog: DEFAULT_DB.to_string(),
default_schema: DEFAULT_SCHEMA.to_string(),
max_partition_size: cfg.misc.max_partition_size,
})
}
Expand Down
10 changes: 4 additions & 6 deletions src/context/delta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::catalog::DEFAULT_SCHEMA;
use crate::context::SeafowlContext;
#[cfg(test)]
use crate::frontend::http::tests::deterministic_uuid;
Expand Down Expand Up @@ -280,19 +279,18 @@ pub(super) enum CreateDeltaTableDetails {

impl SeafowlContext {
pub(super) async fn create_delta_table<'a>(
&self,
&'a self,
name: impl Into<TableReference<'a>>,
details: CreateDeltaTableDetails,
) -> Result<Arc<DeltaTable>> {
let table_ref: TableReference = name.into();
let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let resolved_ref = self.resolve_table_ref(name);
let schema_name = resolved_ref.schema.clone();
let table_name = resolved_ref.table.clone();

let _ = self
.metastore
.schemas
.get(&self.database, &schema_name)
.get(&self.default_catalog, &schema_name)
.await?;

// NB: there's also a uuid generated below for table's `DeltaTableMetaData::id`, so it would
Expand Down Expand Up @@ -353,7 +351,7 @@ impl SeafowlContext {
self.metastore
.tables
.create(
&self.database,
&self.default_catalog,
&schema_name,
&table_name,
TableProvider::schema(&table).as_ref(),
Expand Down
11 changes: 7 additions & 4 deletions src/context/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ use datafusion::optimizer::analyzer::Analyzer;
use datafusion::optimizer::optimizer::Optimizer;
use datafusion::optimizer::simplify_expressions::SimplifyExpressions;
use datafusion::optimizer::{OptimizerContext, OptimizerRule};
use datafusion::prelude::SessionContext;
use datafusion::sql::parser::{CopyToSource, CopyToStatement};
use datafusion::{prelude::SessionContext, sql::TableReference};
use datafusion_common::TableReference;
use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use deltalake::DeltaTable;
use itertools::Itertools;
Expand Down Expand Up @@ -325,8 +326,10 @@ impl SeafowlContext {
// Should become obsolete once `sqlparser-rs` introduces support for some form of the `AS OF`
// clause: https://en.wikipedia.org/wiki/SQL:2011.
async fn rewrite_time_travel_query(&self, q: &mut Query) -> Result<SessionState> {
let mut version_processor =
TableVersionProcessor::new(self.database.clone(), DEFAULT_SCHEMA.to_string());
let mut version_processor = TableVersionProcessor::new(
self.default_catalog.clone(),
DEFAULT_SCHEMA.to_string(),
);
q.visit(&mut version_processor);

if version_processor.table_versions.is_empty() {
Expand All @@ -348,7 +351,7 @@ impl SeafowlContext {

let full_table_name = table.to_string();
let mut resolved_ref = TableReference::from(full_table_name.as_str())
.resolve(&self.database, DEFAULT_SCHEMA);
.resolve(&self.default_catalog, &self.default_schema);

// We only support datetime DeltaTable version specification for start
let table_uuid = self.get_table_uuid(resolved_ref.clone()).await?;
Expand Down
46 changes: 33 additions & 13 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::wasm_udf::wasm::create_udf_from_wasm;
use base64::{engine::general_purpose::STANDARD, Engine};
pub use datafusion::error::{DataFusionError as Error, Result};
use datafusion::{error::DataFusionError, prelude::SessionContext, sql::TableReference};
use datafusion_common::OwnedTableReference;
use datafusion_common::{OwnedTableReference, ResolvedTableReference};
use deltalake::DeltaTable;
use object_store::path::Path;
use std::sync::Arc;
Expand All @@ -24,18 +24,19 @@ pub struct SeafowlContext {
pub inner: SessionContext,
pub metastore: Arc<Metastore>,
pub internal_object_store: Arc<InternalObjectStore>,
pub database: String,
pub default_catalog: String,
pub default_schema: String,
pub max_partition_size: u32,
}

impl SeafowlContext {
// Create a new `SeafowlContext` with a new inner context scoped to a different default DB
pub fn scope_to_database(&self, name: String) -> Arc<SeafowlContext> {
// Create a new `SeafowlContext` with a new inner context scoped to a different default catalog/schema
pub fn scope_to(&self, catalog: String, schema: String) -> Arc<SeafowlContext> {
// Swap the default catalog and schema in the new internal context's session config
let session_config = self
.inner()
.copied_config()
.with_default_catalog_and_schema(name.clone(), DEFAULT_SCHEMA);
.with_default_catalog_and_schema(&catalog, &schema);

let state =
build_state_with_table_factories(session_config, self.inner().runtime_env());
Expand All @@ -44,11 +45,20 @@ impl SeafowlContext {
inner: SessionContext::new_with_state(state),
metastore: self.metastore.clone(),
internal_object_store: self.internal_object_store.clone(),
database: name,
default_catalog: catalog,
default_schema: schema,
max_partition_size: self.max_partition_size,
})
}

/// Create a new `SeafowlContext` scoped to `catalog`, using `DEFAULT_SCHEMA`
/// as its default schema.
pub fn scope_to_catalog(&self, catalog: String) -> Arc<SeafowlContext> {
    let schema = DEFAULT_SCHEMA.to_string();
    self.scope_to(catalog, schema)
}

/// Create a new `SeafowlContext` scoped to `schema`, keeping this context's
/// current default catalog.
pub fn scope_to_schema(&self, schema: String) -> Arc<SeafowlContext> {
    let catalog = self.default_catalog.clone();
    self.scope_to(catalog, schema)
}

/// Borrow the wrapped DataFusion `SessionContext` held in the `inner` field.
pub fn inner(&self) -> &SessionContext {
&self.inner
}
Expand All @@ -67,18 +77,28 @@ impl SeafowlContext {
// This does incur a latency cost to every query.

self.inner.register_catalog(
&self.database,
Arc::new(self.metastore.build_catalog(&self.database).await?),
&self.default_catalog,
Arc::new(self.metastore.build_catalog(&self.default_catalog).await?),
);

// Register all functions in the database
self.metastore
.build_functions(&self.database)
.build_functions(&self.default_catalog)
.await?
.iter()
.try_for_each(|f| self.register_function(&f.name, &f.details))
}

/// Resolve `table_ref` using this context's `default_catalog` and
/// `default_schema`.
///
/// Taken from DF `SessionState`, where it's private.
pub fn resolve_table_ref<'a>(
    &'a self,
    table_ref: impl Into<TableReference<'a>>,
) -> ResolvedTableReference<'a> {
    let reference: TableReference<'a> = table_ref.into();
    reference.resolve(&self.default_catalog, &self.default_schema)
}

// Check that the TableReference doesn't have a database/schema in it.
// We create all external tables in the staging schema (backed by DataFusion's
// in-memory schema provider) instead.
Expand All @@ -93,15 +113,15 @@ impl SeafowlContext {
// This means that any potential catalog/schema references get condensed into the name, so
// we have to unravel that name here again, and then resolve it properly.
let reference = TableReference::from(name.to_string());
let resolved_reference = reference.resolve(&self.database, STAGING_SCHEMA);
let resolved_reference = reference.resolve(&self.default_catalog, STAGING_SCHEMA);

if resolved_reference.catalog != self.database
if resolved_reference.catalog != self.default_catalog
|| resolved_reference.schema != STAGING_SCHEMA
{
return Err(DataFusionError::Plan(format!(
"Can only create external tables in the staging schema.
Omit the schema/database altogether or use {}.{}.{}",
&self.database, STAGING_SCHEMA, resolved_reference.table
&self.default_catalog, STAGING_SCHEMA, resolved_reference.table
)));
}

Expand Down Expand Up @@ -221,7 +241,7 @@ pub mod test_utils {
// place on another node
context.metastore.catalogs.create("testdb").await.unwrap();

let context = context.scope_to_database("testdb".to_string());
let context = context.scope_to_catalog("testdb".to_string());

// Create new non-default collection
context.plan_query("CREATE SCHEMA testcol").await.unwrap();
Expand Down
41 changes: 18 additions & 23 deletions src/context/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl SeafowlContext {
// Create a schema and register it
self.metastore
.schemas
.create(&self.database, schema_name)
.create(&self.default_catalog, schema_name)
.await?;
Ok(make_dummy_exec())
}
Expand Down Expand Up @@ -518,8 +518,7 @@ impl SeafowlContext {
if_exists: _,
schema: _,
})) => {
let table_ref = TableReference::from(name);
let resolved_ref = table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let resolved_ref = self.resolve_table_ref(name);

if resolved_ref.schema == STAGING_SCHEMA {
// Dropping a staging table is an in-memory-only op
Expand Down Expand Up @@ -549,7 +548,7 @@ impl SeafowlContext {
let schema_name = name.schema_name();

if let SchemaReference::Full { catalog, .. } = name
&& catalog != &self.database
&& catalog != &self.default_catalog
{
return Err(DataFusionError::Execution(
"Cannot delete schemas in other catalogs".to_string(),
Expand All @@ -558,7 +557,7 @@ impl SeafowlContext {

let schema = match self
.inner
.catalog(&self.database)
.catalog(&self.default_catalog)
.expect("Current catalog exists")
.schema(schema_name)
{
Expand All @@ -577,7 +576,7 @@ impl SeafowlContext {
// Delete each table sequentially
for table_name in schema.table_names() {
let table_ref = ResolvedTableReference {
catalog: Cow::from(&self.database),
catalog: Cow::from(&self.default_catalog),
schema: Cow::from(schema_name),
table: Cow::from(table_name),
};
Expand All @@ -591,7 +590,7 @@ impl SeafowlContext {

self.metastore
.schemas
.delete(&self.database, schema_name)
.delete(&self.default_catalog, schema_name)
.await?;

Ok(make_dummy_exec())
Expand Down Expand Up @@ -642,7 +641,7 @@ impl SeafowlContext {
// Persist the function in the metadata storage
self.metastore
.functions
.create(&self.database, name, *or_replace, details)
.create(&self.default_catalog, name, *or_replace, details)
.await?;

Ok(make_dummy_exec())
Expand All @@ -654,7 +653,7 @@ impl SeafowlContext {
}) => {
self.metastore
.functions
.delete(&self.database, *if_exists, func_names)
.delete(&self.default_catalog, *if_exists, func_names)
.await?;
Ok(make_dummy_exec())
}
Expand All @@ -664,20 +663,16 @@ impl SeafowlContext {
..
}) => {
// Resolve new table reference
let new_table_ref = TableReference::from(new_name.as_str());
let resolved_new_ref =
new_table_ref.resolve(&self.database, DEFAULT_SCHEMA);
if resolved_new_ref.catalog != self.database {
let resolved_new_ref = self.resolve_table_ref(new_name);
if resolved_new_ref.catalog != self.default_catalog {
return Err(Error::Plan(
"Changing the table's database is not supported!"
.to_string(),
));
}

// Resolve old table reference
let old_table_ref = TableReference::from(old_name.as_str());
let resolved_old_ref =
old_table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let resolved_old_ref = self.resolve_table_ref(old_name);

// Finally update our catalog entry
self.metastore
Expand All @@ -701,9 +696,7 @@ impl SeafowlContext {
if database.is_some() {
gc_databases(self, database.clone()).await;
} else if let Some(table_name) = table_name {
let table_ref = TableReference::from(table_name.as_str());
let resolved_ref =
table_ref.resolve(&self.database, DEFAULT_SCHEMA);
let resolved_ref = self.resolve_table_ref(table_name);

if let Ok(mut delta_table) =
self.try_get_delta_table(resolved_ref.clone()).await
Expand Down Expand Up @@ -829,8 +822,10 @@ impl SeafowlContext {
// Check whether table already exists and ensure that the schema exists
let table_exists = match self
.inner
.catalog(&self.database)
.ok_or_else(|| Error::Plan(format!("Database {} not found!", self.database)))?
.catalog(&self.default_catalog)
.ok_or_else(|| {
Error::Plan(format!("Database {} not found!", self.default_catalog))
})?
.schema(&schema_name)
{
Some(_) => {
Expand All @@ -847,7 +842,7 @@ impl SeafowlContext {
// Schema doesn't exist; create one first, and then reload to pick it up
self.metastore
.schemas
.create(&self.database, &schema_name)
.create(&self.default_catalog, &schema_name)
.await?;
self.reload_schema().await?;
false
Expand Down Expand Up @@ -888,7 +883,7 @@ impl SeafowlContext {
let plan = source.scan(&self.inner.state(), None, &[], None).await?;

let table_ref = TableReference::Full {
catalog: Cow::from(&self.database),
catalog: Cow::from(&self.default_catalog),
schema: Cow::from(schema_name),
table: Cow::from(table_name),
};
Expand Down
19 changes: 17 additions & 2 deletions src/frontend/flight/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use datafusion_common::DataFusionError;
use lazy_static::lazy_static;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::metadata::MetadataMap;

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
Expand Down Expand Up @@ -44,9 +45,23 @@ impl SeafowlFlightHandler {
&self,
query: &str,
query_id: String,
metadata: &MetadataMap,
) -> Result<SchemaRef> {
let plan = self.context.plan_query(query).await?;
let batch_stream = self.context.execute_stream(plan).await?;
let ctx = if let Some(search_path) = metadata.get("search-path") {
self.context.scope_to_schema(
search_path
.to_str()
.map_err(|e| DataFusionError::Execution(format!(
"Couldn't parse search path from header value {search_path:?}: {e}"
)))?
.to_string(),
)
} else {
self.context.clone()
};

let plan = ctx.plan_query(query).await?;
let batch_stream = ctx.execute_stream(plan).await?;
let schema = batch_stream.schema();

self.results.insert(query_id, Mutex::new(batch_stream));
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/flight/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl FlightSqlService for SeafowlFlightHandler {
let query_id = Uuid::new_v4().to_string();

let schema = self
.query_to_stream(&query.query, query_id.clone())
.query_to_stream(&query.query, query_id.clone(), request.metadata())
.await
.map_err(|e| Status::internal(e.to_string()))?;

Expand Down
Loading

0 comments on commit 4907a35

Please sign in to comment.