Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dbeaver mysql compatibility, use statement and information_schema.tables #4218

Merged
merged 12 commits into from
Jul 3, 2024
146 changes: 133 additions & 13 deletions src/catalog/src/information_schema/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatc
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder};
use datatypes::vectors::{
DateTimeVectorBuilder, StringVectorBuilder, UInt32VectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::metadata::{TableInfo, TableType};

use super::TABLES;
use crate::error::{
Expand All @@ -43,6 +45,24 @@ pub const TABLE_CATALOG: &str = "table_catalog";
pub const TABLE_SCHEMA: &str = "table_schema";
pub const TABLE_NAME: &str = "table_name";
pub const TABLE_TYPE: &str = "table_type";
pub const VERSION: &str = "version";
pub const ROW_FORMAT: &str = "row_format";
pub const TABLE_ROWS: &str = "table_rows";
pub const DATA_LENGTH: &str = "data_length";
pub const INDEX_LENGTH: &str = "index_length";
pub const MAX_DATA_LENGTH: &str = "max_data_length";
pub const AVG_ROW_LENGTH: &str = "avg_row_length";
pub const DATA_FREE: &str = "data_free";
pub const AUTO_INCREMENT: &str = "auto_increment";
pub const CREATE_TIME: &str = "create_time";
pub const UPDATE_TIME: &str = "update_time";
pub const CHECK_TIME: &str = "check_time";
pub const TABLE_COLLATION: &str = "table_collation";
pub const CHECKSUM: &str = "checksum";
pub const CREATE_OPTIONS: &str = "create_options";
pub const TABLE_COMMENT: &str = "table_comment";
pub const MAX_INDEX_LENGTH: &str = "max_index_length";
pub const TEMPORARY: &str = "temporary";
const TABLE_ID: &str = "table_id";
const ENGINE: &str = "engine";
const INIT_CAPACITY: usize = 42;
Expand All @@ -69,7 +89,25 @@ impl InformationSchemaTables {
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_ID, ConcreteDataType::uint32_datatype(), true),
ColumnSchema::new(DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MAX_DATA_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(MAX_INDEX_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(AVG_ROW_LENGTH, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(ENGINE, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(VERSION, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(ROW_FORMAT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_ROWS, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(DATA_FREE, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(AUTO_INCREMENT, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(UPDATE_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(CHECK_TIME, ConcreteDataType::datetime_datatype(), true),
ColumnSchema::new(TABLE_COLLATION, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(CHECKSUM, ConcreteDataType::uint64_datatype(), true),
ColumnSchema::new(CREATE_OPTIONS, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TABLE_COMMENT, ConcreteDataType::string_datatype(), true),
ColumnSchema::new(TEMPORARY, ConcreteDataType::string_datatype(), true),
]))
}

Expand Down Expand Up @@ -131,7 +169,25 @@ struct InformationSchemaTablesBuilder {
table_names: StringVectorBuilder,
table_types: StringVectorBuilder,
table_ids: UInt32VectorBuilder,
version: UInt64VectorBuilder,
row_format: StringVectorBuilder,
table_rows: UInt64VectorBuilder,
data_length: UInt64VectorBuilder,
max_data_length: UInt64VectorBuilder,
index_length: UInt64VectorBuilder,
avg_row_length: UInt64VectorBuilder,
max_index_length: UInt64VectorBuilder,
data_free: UInt64VectorBuilder,
auto_increment: UInt64VectorBuilder,
create_time: DateTimeVectorBuilder,
update_time: DateTimeVectorBuilder,
check_time: DateTimeVectorBuilder,
table_collation: StringVectorBuilder,
checksum: UInt64VectorBuilder,
create_options: StringVectorBuilder,
table_comment: StringVectorBuilder,
engines: StringVectorBuilder,
temporary: StringVectorBuilder,
}

impl InformationSchemaTablesBuilder {
Expand All @@ -149,7 +205,25 @@ impl InformationSchemaTablesBuilder {
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_data_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
avg_row_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
engines: StringVectorBuilder::with_capacity(INIT_CAPACITY),
version: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
row_format: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_rows: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
max_index_length: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
data_free: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
auto_increment: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
update_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
check_time: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY),
table_collation: StringVectorBuilder::with_capacity(INIT_CAPACITY),
checksum: UInt64VectorBuilder::with_capacity(INIT_CAPACITY),
create_options: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_comment: StringVectorBuilder::with_capacity(INIT_CAPACITY),
temporary: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

Expand All @@ -171,10 +245,8 @@ impl InformationSchemaTablesBuilder {
&predicates,
&catalog_name,
&schema_name,
&table_info.name,
table_info,
table.table_type(),
Some(table_info.ident.table_id),
Some(&table_info.meta.engine),
);
}
}
Expand All @@ -188,12 +260,14 @@ impl InformationSchemaTablesBuilder {
predicates: &Predicates,
catalog_name: &str,
schema_name: &str,
table_name: &str,
table_info: Arc<TableInfo>,
table_type: TableType,
table_id: Option<u32>,
engine: Option<&str>,
) {
let table_type = match table_type {
let table_name = table_info.name.as_ref();
let table_id = table_info.table_id();
let engine = table_info.meta.engine.as_ref();

let table_type_text = match table_type {
TableType::Base => "BASE TABLE",
TableType::View => "VIEW",
TableType::Temporary => "LOCAL TEMPORARY",
Expand All @@ -203,7 +277,7 @@ impl InformationSchemaTablesBuilder {
(TABLE_CATALOG, &Value::from(catalog_name)),
(TABLE_SCHEMA, &Value::from(schema_name)),
(TABLE_NAME, &Value::from(table_name)),
(TABLE_TYPE, &Value::from(table_type)),
(TABLE_TYPE, &Value::from(table_type_text)),
];

if !predicates.eval(&row) {
Expand All @@ -213,9 +287,37 @@ impl InformationSchemaTablesBuilder {
self.catalog_names.push(Some(catalog_name));
self.schema_names.push(Some(schema_name));
self.table_names.push(Some(table_name));
self.table_types.push(Some(table_type));
self.table_ids.push(table_id);
self.engines.push(engine);
self.table_types.push(Some(table_type_text));
self.table_ids.push(Some(table_id));
// TODO(sunng87): use real data for these fields
self.data_length.push(Some(0));
self.max_data_length.push(Some(0));
self.index_length.push(Some(0));
self.avg_row_length.push(Some(0));
self.max_index_length.push(Some(0));
self.checksum.push(Some(0));
self.table_rows.push(Some(0));
self.data_free.push(Some(0));
self.auto_increment.push(Some(0));
self.row_format.push(Some("Fixed"));
self.table_collation.push(None);
self.update_time.push(None);
self.check_time.push(None);

self.version.push(Some(11));
sunng87 marked this conversation as resolved.
Show resolved Hide resolved
self.table_comment.push(table_info.desc.as_deref());
self.create_options
.push(Some(format!("{:?}", table_info.meta.options).as_ref()));
sunng87 marked this conversation as resolved.
Show resolved Hide resolved
self.create_time
.push(Some(table_info.meta.created_on.timestamp_millis().into()));

self.temporary
.push(if matches!(table_type, TableType::Temporary) {
Some("Y")
} else {
Some("N")
});
self.engines.push(Some(engine));
}

fn finish(&mut self) -> Result<RecordBatch> {
Expand All @@ -225,7 +327,25 @@ impl InformationSchemaTablesBuilder {
Arc::new(self.table_names.finish()),
Arc::new(self.table_types.finish()),
Arc::new(self.table_ids.finish()),
Arc::new(self.data_length.finish()),
Arc::new(self.max_data_length.finish()),
Arc::new(self.index_length.finish()),
Arc::new(self.max_index_length.finish()),
Arc::new(self.avg_row_length.finish()),
Arc::new(self.engines.finish()),
Arc::new(self.version.finish()),
Arc::new(self.row_format.finish()),
Arc::new(self.table_rows.finish()),
Arc::new(self.data_free.finish()),
Arc::new(self.auto_increment.finish()),
Arc::new(self.create_time.finish()),
Arc::new(self.update_time.finish()),
Arc::new(self.check_time.finish()),
Arc::new(self.table_collation.finish()),
Arc::new(self.checksum.finish()),
Arc::new(self.create_options.finish()),
Arc::new(self.table_comment.finish()),
Arc::new(self.temporary.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/table_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl DfTableSourceProvider {
disallow_cross_catalog_query,
resolved_tables: HashMap::new(),
default_catalog: query_ctx.current_catalog().to_owned(),
default_schema: query_ctx.current_schema().to_owned(),
default_schema: query_ctx.current_schema(),
plan_decoder,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/system/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Function for DatabaseFunction {
fn eval(&self, func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
let db = func_ctx.query_ctx.current_schema();

Ok(Arc::new(StringVector::from_slice(&[db])) as _)
Ok(Arc::new(StringVector::from_slice(&[&db])) as _)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ pub fn check_permission(
Statement::CreateDatabase(_)
| Statement::ShowDatabases(_)
| Statement::DropDatabase(_)
| Statement::DropFlow(_) => {}
| Statement::DropFlow(_)
| Statement::Use(_) => {}
Statement::ShowCreateTable(stmt) => {
validate_param(&stmt.table_name, query_ctx)?;
}
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Instance {
let table_name = prom_store::table_name(query)?;

let output = self
.handle_remote_query(&ctx, catalog_name, schema_name, &table_name, query)
.handle_remote_query(&ctx, catalog_name, &schema_name, &table_name, query)
.await
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu)?;
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ mod python {
.script_manager
.insert_and_compile(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
script,
)
Expand Down Expand Up @@ -266,7 +266,7 @@ mod python {
self.script_manager
.execute(
query_ctx.current_catalog(),
query_ctx.current_schema(),
&query_ctx.current_schema(),
name,
params,
)
Expand Down
2 changes: 1 addition & 1 deletion src/operator/src/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Deleter {
for req in &mut requests.deletes {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
let key_column_names = self.key_column_names(&table)?;

let rows = req.rows.as_mut().unwrap();
Expand Down
10 changes: 6 additions & 4 deletions src/operator/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = sink_table_ref
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_owned())
.unwrap_or(query_ctx.current_schema());

let sink_table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand All @@ -571,8 +572,9 @@ pub fn to_create_flow_task_expr(
.to_string();
let schema = reference
.schema()
.unwrap_or(query_ctx.current_schema())
.to_string();
.map(|s| s.to_string())
.unwrap_or(query_ctx.current_schema());

let table_name = TableName {
catalog_name: catalog,
schema_name: schema,
Expand Down
18 changes: 9 additions & 9 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ impl Inserter {
for req in &requests.inserts {
let catalog = ctx.current_catalog();
let schema = ctx.current_schema();
let table = self.get_table(catalog, schema, &req.table_name).await?;
let table = self.get_table(catalog, &schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
Expand Down Expand Up @@ -525,14 +525,14 @@ impl Inserter {

// check if exist
if self
.get_table(catalog_name, schema_name, &physical_table)
.get_table(catalog_name, &schema_name, &physical_table)
.await?
.is_some()
{
return Ok(());
}

let table_reference = TableReference::full(catalog_name, schema_name, &physical_table);
let table_reference = TableReference::full(catalog_name, &schema_name, &physical_table);
info!("Physical metric table `{table_reference}` does not exist, try creating table");

// schema with timestamp and field column
Expand Down Expand Up @@ -621,8 +621,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);

let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -652,8 +652,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_log_inserts`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
Expand Down Expand Up @@ -692,7 +692,7 @@ impl Inserter {
let create_table_exprs = create_tables
.iter()
.map(|req| {
let table_ref = TableReference::full(catalog_name, schema_name, &req.table_name);
let table_ref = TableReference::full(catalog_name, &schema_name, &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

Expand All @@ -707,7 +707,7 @@ impl Inserter {
.collect::<Result<Vec<_>>>()?;

let res = statement_executor
.create_logical_tables(catalog_name, schema_name, &create_table_exprs, ctx.clone())
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.await;

match res {
Expand Down
4 changes: 2 additions & 2 deletions src/operator/src/req_convert/delete/row_to_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ impl<'a> RowToRegion<'a> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.table(catalog_name, &schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
&schema_name,
table_name,
),
})
Expand Down
Loading