Skip to content

Commit

Permalink
refactor: show queries directly query system tables
Browse files Browse the repository at this point in the history
  • Loading branch information
yukkit committed Nov 13, 2023
1 parent 5351561 commit 86a0769
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 158 deletions.
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 0 additions & 6 deletions query_server/query/src/execution/sys/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
mod kill_query;
mod show_queries;

use std::sync::Arc;

Expand All @@ -10,7 +9,6 @@ use spi::query::logical_planner::SYSPlan;
use spi::Result;

use self::kill_query::KillQueryTask;
use self::show_queries::ShowQueriesTask;
use crate::dispatcher::query_tracker::QueryTracker;

pub struct SystemExecution {
Expand Down Expand Up @@ -90,10 +88,6 @@ struct SystemTaskFactory {
impl SystemTaskFactory {
fn create_task(&self) -> Box<dyn SystemTask> {
match &self.plan {
SYSPlan::ShowQueries => Box::new(ShowQueriesTask::new(
self.query_tracker.clone(),
self.plan.schema(),
)),
SYSPlan::KillQuery(query_id) => {
Box::new(KillQueryTask::new(self.query_tracker.clone(), *query_id))
}
Expand Down
119 changes: 0 additions & 119 deletions query_server/query/src/execution/sys/show_queries.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::metadata::information_schema_provider::builder::queries::{
};
use crate::metadata::information_schema_provider::InformationSchemaTableFactory;

const INFORMATION_SCHEMA_QUERIES: &str = "QUERIES";
pub const INFORMATION_SCHEMA_QUERIES: &str = "QUERIES";

/// This view shows real-time snapshots of SQL statements for real-time monitoring of SQL jobs
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub use builder::tables::{
use datafusion::datasource::TableProvider;
pub use factory::columns::INFORMATION_SCHEMA_COLUMNS;
pub use factory::databases::INFORMATION_SCHEMA_DATABASES;
pub use factory::queries::INFORMATION_SCHEMA_QUERIES;
pub use factory::tables::INFORMATION_SCHEMA_TABLES;
use meta::error::MetaError;
use meta::model::MetaClientRef;
Expand Down
5 changes: 3 additions & 2 deletions query_server/query/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ pub use information_schema_provider::{
COLUMNS_DATA_TYPE, COLUMNS_TABLE_NAME, DATABASES_DATABASE_NAME, DATABASES_PRECISION,
DATABASES_REPLICA, DATABASES_SHARD, DATABASES_TENANT_NAME, DATABASES_TTL,
DATABASES_VNODE_DURATION, INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES,
INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE, TABLES_TABLE_ENGINE, TABLES_TABLE_NAME,
TABLES_TABLE_OPTIONS, TABLES_TABLE_TENANT, TABLES_TABLE_TYPE,
INFORMATION_SCHEMA_QUERIES, INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE,
TABLES_TABLE_ENGINE, TABLES_TABLE_NAME, TABLES_TABLE_OPTIONS, TABLES_TABLE_TENANT,
TABLES_TABLE_TYPE,
};
use meta::error::MetaError;
use meta::model::MetaClientRef;
Expand Down
38 changes: 28 additions & 10 deletions query_server/query/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ use crate::metadata::{
COLUMNS_COLUMN_TYPE, COLUMNS_COMPRESSION_CODEC, COLUMNS_DATABASE_NAME, COLUMNS_DATA_TYPE,
COLUMNS_TABLE_NAME, DATABASES_DATABASE_NAME, DATABASES_PRECISION, DATABASES_REPLICA,
DATABASES_SHARD, DATABASES_TTL, DATABASES_VNODE_DURATION, INFORMATION_SCHEMA,
INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES, INFORMATION_SCHEMA_TABLES,
TABLES_TABLE_DATABASE, TABLES_TABLE_NAME,
INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_DATABASES, INFORMATION_SCHEMA_QUERIES,
INFORMATION_SCHEMA_TABLES, TABLES_TABLE_DATABASE, TABLES_TABLE_NAME,
};

/// CnosDB SQL query planner
Expand Down Expand Up @@ -185,14 +185,7 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
ExtStatement::AlterUser(stmt) => self.alter_user_to_plan(stmt).await,
ExtStatement::GrantRevoke(stmt) => self.grant_revoke_to_plan(stmt, session),
// system statement
ExtStatement::ShowQueries => {
let plan = Plan::SYSTEM(SYSPlan::ShowQueries);
// TODO privileges
Ok(PlanWithPrivileges {
plan,
privileges: vec![],
})
}
ExtStatement::ShowQueries => self.show_queries_to_plan(session),
ExtStatement::Copy(stmt) => self.copy_to_plan(stmt, session).await,
// vnode statement
ExtStatement::DropVnode(stmt) => self.drop_vnode_to_plan(stmt),
Expand Down Expand Up @@ -1876,6 +1869,31 @@ impl<'a, S: ContextProviderExtension + Send + Sync + 'a> SqlPlanner<'a, S> {
Ok(PlanWithPrivileges { plan, privileges })
}

fn show_queries_to_plan(&self, session: &SessionCtx) -> Result<PlanWithPrivileges> {
// QUERY_SCHEMA: query_id, query_type, query_text, user_name, tenant_name, state, duration
let projections = vec![0, 1, 2, 4, 6, 7, 8];

let table_ref = TableReference::partial(INFORMATION_SCHEMA, INFORMATION_SCHEMA_QUERIES);

let table_source = self.get_table_source(table_ref.clone())?;

let df_plan =
LogicalPlanBuilder::scan(table_ref, table_source, Some(projections))?.build()?;

let plan = Plan::Query(QueryPlan { df_plan });

// privileges
let tenant_id = *session.tenant_id();
let privilege = Privilege::TenantObject(
TenantObjectPrivilege::Database(DatabasePrivilege::Read, None),
Some(tenant_id),
);
Ok(PlanWithPrivileges {
plan,
privileges: vec![privilege],
})
}

fn drop_vnode_to_plan(&self, stmt: ASTDropVnode) -> Result<PlanWithPrivileges> {
let ASTDropVnode { vnode_id } = stmt;

Expand Down
12 changes: 1 addition & 11 deletions query_server/spi/src/query/logical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,22 +214,12 @@ pub struct DeleteFromTable {

#[derive(Debug, Clone)]
pub enum SYSPlan {
ShowQueries,
KillQuery(QueryId),
}

impl SYSPlan {
pub fn schema(&self) -> SchemaRef {
match self {
SYSPlan::ShowQueries => Arc::new(Schema::new(vec![
Field::new("query_id", DataType::Utf8, false),
Field::new("user", DataType::Utf8, false),
Field::new("query", DataType::Utf8, false),
Field::new("state", DataType::Utf8, false),
Field::new("duration", DataType::UInt64, false),
])),
_ => Arc::new(Schema::empty()),
}
Arc::new(Schema::empty())
}
}

Expand Down

0 comments on commit 86a0769

Please sign in to comment.