Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pg_namespace, pg_class and related psql command #4428

Merged
merged 17 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/catalog/src/system_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
pub mod information_schema;
mod memory_table;
pub mod pg_catalog;
mod predicate;
mod utils;

use std::collections::HashMap;
use std::sync::Arc;
Expand Down
9 changes: 6 additions & 3 deletions src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ pub mod flows;
mod information_memory_table;
pub mod key_column_usage;
mod partitions;
mod predicate;
mod region_peers;
mod runtime_metrics;
pub mod schemata;
mod table_constraints;
mod table_names;
pub mod tables;
pub(crate) mod utils;
mod views;

use std::collections::HashMap;
Expand All @@ -37,14 +35,19 @@ use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
pub(crate) use predicate::Predicates;
use store_api::storage::{ScanRequest, TableId};
use table::metadata::TableType;
use table::TableRef;
pub use table_names::*;
use views::InformationSchemaViews;

use self::columns::InformationSchemaColumns;
pub(crate) use crate::system_schema::predicate::Predicates;
// TODO(J0HN150N133): re-export to provide compatibility for legacy `information_schema` code.
pub(crate) mod utils {
pub use crate::system_schema::utils::*;
}
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved

use super::{SystemSchemaProviderInner, SystemTable, SystemTableRef};
use crate::error::Result;
use crate::system_schema::information_schema::cluster_info::InformationSchemaClusterInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::{Int64Vector, StringVector, VectorRef};

use super::table_names::*;
use crate::system_schema::memory_table::tables::{
use crate::system_schema::utils::tables::{
bigint_column, datetime_column, string_column, string_columns,
};

Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/system_schema/memory_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

mod table_columns;
pub mod tables;

use std::sync::Arc;

Expand Down
23 changes: 20 additions & 3 deletions src/catalog/src/system_schema/pg_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

mod pg_catalog_memory_table;
mod pg_class;
mod pg_namespace;
mod table_names;

use std::collections::HashMap;
Expand All @@ -23,11 +25,13 @@ use datatypes::schema::ColumnSchema;
use lazy_static::lazy_static;
use paste::paste;
use pg_catalog_memory_table::get_schema_columns;
use pg_class::PGClass;
use pg_namespace::PGNamespace;
use table::TableRef;
pub use table_names::*;

use super::memory_table::tables::u32_column;
use super::memory_table::MemoryTable;
use super::utils::tables::u32_column;
use super::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
use crate::CatalogManager;

Expand All @@ -46,7 +50,7 @@ fn oid_column() -> ColumnSchema {
/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog.
pub struct PGCatalogProvider {
catalog_name: String,
_catalog_manager: Weak<dyn CatalogManager>,
catalog_manager: Weak<dyn CatalogManager>,
tables: HashMap<String, TableRef>,
}

Expand Down Expand Up @@ -79,7 +83,7 @@ impl PGCatalogProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
let mut provider = Self {
catalog_name,
_catalog_manager: catalog_manager,
catalog_manager,
tables: HashMap::new(),
};
provider.build_tables();
Expand All @@ -93,6 +97,11 @@ impl PGCatalogProvider {
for name in MEMORY_TABLES.iter() {
tables.insert(name.to_string(), self.build_table(name).expect(name));
}
tables.insert(
PG_NAMESPACE.to_string(),
self.build_table(PG_NAMESPACE).unwrap(),
);
tables.insert(PG_CLASS.to_string(), self.build_table(PG_CLASS).unwrap());
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
self.tables = tables;
}
}
Expand All @@ -105,6 +114,14 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
fn system_table(&self, name: &str) -> Option<SystemTableRef> {
match name {
table_names::PG_TYPE => setup_memory_table!(PG_TYPE),
table_names::PG_NAMESPACE => Some(Arc::new(PGNamespace::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
))),
table_names::PG_CLASS => Some(Arc::new(PGClass::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
))),
_ => None,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datatypes::vectors::{Int16Vector, StringVector, UInt32Vector, VectorRef};
use super::oid_column;
use super::table_names::PG_TYPE;
use crate::memory_table_cols;
use crate::system_schema::memory_table::tables::{i16_column, string_column};
use crate::system_schema::utils::tables::{i16_column, string_column};

fn pg_type_schema_columns() -> (Vec<ColumnSchema>, Vec<VectorRef>) {
// TODO(j0hn50n133): acquire this information from `DataType` instead of hardcoding it to avoid regression.
Expand Down
248 changes: 248 additions & 0 deletions src/catalog/src/system_schema/pg_catalog/pg_class.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;

use super::{OID_COLUMN_NAME, PG_CLASS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;

// === column name ===
pub const RELNAME: &str = "relname";
pub const RELNAMESPACE: &str = "relnamespace";
pub const RELKIND: &str = "relkind";
pub const RELOWNER: &str = "relowner";

// === enum value of relkind ===
pub const RELKIND_TABLE: &str = "r";
pub const RELKIND_VIEW: &str = "v";

/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;
/// The dummy owner id for the namespace.
const DUMMY_OWNER_ID: u32 = 0;

/// The `pg_catalog.pg_class` table implementation.
pub(super) struct PGClass {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}

impl PGClass {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(RELNAME),
string_column(RELNAMESPACE),
string_column(RELKIND),
u32_column(RELOWNER),
]))
}

fn builder(&self) -> PGClassBuilder {
PGClassBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}

impl SystemTable for PGClass {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_CLASS_TABLE_ID
}

fn table_name(&self) -> &'static str {
PG_CLASS
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(
&self,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_class(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

impl DfPartitionStream for PGClass {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_class(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}

/// Builds the `pg_catalog.pg_class` table row by row
/// `relowner` is always the [`DUMMY_OWNER_ID`] cuz we don't have user
J0HN50N133 marked this conversation as resolved.
Show resolved Hide resolved
struct PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

oid: UInt32VectorBuilder,
relname: StringVectorBuilder,
relnamespace: StringVectorBuilder,
relkind: StringVectorBuilder,
relowner: UInt32VectorBuilder,
}

impl PGClassBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,

oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
}
}

async fn make_class(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name);
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
self.add_class(
&predicates,
table_info.table_id(),
&schema_name,
&table_info.name,
if table_info.table_type == TableType::View {
RELKIND_VIEW
} else {
RELKIND_TABLE
},
);
}
}
self.finish()
}

fn add_class(
&mut self,
predicates: &Predicates,
oid: u32,
schema: &str,
table: &str,
kind: &str,
) {
let row = [
(OID_COLUMN_NAME, &Value::from(oid)),
(RELNAMESPACE, &Value::from(schema)),
(RELNAME, &Value::from(table)),
(RELKIND, &Value::from(kind)),
(RELOWNER, &Value::from(DUMMY_OWNER_ID)),
];

if !predicates.eval(&row) {
return;
}

self.oid.push(Some(oid));
self.relnamespace.push(Some(schema));
self.relname.push(Some(table));
self.relkind.push(Some(kind));
self.relowner.push(Some(DUMMY_OWNER_ID));
}

fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.oid.finish()),
Arc::new(self.relname.finish()),
Arc::new(self.relnamespace.finish()),
Arc::new(self.relkind.finish()),
Arc::new(self.relowner.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}
Loading
Loading