refactor: show tables and show databases (#3423)
* refactor: show tables and show databases

* chore: clean code
killme2008 authored Mar 4, 2024
1 parent 2a675e0 commit e681941
Showing 15 changed files with 317 additions and 380 deletions.
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema.rs
@@ -19,9 +19,9 @@ mod partitions;
 mod predicate;
 mod region_peers;
 mod runtime_metrics;
-mod schemata;
+pub mod schemata;
 mod table_names;
-mod tables;
+pub mod tables;
 
 use std::collections::HashMap;
 use std::sync::{Arc, Weak};
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema/schemata.rs
@@ -37,8 +37,8 @@ use crate::error::{
 use crate::information_schema::{InformationTable, Predicates};
 use crate::CatalogManager;
 
-const CATALOG_NAME: &str = "catalog_name";
-const SCHEMA_NAME: &str = "schema_name";
+pub const CATALOG_NAME: &str = "catalog_name";
+pub const SCHEMA_NAME: &str = "schema_name";
 const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name";
 const DEFAULT_COLLATION_NAME: &str = "default_collation_name";
 const INIT_CAPACITY: usize = 42;
8 changes: 4 additions & 4 deletions src/catalog/src/information_schema/tables.rs
@@ -39,10 +39,10 @@ use crate::error::{
 use crate::information_schema::{InformationTable, Predicates};
 use crate::CatalogManager;
 
-const TABLE_CATALOG: &str = "table_catalog";
-const TABLE_SCHEMA: &str = "table_schema";
-const TABLE_NAME: &str = "table_name";
-const TABLE_TYPE: &str = "table_type";
+pub const TABLE_CATALOG: &str = "table_catalog";
+pub const TABLE_SCHEMA: &str = "table_schema";
+pub const TABLE_NAME: &str = "table_name";
+pub const TABLE_TYPE: &str = "table_type";
 const TABLE_ID: &str = "table_id";
 const ENGINE: &str = "engine";
 const INIT_CAPACITY: usize = 42;
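Making the schemata and tables column-name constants pub lets code outside these modules refer to information_schema columns by name instead of re-declaring string literals. A minimal sketch of how a SHOW TABLES planner might use them to build a DataFusion filter; the helper below is hypothetical and not part of this commit, and the constants are restated locally so the snippet stands alone:

use datafusion::prelude::{col, lit};
use datafusion_expr::Expr;

// Stand-ins for the constants this diff exports from
// catalog::information_schema::tables:
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";

/// Hypothetical helper: the predicate a SHOW TABLES over
/// information_schema.tables might apply for the current schema,
/// optionally narrowed to a single table name.
fn show_tables_filter(schema: &str, table: Option<&str>) -> Expr {
    let mut expr = col(TABLE_SCHEMA).eq(lit(schema));
    if let Some(table) = table {
        expr = expr.and(col(TABLE_NAME).eq(lit(table)));
    }
    expr
}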
4 changes: 2 additions & 2 deletions src/operator/src/statement/show.rs
@@ -33,7 +33,7 @@ impl StatementExecutor {
         stmt: ShowDatabases,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        query::sql::show_databases(stmt, self.catalog_manager.clone(), query_ctx)
+        query::sql::show_databases(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
             .await
             .context(ExecuteStatementSnafu)
     }
@@ -44,7 +44,7 @@ impl StatementExecutor {
         stmt: ShowTables,
         query_ctx: QueryContextRef,
     ) -> Result<Output> {
-        query::sql::show_tables(stmt, self.catalog_manager.clone(), query_ctx)
+        query::sql::show_tables(stmt, &self.query_engine, &self.catalog_manager, query_ctx)
             .await
             .context(ExecuteStatementSnafu)
     }
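Both call sites now hand the query engine to query::sql alongside the catalog manager, which fits the commit's direction: SHOW DATABASES and SHOW TABLES run as real queries rather than hand-assembled record batches. The receiving side is not shown in this diff; a sketch of what the new signature presumably looks like, with all paths and the body assumed:

// Sketched as it might appear in src/query/src/sql.rs; paths assumed.
use catalog::CatalogManagerRef;
use common_query::Output;
use session::context::QueryContextRef;
use sql::statements::show::ShowTables;

use crate::error::Result;
use crate::QueryEngineRef;

pub async fn show_tables(
    stmt: ShowTables,
    query_engine: &QueryEngineRef,
    catalog_manager: &CatalogManagerRef,
    query_ctx: QueryContextRef,
) -> Result<Output> {
    // Presumably: plan a SELECT over information_schema.tables using the
    // now-public column constants, then execute it via `query_engine`.
    todo!()
}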
170 changes: 10 additions & 160 deletions src/query/src/datafusion.rs
@@ -30,32 +30,26 @@ use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan, PhysicalP
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use common_recordbatch::adapter::RecordBatchStreamAdapter;
-use common_recordbatch::{
-    EmptyRecordBatchStream, RecordBatch, RecordBatches, SendableRecordBatchStream,
-};
+use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
 use common_telemetry::tracing;
-use datafusion::common::Column;
 use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
-use datafusion_common::{ResolvedTableReference, ScalarValue};
-use datafusion_expr::{DmlStatement, Expr as DfExpr, LogicalPlan as DfLogicalPlan, WriteOp};
+use datafusion_common::ResolvedTableReference;
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
 use datatypes::prelude::VectorRef;
-use datatypes::schema::Schema;
 use futures_util::StreamExt;
 use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::ast::{BinaryOperator, Expr, Value};
 use table::requests::{DeleteRequest, InsertRequest};
 use table::TableRef;
 
 use crate::dataframe::DataFrame;
 pub use crate::datafusion::planner::DfContextProviderAdapter;
 use crate::error::{
-    CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu,
-    MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
-    TableMutationSnafu, TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu,
+    CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, MissingTableMutationHandlerSnafu,
+    MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableMutationSnafu,
+    TableNotFoundSnafu, UnsupportedExprSnafu,
 };
 use crate::executor::QueryExecutor;
 use crate::logical_optimizer::LogicalOptimizer;
@@ -456,78 +450,6 @@ impl QueryExecutor for DatafusionQueryEngine {
     }
 }
 
-fn convert_filter_to_df_filter(filter: Expr) -> Result<DfExpr> {
-    match filter {
-        Expr::BinaryOp { left, op, right } => {
-            let left = convert_filter_to_df_filter(*left)?;
-            let right = convert_filter_to_df_filter(*right)?;
-            match op {
-                BinaryOperator::Eq => Ok(left.eq(right)),
-                _ => UnimplementedSnafu {
-                    operation: format!("convert BinaryOperator into datafusion Expr, op: {op}"),
-                }
-                .fail(),
-            }
-        }
-        Expr::Value(value) => match value {
-            Value::SingleQuotedString(v) => Ok(DfExpr::Literal(ScalarValue::Utf8(Some(v)))),
-            _ => UnimplementedSnafu {
-                operation: format!("convert Expr::Value into datafusion Expr, value: {value}"),
-            }
-            .fail(),
-        },
-        Expr::Identifier(ident) => Ok(DfExpr::Column(Column::from_name(ident.value))),
-        _ => UnimplementedSnafu {
-            operation: format!("convert Expr into datafusion Expr, Expr: {filter}"),
-        }
-        .fail(),
-    }
-}
-
-/// Creates a table in memory and executes a show statement on the table.
-pub async fn execute_show_with_filter(
-    record_batch: RecordBatch,
-    filter: Option<Expr>,
-) -> Result<Output> {
-    let table_name = "table_name";
-    let column_schemas = record_batch.schema.column_schemas().to_vec();
-    let context = SessionContext::new();
-    context
-        .register_batch(table_name, record_batch.into_df_record_batch())
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    let mut dataframe = context
-        .sql(&format!("SELECT * FROM {table_name}"))
-        .await
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    if let Some(filter) = filter {
-        let filter = convert_filter_to_df_filter(filter)?;
-        dataframe = dataframe
-            .filter(filter)
-            .context(error::DatafusionSnafu)
-            .map_err(BoxedError::new)
-            .context(QueryExecutionSnafu)?
-    }
-    let df_batches = dataframe
-        .collect()
-        .await
-        .context(error::DatafusionSnafu)
-        .map_err(BoxedError::new)
-        .context(QueryExecutionSnafu)?;
-    let mut batches = Vec::with_capacity(df_batches.len());
-    let schema = Arc::new(Schema::try_new(column_schemas).context(CreateSchemaSnafu)?);
-    for df_batch in df_batches.into_iter() {
-        let batch = RecordBatch::try_from_df_record_batch(schema.clone(), df_batch)
-            .context(CreateRecordBatchSnafu)?;
-        batches.push(batch);
-    }
-    let record_batches = RecordBatches::try_new(schema, batches).context(CreateRecordBatchSnafu)?;
-    Ok(Output::RecordBatches(record_batches))
-}
-
 #[cfg(test)]
 mod tests {
     use std::borrow::Cow::Borrowed;
@@ -536,17 +458,12 @@ mod tests {
     use catalog::RegisterTableRequest;
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_TABLE_ID};
     use common_query::Output;
-    use common_recordbatch::{util, RecordBatch};
+    use common_recordbatch::util;
     use datafusion::prelude::{col, lit};
-    use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
-    use datatypes::schema::{ColumnSchema, Schema};
-    use datatypes::types::StringType;
-    use datatypes::vectors::{Helper, StringVectorBuilder, UInt32Vector, UInt64Vector, VectorRef};
+    use datatypes::prelude::ConcreteDataType;
+    use datatypes::schema::ColumnSchema;
+    use datatypes::vectors::{Helper, UInt32Vector, UInt64Vector, VectorRef};
     use session::context::QueryContext;
-    use sql::dialect::GreptimeDbDialect;
-    use sql::parser::{ParseOptions, ParserContext};
-    use sql::statements::show::{ShowKind, ShowTables};
-    use sql::statements::statement::Statement;
     use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 
     use super::*;
@@ -691,71 +608,4 @@
         );
         assert_eq!("Limit: skip=0, fetch=20\n  Aggregate: groupBy=[[]], aggr=[[SUM(CAST(numbers.number AS UInt64))]]\n    TableScan: numbers projection=[number]", format!("{}", logical_plan.display_indent()));
     }
-
-    #[tokio::test]
-    async fn test_show_tables() {
-        // No filter
-        let column_schemas = vec![ColumnSchema::new(
-            "Tables",
-            ConcreteDataType::String(StringType),
-            false,
-        )];
-        let schema = Arc::new(Schema::new(column_schemas));
-        let mut builder = StringVectorBuilder::with_capacity(3);
-        builder.push(Some("monitor"));
-        builder.push(Some("system_metrics"));
-        let columns = vec![builder.to_vector()];
-        let record_batch = RecordBatch::new(schema, columns).unwrap();
-        let output = execute_show_with_filter(record_batch, None).await.unwrap();
-        let Output::RecordBatches(record_batches) = output else {
-            unreachable!()
-        };
-        let expected = "\
-+----------------+
-| Tables         |
-+----------------+
-| monitor        |
-| system_metrics |
-+----------------+";
-        assert_eq!(record_batches.pretty_print().unwrap(), expected);
-
-        // Filter
-        let column_schemas = vec![ColumnSchema::new(
-            "Tables",
-            ConcreteDataType::String(StringType),
-            false,
-        )];
-        let schema = Arc::new(Schema::new(column_schemas));
-        let mut builder = StringVectorBuilder::with_capacity(3);
-        builder.push(Some("monitor"));
-        builder.push(Some("system_metrics"));
-        let columns = vec![builder.to_vector()];
-        let record_batch = RecordBatch::new(schema, columns).unwrap();
-        let statement = ParserContext::create_with_dialect(
-            "SHOW TABLES WHERE \"Tables\"='monitor'",
-            &GreptimeDbDialect {},
-            ParseOptions::default(),
-        )
-        .unwrap()[0]
-        .clone();
-        let Statement::ShowTables(ShowTables { kind, .. }) = statement else {
-            unreachable!()
-        };
-        let ShowKind::Where(filter) = kind else {
-            unreachable!()
-        };
-        let output = execute_show_with_filter(record_batch, Some(filter))
-            .await
-            .unwrap();
-        let Output::RecordBatches(record_batches) = output else {
-            unreachable!()
-        };
-        let expected = "\
-+---------+
-| Tables  |
-+---------+
-| monitor |
-+---------+";
-        assert_eq!(record_batches.pretty_print().unwrap(), expected);
-    }
 }
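For context, the deleted helper implemented SHOW by materializing rows into an in-memory table inside a throwaway DataFusion SessionContext and re-querying it with a hand-converted filter. A minimal, self-contained sketch of that now-removed pattern in plain DataFusion, with GreptimeDB's wrappers stripped out; the table and column names are illustrative:

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::{col, lit, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Materialize the SHOW result as a single in-memory batch.
    let schema = Arc::new(Schema::new(vec![Field::new("Tables", DataType::Utf8, false)]));
    let tables: ArrayRef = Arc::new(StringArray::from(vec!["monitor", "system_metrics"]));
    let batch = RecordBatch::try_new(schema, vec![tables])?;

    // Register it in a throwaway context and re-query it.
    let ctx = SessionContext::new();
    ctx.register_batch("show_tables", batch)?;

    // Equivalent of `SHOW TABLES WHERE "Tables" = 'monitor'` over the batch.
    let df = ctx
        .table("show_tables")
        .await?
        .filter(col("Tables").eq(lit("monitor")))?;
    df.show().await?;
    Ok(())
}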
12 changes: 8 additions & 4 deletions src/query/src/datafusion/planner.rs
@@ -49,12 +49,16 @@ impl DfContextProviderAdapter {
     pub(crate) async fn try_new(
         engine_state: Arc<QueryEngineState>,
         session_state: SessionState,
-        df_stmt: &DfStatement,
+        df_stmt: Option<&DfStatement>,
         query_ctx: QueryContextRef,
     ) -> Result<Self> {
-        let table_names = session_state
-            .resolve_table_references(df_stmt)
-            .context(DataFusionSnafu)?;
+        let table_names = if let Some(df_stmt) = df_stmt {
+            session_state
+                .resolve_table_references(df_stmt)
+                .context(DataFusionSnafu)?
+        } else {
+            vec![]
+        };
 
         let mut table_provider = DfTableSourceProvider::new(
             engine_state.catalog_manager().clone(),
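try_new previously always resolved table references from a parsed statement; accepting Option<&DfStatement> lets callers that build plans without SQL text (presumably the new SHOW path) skip resolution and start from an empty list. A standalone sketch of the same shape using stub types; every name here is a placeholder, not DataFusion's real API:

/// Stand-in for a parsed SQL statement.
struct Stmt;

/// Stand-in for DataFusion's SessionState.
struct SessionState;

impl SessionState {
    fn resolve_table_references(&self, _stmt: &Stmt) -> Result<Vec<String>, String> {
        Ok(vec!["numbers".to_string()])
    }
}

fn table_names(session_state: &SessionState, stmt: Option<&Stmt>) -> Result<Vec<String>, String> {
    // With no statement there is nothing to resolve, so default to empty —
    // the same shape as the change to try_new above.
    match stmt {
        Some(stmt) => session_state.resolve_table_references(stmt),
        None => Ok(vec![]),
    }
}

fn main() {
    let state = SessionState;
    assert_eq!(table_names(&state, Some(&Stmt)).unwrap(), vec!["numbers"]);
    assert!(table_names(&state, None).unwrap().is_empty());
}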
17 changes: 1 addition & 16 deletions src/query/src/error.rs
@@ -54,24 +54,12 @@ pub enum Error {
     #[snafu(display("Table not found: {}", table))]
     TableNotFound { table: String, location: Location },
 
-    #[snafu(display("Failed to do vector computation"))]
-    VectorComputation {
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
     #[snafu(display("Failed to create RecordBatch"))]
     CreateRecordBatch {
         source: common_recordbatch::error::Error,
         location: Location,
     },
 
-    #[snafu(display("Failed to create Schema"))]
-    CreateSchema {
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
     #[snafu(display("Failure during query execution"))]
     QueryExecution {
         source: BoxedError,
@@ -291,9 +279,7 @@ impl ErrorExt for Error {
 
             QueryAccessDenied { .. } => StatusCode::AccessDenied,
             Catalog { source, .. } => source.status_code(),
-            VectorComputation { source, .. } | ConvertDatafusionSchema { source, .. } => {
-                source.status_code()
-            }
+            ConvertDatafusionSchema { source, .. } => source.status_code(),
             CreateRecordBatch { source, .. } => source.status_code(),
             QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
             DataFusion { error, .. } => match error {
@@ -306,7 +292,6 @@
             Sql { source, .. } => source.status_code(),
             PlanSql { .. } => StatusCode::PlanQuery,
             ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
-            CreateSchema { source, .. } => source.status_code(),
 
             RegionQuery { source, .. } => source.status_code(),
             TableMutation { source, .. } => source.status_code(),