From 86a0769d88b41e700c9ddb13d5d841c2f6d2c076 Mon Sep 17 00:00:00 2001 From: "yukkit.zhang" Date: Mon, 13 Nov 2023 11:45:15 +0800 Subject: [PATCH] refactor: show queries directly query system tables --- Cargo.lock | 18 +-- query_server/query/src/execution/sys/mod.rs | 6 - .../query/src/execution/sys/show_queries.rs | 119 ------------------ .../factory/queries.rs | 2 +- .../information_schema_provider/mod.rs | 1 + query_server/query/src/metadata/mod.rs | 5 +- query_server/query/src/sql/planner.rs | 38 ++++-- query_server/spi/src/query/logical_planner.rs | 12 +- 8 files changed, 43 insertions(+), 158 deletions(-) delete mode 100644 query_server/query/src/execution/sys/show_queries.rs diff --git a/Cargo.lock b/Cargo.lock index c7cef8c7e..e1ff47417 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1595,7 +1595,7 @@ dependencies = [ [[package]] name = "datafusion" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "ahash", "arrow", @@ -1643,7 +1643,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "arrow", "arrow-array", @@ -1657,7 +1657,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "dashmap", "datafusion-common", @@ -1674,7 +1674,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "ahash", "arrow", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "arrow", "async-trait", @@ -1705,7 +1705,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "ahash", "arrow", @@ -1737,7 +1737,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "arrow", "chrono", @@ -1751,7 +1751,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "arrow", "datafusion-common", @@ -1762,7 +1762,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "27.0.0" -source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#75f8567e01921565ac81770602a1a06fee276ccf" +source = "git+https://github.com/cnosdb/arrow-datafusion.git?branch=27.0.0#7a3a902f0dcba7dc2b8c77a37cdb47998bf7826f" dependencies = [ "arrow", "arrow-schema", diff --git a/query_server/query/src/execution/sys/mod.rs b/query_server/query/src/execution/sys/mod.rs index 21f3c22db..a39dc9936 100644 --- a/query_server/query/src/execution/sys/mod.rs +++ b/query_server/query/src/execution/sys/mod.rs @@ -1,5 +1,4 @@ mod kill_query; -mod show_queries; use std::sync::Arc; @@ -10,7 +9,6 @@ use spi::query::logical_planner::SYSPlan; use spi::Result; use self::kill_query::KillQueryTask; -use self::show_queries::ShowQueriesTask; use crate::dispatcher::query_tracker::QueryTracker; pub struct SystemExecution { @@ -90,10 +88,6 @@ struct SystemTaskFactory { impl SystemTaskFactory { fn create_task(&self) -> Box { match &self.plan { - SYSPlan::ShowQueries => Box::new(ShowQueriesTask::new( - self.query_tracker.clone(), - self.plan.schema(), - )), SYSPlan::KillQuery(query_id) => { Box::new(KillQueryTask::new(self.query_tracker.clone(), *query_id)) } diff --git a/query_server/query/src/execution/sys/show_queries.rs b/query_server/query/src/execution/sys/show_queries.rs deleted file mode 100644 index 5a8acba79..000000000 --- a/query_server/query/src/execution/sys/show_queries.rs +++ /dev/null @@ -1,119 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use async_trait::async_trait; -use datafusion::arrow::array::{StringBuilder, UInt64Builder}; -use datafusion::arrow::datatypes::SchemaRef; -use datafusion::arrow::error::ArrowError; -use datafusion::arrow::record_batch::RecordBatch; -use spi::query::execution::{Output, QueryState, QueryStateMachineRef}; -use spi::query::recordbatch::RecordBatchStreamWrapper; -use spi::service::protocol::QueryId; -use spi::Result; - -use super::SystemTask; -use crate::dispatcher::query_tracker::QueryTracker; - -pub struct ShowQueriesTask { - schema: SchemaRef, - query_tracker: Arc, -} - -impl ShowQueriesTask { - pub fn new(query_tracker: Arc, schema: SchemaRef) -> Self { - Self { - schema, - query_tracker, - } - } -} - -#[async_trait] -impl SystemTask for ShowQueriesTask { - async fn execute(&self, _query_state_machine: QueryStateMachineRef) -> Result { - let mut result_builder = ShowQueriesResultBuilder::new(self.schema.clone()); - - self.query_tracker.running_queries().iter().for_each(|e| { - let info = e.info(); - let status = e.status(); - result_builder.add_column( - info.query_id(), - info.user_name(), - info.query(), - status.query_state(), - status.duration(), - ) - }); - Ok(Output::StreamData(Box::pin(RecordBatchStreamWrapper::new( - result_builder.schema(), - result_builder.build()?, - )))) - } -} - -struct ShowQueriesResultBuilder { - schema: SchemaRef, - - query_ids: StringBuilder, - users: StringBuilder, - queries: StringBuilder, - states: StringBuilder, - durations: UInt64Builder, -} - -impl ShowQueriesResultBuilder { - fn new(schema: SchemaRef) -> Self { - Self { - schema, - query_ids: StringBuilder::new(), - users: StringBuilder::new(), - queries: StringBuilder::new(), - states: StringBuilder::new(), - durations: UInt64Builder::new(), - } - } - - #[allow(clippy::too_many_arguments)] - fn add_column( - &mut self, - query_id: QueryId, - user: impl AsRef, - query: impl AsRef, - state: &QueryState, - duration: &Duration, - ) { - self.query_ids.append_value(query_id.to_string()); - self.users.append_value(user.as_ref()); - self.queries.append_value(query.as_ref()); - self.states.append_value(state.as_ref()); - self.durations.append_value(duration.as_millis() as u64); - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn build(self) -> std::result::Result, ArrowError> { - let ShowQueriesResultBuilder { - schema, - mut query_ids, - mut users, - mut queries, - mut states, - mut durations, - } = self; - - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(query_ids.finish()), - Arc::new(users.finish()), - Arc::new(queries.finish()), - Arc::new(states.finish()), - Arc::new(durations.finish()), - ], - )?; - - Ok(vec![batch]) - } -} diff --git a/query_server/query/src/metadata/information_schema_provider/factory/queries.rs b/query_server/query/src/metadata/information_schema_provider/factory/queries.rs index 72d7da268..66d39c1b8 100644 --- a/query_server/query/src/metadata/information_schema_provider/factory/queries.rs +++ b/query_server/query/src/metadata/information_schema_provider/factory/queries.rs @@ -21,7 +21,7 @@ use crate::metadata::information_schema_provider::builder::queries::{ }; use crate::metadata::information_schema_provider::InformationSchemaTableFactory; -const INFORMATION_SCHEMA_QUERIES: &str = "QUERIES"; +pub const INFORMATION_SCHEMA_QUERIES: &str = "QUERIES"; /// This view shows real-time snapshots of SQL statements for real-time monitoring of SQL jobs /// diff --git a/query_server/query/src/metadata/information_schema_provider/mod.rs b/query_server/query/src/metadata/information_schema_provider/mod.rs index 59705394f..26e87ed40 100644 --- a/query_server/query/src/metadata/information_schema_provider/mod.rs +++ b/query_server/query/src/metadata/information_schema_provider/mod.rs @@ -17,6 +17,7 @@ pub use builder::tables::{ use datafusion::datasource::TableProvider; pub use factory::columns::INFORMATION_SCHEMA_COLUMNS; pub use factory::databases::INFORMATION_SCHEMA_DATABASES; +pub use factory::queries::INFORMATION_SCHEMA_QUERIES; pub use factory::tables::INFORMATION_SCHEMA_TABLES; use meta::error::MetaError; use meta::model::MetaClientRef; diff --git a/query_server/query/src/metadata/mod.rs b/query_server/query/src/metadata/mod.rs index bfeddd064..39a60567e 100644 --- a/query_server/query/src/metadata/mod.rs +++ b/query_server/query/src/metadata/mod.rs @@ -18,8 +18,9 @@ pub use information_schema_provider::{ COLUMNS_DATA_TYPE, COLUMNS_TABLE_NAME, DATABASES_DATABASE_NAME, DATABASES_PRECISION, DATABASES_REPLICA, DATABASES_SHARD, DATABASES_TENANT_NAME, DATABASES_TTL, DATABASES_VNODE_DURATION, INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES, - INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE, TABLES_TABLE_ENGINE, TABLES_TABLE_NAME, - TABLES_TABLE_OPTIONS, TABLES_TABLE_TENANT, TABLES_TABLE_TYPE, + INFORMATION_SCHEMA_QUERIES, INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE, + TABLES_TABLE_ENGINE, TABLES_TABLE_NAME, TABLES_TABLE_OPTIONS, TABLES_TABLE_TENANT, + TABLES_TABLE_TYPE, }; use meta::error::MetaError; use meta::model::MetaClientRef; diff --git a/query_server/query/src/sql/planner.rs b/query_server/query/src/sql/planner.rs index 039f8dadd..0c0efe02e 100644 --- a/query_server/query/src/sql/planner.rs +++ b/query_server/query/src/sql/planner.rs @@ -104,8 +104,8 @@ use crate::metadata::{ COLUMNS_COLUMN_TYPE, COLUMNS_COMPRESSION_CODEC, COLUMNS_DATABASE_NAME, COLUMNS_DATA_TYPE, COLUMNS_TABLE_NAME, DATABASES_DATABASE_NAME, DATABASES_PRECISION, DATABASES_REPLICA, DATABASES_SHARD, DATABASES_TTL, DATABASES_VNODE_DURATION, INFORMATION_SCHEMA, - INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES, INFORMATION_SCHEMA_TABLES, - TABLES_TABLE_DATABASE, TABLES_TABLE_NAME, + INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES, INFORMATION_SCHEMA_QUERIES, + INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE, TABLES_TABLE_NAME, }; /// CnosDB SQL query planner @@ -185,14 +185,7 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> { ExtStatement::AlterUser(stmt) => self.alter_user_to_plan(stmt).await, ExtStatement::GrantRevoke(stmt) => self.grant_revoke_to_plan(stmt, session), // system statement - ExtStatement::ShowQueries => { - let plan = Plan::SYSTEM(SYSPlan::ShowQueries); - // TODO privileges - Ok(PlanWithPrivileges { - plan, - privileges: vec![], - }) - } + ExtStatement::ShowQueries => self.show_queries_to_plan(session), ExtStatement::Copy(stmt) => self.copy_to_plan(stmt, session).await, // vnode statement ExtStatement::DropVnode(stmt) => self.drop_vnode_to_plan(stmt), @@ -1876,6 +1869,31 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> { Ok(PlanWithPrivileges { plan, privileges }) } + fn show_queries_to_plan(&self, session: &SessionCtx) -> Result { + // QUERY_SCHEMA: query_id, query_type, query_text, user_name, tenant_name, state, duration + let projections = vec![0, 1, 2, 4, 6, 7, 8]; + + let table_ref = TableReference::partial(INFORMATION_SCHEMA, INFORMATION_SCHEMA_QUERIES); + + let table_source = self.get_table_source(table_ref.clone())?; + + let df_plan = + LogicalPlanBuilder::scan(table_ref, table_source, Some(projections))?.build()?; + + let plan = Plan::Query(QueryPlan { df_plan }); + + // privileges + let tenant_id = *session.tenant_id(); + let privilege = Privilege::TenantObject( + TenantObjectPrivilege::Database(DatabasePrivilege::Read, None), + Some(tenant_id), + ); + Ok(PlanWithPrivileges { + plan, + privileges: vec![privilege], + }) + } + fn drop_vnode_to_plan(&self, stmt: ASTDropVnode) -> Result { let ASTDropVnode { vnode_id } = stmt; diff --git a/query_server/spi/src/query/logical_planner.rs b/query_server/spi/src/query/logical_planner.rs index 47c9451ea..514103dd3 100644 --- a/query_server/spi/src/query/logical_planner.rs +++ b/query_server/spi/src/query/logical_planner.rs @@ -214,22 +214,12 @@ pub struct DeleteFromTable { #[derive(Debug, Clone)] pub enum SYSPlan { - ShowQueries, KillQuery(QueryId), } impl SYSPlan { pub fn schema(&self) -> SchemaRef { - match self { - SYSPlan::ShowQueries => Arc::new(Schema::new(vec![ - Field::new("query_id", DataType::Utf8, false), - Field::new("user", DataType::Utf8, false), - Field::new("query", DataType::Utf8, false), - Field::new("state", DataType::Utf8, false), - Field::new("duration", DataType::UInt64, false), - ])), - _ => Arc::new(Schema::empty()), - } + Arc::new(Schema::empty()) } }