Skip to content

Commit

Permalink
feat: dbeaver mysql compatibility, use statement and information_sche…
Browse files Browse the repository at this point in the history
…ma.tables (#4218)

* feat: add more placeholder field in information_schema.tables

* feat: make schema modifiable for use statement

* chore: add todo items

* fix: resolve lint issues after data type changes

* chore: update sqlness results

* refactor: patch for select database is no longer needed

* test: align tests and data types

* Apply suggestions from code review

Co-authored-by: dennis zhuang <[email protected]>

* fix: use canonicalize_identifier for database name

* feat: add all columns for information_schema.tables

* test: remove variables from sqlness results

* feat: add to_string impl for table options

---------

Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
sunng87 and killme2008 authored Jul 3, 2024
1 parent be29e48 commit 11cf9c8
Show file tree
Hide file tree
Showing 32 changed files with 497 additions and 220 deletions.
147 changes: 134 additions & 13 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use datatypes::vectors::{
DateTimeVectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::metadata::{TableInfo, TableType};

use super::TABLES;
use crate::error::{
Expand All @@ -43,6 +45,24 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;
Expand All @@ -69,7 +89,25 @@ impl InformationSchemaTables {
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(UPDATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(CHECK_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
]))
}

Expand Down Expand Up @@ -131,7 +169,25 @@ struct InformationSchemaTablesBuilder {
table_names: StringVectorBuilder,
table_types: StringVectorBuilder,
table_ids: UInt32VectorBuilder,
version: UInt64VectorBuilder,
row_format: StringVectorBuilder,
table_rows: UInt64VectorBuilder,
data_length: UInt64VectorBuilder,
max_data_length: UInt64VectorBuilder,
index_length: UInt64VectorBuilder,
avg_row_length: UInt64VectorBuilder,
max_index_length: UInt64VectorBuilder,
data_free: UInt64VectorBuilder,
auto_increment: UInt64VectorBuilder,
create_time: DateTimeVectorBuilder,
update_time: DateTimeVectorBuilder,
check_time: DateTimeVectorBuilder,
table_collation: StringVectorBuilder,
checksum: UInt64VectorBuilder,
create_options: StringVectorBuilder,
table_comment: StringVectorBuilder,
engines: StringVectorBuilder,
temporary: StringVectorBuilder,
}

impl InformationSchemaTablesBuilder {
Expand All @@ -149,7 +205,25 @@ impl InformationSchemaTablesBuilder {
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

Expand All @@ -171,10 +245,8 @@ impl InformationSchemaTablesBuilder {
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table_info,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}
Expand All @@ -188,12 +260,14 @@ impl InformationSchemaTablesBuilder {
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
table_name: &str,
table_info: Arc<TableInfo>,
table_type: TableType,
table_id: Option<u32>,
engine: Option<&str>,
) {
let table_type = match table_type {
let table_name = table_info.name.as_ref();
let table_id = table_info.table_id();
let engine = table_info.meta.engine.as_ref();

let table_type_text = match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
Expand All @@ -203,7 +277,7 @@ impl InformationSchemaTablesBuilder {
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type)),
(TABLE_TYPE, &Value::from(table_type_text)),
];

if !predicates.eval(&row) {
Expand All @@ -213,9 +287,38 @@ impl InformationSchemaTablesBuilder {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type));
self.table_ids.push(table_id);
self.engines.push(engine);
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));
// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.max_index_length.push(Some(0));
self.checksum.push(Some(0));
self.table_rows.push(Some(0));
self.data_free.push(Some(0));
self.auto_increment.push(Some(0));
self.row_format.push(Some("Fixed"));
self.table_collation.push(None);
self.update_time.push(None);
self.check_time.push(None);

// use mariadb default table version number here
self.version.push(Some(11));
self.table_comment.push(table_info.desc.as_deref());
self.create_options
.push(Some(table_info.meta.options.to_string().as_ref()));
self.create_time
.push(Some(table_info.meta.created_on.timestamp_millis().into()));

self.temporary
.push(if matches!(table_type, TableType::Temporary) {
Some("Y")
} else {
Some("N")
});
self.engines.push(Some(engine));
}

fn finish(&mut self) -> Result<RecordBatch> {
Expand All @@ -225,7 +328,25 @@ impl InformationSchemaTablesBuilder {
Arc::new(self.table_names.finish()),
Arc::new(self.table_types.finish()),
Arc::new(self.table_ids.finish()),
Arc::new(self.data_length.finish()),
Arc::new(self.max_data_length.finish()),
Arc::new(self.index_length.finish()),
Arc::new(self.max_index_length.finish()),
Arc::new(self.avg_row_length.finish()),
Arc::new(self.engines.finish()),
Arc::new(self.version.finish()),
Arc::new(self.row_format.finish()),
Arc::new(self.table_rows.finish()),
Arc::new(self.data_free.finish()),
Arc::new(self.auto_increment.finish()),
Arc::new(self.create_time.finish()),
Arc::new(self.update_time.finish()),
Arc::new(self.check_time.finish()),
Arc::new(self.table_collation.finish()),
Arc::new(self.checksum.finish()),
Arc::new(self.create_options.finish()),
Arc::new(self.table_comment.finish()),
Arc::new(self.temporary.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl DfTableSourceProvider {
disallow_cross_catalog_query,
resolved_tables: HashMap::new(),
default_catalog: query_ctx.current_catalog().to_owned(),
default_schema: query_ctx.current_schema().to_owned(),
default_schema: query_ctx.current_schema(),
plan_decoder,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/system/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Function for DatabaseFunction {
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let db = func_ctx.query_ctx.current_schema();

Ok(Arc::new(StringVector::from_slice(&[db])) as _)
Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ pub fn check_permission(
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::DropFlow(_) => {}
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Instance {
let table_name = prom_store::table_name(query)?;

let output = self
.handle_remote_query(&ctx, catalog_name, schema_name, &table_name, query)
.handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
.await
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ mod python {
.script_manager
.insert_and_compile(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
script,
)
Expand Down Expand Up @@ -266,7 +266,7 @@ mod python {
self.script_manager
.execute(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
params,
)
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Deleter {
for req in &mut requests.deletes {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;

let rows = req.rows.as_mut().unwrap();
Expand Down
10 changes: 6 additions & 4 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = sink_table_ref
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_owned())
.unwrap_or(query_ctx.current_schema());

let sink_table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand All @@ -571,8 +572,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = reference
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_string())
.unwrap_or(query_ctx.current_schema());

let table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand Down
18 changes: 9 additions & 9 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl Inserter {
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
Expand Down Expand Up @@ -525,14 +525,14 @@ impl Inserter {

// check if exist
if self
.get_table(catalog_name, schema_name, &physical_table)
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, schema_name, &physical_table);
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");

// schema with timestamp and field column
Expand Down Expand Up @@ -621,8 +621,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -652,8 +652,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -692,7 +692,7 @@ impl Inserter {
let create_table_exprs = create_tables
.iter()
.map(|req| {
let table_ref = TableReference::full(catalog_name, schema_name, &req.table_name);
let table_ref = TableReference::full(catalog_name, &schema_name, &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

Expand All @@ -707,7 +707,7 @@ impl Inserter {
.collect::<Result<Vec<_>>>()?;

let res = statement_executor
.create_logical_tables(catalog_name, schema_name, &create_table_exprs, ctx.clone())
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.await;

match res {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/req_convert/delete/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ impl<'a> RowToRegion<'a> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.table(catalog_name, &schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
&schema_name,
table_name,
),
})
Expand Down
Loading

0 comments on commit 11cf9c8

Please sign in to comment.