Skip to content

Commit

Permalink
fix(query): avoid SHOW TABLES failure (databendlabs#13982)
Browse files Browse the repository at this point in the history
fix corrupted tables listing
  • Loading branch information
flaneur2020 authored Dec 11, 2023
1 parent 35fc8c2 commit de30450
Showing 1 changed file with 97 additions and 62 deletions.
159 changes: 97 additions & 62 deletions src/query/storages/system/src/tables_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::TableIdent;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
use common_users::GrantObjectVisibilityChecker;

use crate::table::AsyncOneBlockSystemTable;
use crate::table::AsyncSystemTable;
Expand Down Expand Up @@ -101,20 +102,85 @@ where TablesTable<T>: HistoryAware
) -> Result<DataBlock> {
let tenant = ctx.get_tenant();
let catalog_mgr = CatalogManager::instance();
let ctls: Vec<(String, Arc<dyn Catalog>)> = catalog_mgr
.list_catalogs(&tenant)
.await?
.iter()
.map(|e| (e.name(), e.clone()))
.collect();
let catalogs = catalog_mgr.list_catalogs(&tenant).await?;
let visibility_checker = ctx.get_visibility_checker().await?;

Ok(self
.get_full_data_from_catalogs(ctx, push_downs, catalogs, visibility_checker)
.await)
}
}

impl<const T: bool> TablesTable<T>
where TablesTable<T>: HistoryAware
{
pub fn schema() -> TableSchemaRef {
TableSchemaRefExt::create(vec![
TableField::new("catalog", TableDataType::String),
TableField::new("database", TableDataType::String),
TableField::new("name", TableDataType::String),
TableField::new("table_id", TableDataType::Number(NumberDataType::UInt64)),
TableField::new("engine", TableDataType::String),
TableField::new("engine_full", TableDataType::String),
TableField::new("cluster_by", TableDataType::String),
TableField::new("is_transient", TableDataType::String),
TableField::new("created_on", TableDataType::Timestamp),
TableField::new(
"dropped_on",
TableDataType::Nullable(Box::new(TableDataType::Timestamp)),
),
TableField::new("updated_on", TableDataType::Timestamp),
TableField::new(
"num_rows",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"data_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"data_compressed_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"index_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"number_of_segments",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"number_of_blocks",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"owner",
TableDataType::Nullable(Box::new(TableDataType::String)),
),
])
}

/// dump all the tables from all the catalogs with pushdown, this is used for `SHOW TABLES` command.
/// please note that this function is intended to not wrapped with Result<>, because we do not want to
/// break ALL the output on reading ANY of the catalog, database or table failed.
#[async_backtrace::framed]
async fn get_full_data_from_catalogs(
&self,
ctx: Arc<dyn TableContext>,
push_downs: Option<PushDownInfo>,
catalogs: Vec<Arc<dyn Catalog>>,
visibility_checker: GrantObjectVisibilityChecker,
) -> DataBlock {
let tenant = ctx.get_tenant();
let ctls: Vec<(String, Arc<dyn Catalog>)> =
catalogs.iter().map(|e| (e.name(), e.clone())).collect();

let mut catalogs = vec![];
let mut databases = vec![];

let mut database_tables = vec![];

let visibility_checker = ctx.get_visibility_checker().await?;

for (ctl_name, ctl) in ctls.into_iter() {
let mut dbs = Vec::new();
if let Some(push_downs) = &push_downs {
Expand Down Expand Up @@ -142,7 +208,17 @@ where TablesTable<T>: HistoryAware
}

if dbs.is_empty() {
dbs = ctl.list_databases(tenant.as_str()).await?;
dbs = match ctl.list_databases(tenant.as_str()).await {
Ok(dbs) => dbs,
Err(err) => {
ctx.push_warning(format!(
"list databases failed on catalog {}: {}",
ctl.name(),
err
));
vec![]
}
}
}
let ctl_name: &str = Box::leak(ctl_name.into_boxed_str());

Expand Down Expand Up @@ -202,7 +278,17 @@ where TablesTable<T>: HistoryAware
.as_ref()
.map(|v| v.owner_role_name.as_bytes().to_vec()),
);
let stats = tbl.table_statistics().await?;
let stats = match tbl.table_statistics().await {
Ok(stats) => stats,
Err(err) => {
ctx.push_warning(format!(
"get table statistics failed on table {}: {}",
tbl.name(),
err
));
None
}
};
num_rows.push(stats.as_ref().and_then(|v| v.num_rows));
number_of_blocks.push(stats.as_ref().and_then(|v| v.number_of_blocks));
number_of_segments.push(stats.as_ref().and_then(|v| v.number_of_segments));
Expand Down Expand Up @@ -263,7 +349,7 @@ where TablesTable<T>: HistoryAware
}
})
.collect();
Ok(DataBlock::new_from_columns(vec![
DataBlock::new_from_columns(vec![
StringType::from_data(catalogs),
StringType::from_data(databases),
StringType::from_data(names),
Expand All @@ -282,57 +368,6 @@ where TablesTable<T>: HistoryAware
UInt64Type::from_opt_data(number_of_segments),
UInt64Type::from_opt_data(number_of_blocks),
StringType::from_opt_data(owner),
]))
}
}

impl<const T: bool> TablesTable<T>
where TablesTable<T>: HistoryAware
{
pub fn schema() -> TableSchemaRef {
TableSchemaRefExt::create(vec![
TableField::new("catalog", TableDataType::String),
TableField::new("database", TableDataType::String),
TableField::new("name", TableDataType::String),
TableField::new("table_id", TableDataType::Number(NumberDataType::UInt64)),
TableField::new("engine", TableDataType::String),
TableField::new("engine_full", TableDataType::String),
TableField::new("cluster_by", TableDataType::String),
TableField::new("is_transient", TableDataType::String),
TableField::new("created_on", TableDataType::Timestamp),
TableField::new(
"dropped_on",
TableDataType::Nullable(Box::new(TableDataType::Timestamp)),
),
TableField::new("updated_on", TableDataType::Timestamp),
TableField::new(
"num_rows",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"data_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"data_compressed_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"index_size",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"number_of_segments",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"number_of_blocks",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
TableField::new(
"owner",
TableDataType::Nullable(Box::new(TableDataType::String)),
),
])
}

Expand Down

0 comments on commit de30450

Please sign in to comment.