From e34c83b4c63ebefd7a4b292fb02e952c2db9909f Mon Sep 17 00:00:00 2001
From: Runji Wang
Date: Mon, 17 Jun 2024 16:40:05 +0800
Subject: [PATCH] feat(expr): add function `pg_index_column_has_property` (#17275)

Signed-off-by: Runji Wang
Co-authored-by: August
---
 proto/catalog.proto                           |   9 ++
 proto/expr.proto                              |   1 +
 src/frontend/src/binder/expr/function.rs      |   1 +
 src/frontend/src/catalog/index_catalog.rs     |  13 +-
 src/frontend/src/expr/function_impl/mod.rs    |   1 +
 .../pg_index_column_has_property.rs           | 138 ++++++++++++++++++
 src/frontend/src/expr/pure.rs                 |   1 +
 src/frontend/src/handler/create_index.rs      |  10 +-
 .../src/optimizer/plan_expr_visitor/strong.rs |   1 +
 src/meta/model_v2/migration/src/lib.rs        |   2 +
 ...20240617_070131_index_column_properties.rs |  35 +++++
 src/meta/model_v2/src/index.rs                |   5 +-
 src/meta/model_v2/src/lib.rs                  |   5 +
 src/meta/src/controller/mod.rs                |   7 +
 src/prost/build.rs                            |   1 +
 15 files changed, 227 insertions(+), 3 deletions(-)
 create mode 100644 src/frontend/src/expr/function_impl/pg_index_column_has_property.rs
 create mode 100644 src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs

diff --git a/proto/catalog.proto b/proto/catalog.proto
index 7dfefa003217..5b4f5ae40ff4 100644
--- a/proto/catalog.proto
+++ b/proto/catalog.proto
@@ -241,6 +241,7 @@ message Index {
   // Only `InputRef` type index is supported Now.
   // The index of `InputRef` is the column index of the primary table.
   repeated expr.ExprNode index_item = 8;
+  repeated IndexColumnProperties index_column_properties = 16;
   reserved 9; // Deprecated repeated int32 original_columns = 9;
 
   optional uint64 initialized_at_epoch = 10;
@@ -255,6 +256,14 @@ message Index {
   optional string created_at_cluster_version = 15;
 }
 
+// https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS
+message IndexColumnProperties {
+  // Whether the column sorts in ascending (false) or descending (true) order on a forward scan.
+  bool is_desc = 1;
+  // Does the column sort with nulls first on a forward scan?
+  bool nulls_first = 2;
+}
+
 message Function {
   uint32 id = 1;
   uint32 schema_id = 2;
diff --git a/proto/expr.proto b/proto/expr.proto
index 602c712975ec..ada2159bb80a 100644
--- a/proto/expr.proto
+++ b/proto/expr.proto
@@ -302,6 +302,7 @@ message ExprNode {
     PG_INDEXES_SIZE = 2404;
     PG_RELATION_SIZE = 2405;
     PG_GET_SERIAL_SEQUENCE = 2406;
+    PG_INDEX_COLUMN_HAS_PROPERTY = 2410;
     HAS_TABLE_PRIVILEGE = 2407;
     HAS_ANY_COLUMN_PRIVILEGE = 2408;
     HAS_SCHEMA_PRIVILEGE = 2409;
diff --git a/src/frontend/src/binder/expr/function.rs b/src/frontend/src/binder/expr/function.rs
index 2a4812c2e1d1..09c05a29695d 100644
--- a/src/frontend/src/binder/expr/function.rs
+++ b/src/frontend/src/binder/expr/function.rs
@@ -1256,6 +1256,7 @@ impl Binder {
                 ("pg_get_userbyid", raw_call(ExprType::PgGetUserbyid)),
                 ("pg_get_indexdef", raw_call(ExprType::PgGetIndexdef)),
                 ("pg_get_viewdef", raw_call(ExprType::PgGetViewdef)),
+                ("pg_index_column_has_property", raw_call(ExprType::PgIndexColumnHasProperty)),
                 ("pg_relation_size", raw(|_binder, mut inputs|{
                     if inputs.is_empty() {
                         return Err(ErrorCode::ExprError(
diff --git a/src/frontend/src/catalog/index_catalog.rs b/src/frontend/src/catalog/index_catalog.rs
index 7e13c365407e..098eca22af9e 100644
--- a/src/frontend/src/catalog/index_catalog.rs
+++ b/src/frontend/src/catalog/index_catalog.rs
@@ -21,7 +21,7 @@ use itertools::Itertools;
 use risingwave_common::catalog::{Field, IndexId, Schema};
 use risingwave_common::util::epoch::Epoch;
 use risingwave_common::util::sort_util::ColumnOrder;
-use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus};
+use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};
 
 use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
 use crate::expr::{Expr, ExprDisplay, ExprImpl, FunctionCall};
@@ -40,6 +40,10 @@ pub struct IndexCatalog {
     /// The input args of `FuncCall` is also the column index of the primary table.
     pub index_item: Vec<ExprImpl>,
 
+    /// The properties of the index columns.
+    /// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
+    pub index_column_properties: Vec<PbIndexColumnProperties>,
+
     pub index_table: Arc<TableCatalog>,
 
     pub primary_table: Arc<TableCatalog>,
@@ -112,6 +116,7 @@ impl IndexCatalog {
             id: index_prost.id.into(),
             name: index_prost.name.clone(),
             index_item,
+            index_column_properties: index_prost.index_column_properties.clone(),
             index_table: Arc::new(index_table.clone()),
             primary_table: Arc::new(primary_table.clone()),
             primary_to_secondary_mapping,
@@ -179,6 +184,7 @@ impl IndexCatalog {
                 .iter()
                 .map(|expr| expr.to_expr_proto())
                 .collect_vec(),
+            index_column_properties: self.index_column_properties.clone(),
             index_columns_len: self.index_columns_len,
             initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
             created_at_epoch: self.created_at_epoch.map(|e| e.0),
@@ -188,6 +194,11 @@ impl IndexCatalog {
         }
     }
 
+    /// Returns the properties of the index column at the 0-based position `column_idx`, if any.
+    pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
+        self.index_column_properties.get(column_idx).cloned()
+    }
+
     pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
         if let Some(col) = self.index_table.columns.get(column_idx) {
             if col.is_hidden {
diff --git a/src/frontend/src/expr/function_impl/mod.rs b/src/frontend/src/expr/function_impl/mod.rs
index 3a70ebf5db47..a0cff36840b4 100644
--- a/src/frontend/src/expr/function_impl/mod.rs
+++ b/src/frontend/src/expr/function_impl/mod.rs
@@ -19,5 +19,6 @@ mod has_privilege;
 mod pg_get_indexdef;
 mod pg_get_userbyid;
 mod pg_get_viewdef;
+mod pg_index_column_has_property;
 mod pg_indexes_size;
 mod pg_relation_size;
diff --git a/src/frontend/src/expr/function_impl/pg_index_column_has_property.rs b/src/frontend/src/expr/function_impl/pg_index_column_has_property.rs
new file mode 100644
index 000000000000..fdabf5aefc77
--- /dev/null
+++ b/src/frontend/src/expr/function_impl/pg_index_column_has_property.rs
@@ -0,0 +1,138 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use risingwave_expr::{capture_context, function, Result};
+
+use super::context::{CATALOG_READER, DB_NAME};
+use crate::catalog::CatalogReader;
+
+/// Tests whether an index column has the named property.
+///
+/// `index` is the OID of the index.
+/// `column` is the column number (1-based) within the index.
+///
+/// NULL is returned if the property name is not known or does not apply to the particular object,
+/// or if the OID or column number does not identify a valid object.
+///
+/// # Supported Properties
+///
+/// - `asc`: Does the column sort in ascending order on a forward scan?
+/// - `desc`: Does the column sort in descending order on a forward scan?
+/// - `nulls_first`: Does the column sort with nulls first on a forward scan?
+/// - `nulls_last`: Does the column sort with nulls last on a forward scan?
+///
+/// # Examples
+///
+/// ```slt
+/// statement ok
+/// create table t(a int, b int);
+///
+/// statement ok
+/// create index i on t (a asc, b desc);
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 1, 'asc');
+/// ----
+/// t
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 1, 'DESC');
+/// ----
+/// f
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 1, 'nulls_FIRST');
+/// ----
+/// f
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 1, 'nulls_last');
+/// ----
+/// t
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 2, 'asc');
+/// ----
+/// f
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 2, 'desc');
+/// ----
+/// t
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 2, 'nulls_first');
+/// ----
+/// t
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 2, 'nulls_last');
+/// ----
+/// f
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 1, 'gg'); -- invalid property
+/// ----
+/// NULL
+///
+/// query B
+/// select pg_index_column_has_property('i'::regclass, 0, 'asc'); -- column 0 does not exist
+/// ----
+/// NULL
+///
+/// statement ok
+/// drop index i;
+///
+/// statement ok
+/// drop table t;
+/// ```
+#[function("pg_index_column_has_property(int4, int4, varchar) -> boolean")]
+fn pg_index_column_has_property(index: i32, column: i32, property: &str) -> Result<Option<bool>> {
+    pg_index_column_has_property_impl_captured(index, column, property)
+}
+
+/// Resolves the index through the captured catalog reader and evaluates `property` for the
+/// column at the 1-based position `column_idx`.
+///
+/// Returns `Ok(None)` (SQL NULL) if the index cannot be found, the column number is out of
+/// range, or the property name is not recognized.
+#[capture_context(CATALOG_READER, DB_NAME)]
+fn pg_index_column_has_property_impl(
+    catalog: &CatalogReader,
+    db_name: &str,
+    index_id: i32,
+    column_idx: i32,
+    property: &str,
+    // `Result` is not necessary for this function, but it's required by `capture_context`.
+) -> Result<Option<bool>> {
+    let catalog_reader = catalog.read_guard();
+    let Ok(index) = catalog_reader.get_index_by_id(db_name, index_id as u32) else {
+        return Ok(None);
+    };
+    // Column numbers are 1-based. Widen to `i64` before subtracting so that `i32::MIN` cannot
+    // overflow, and reject zero or negative numbers instead of wrapping them into `usize`.
+    let Ok(position) = usize::try_from(i64::from(column_idx) - 1) else {
+        return Ok(None);
+    };
+    let Some(properties) = index.get_column_properties(position) else {
+        return Ok(None);
+    };
+    Ok(match property.to_lowercase().as_str() {
+        "asc" => Some(!properties.is_desc),
+        "desc" => Some(properties.is_desc),
+        "nulls_first" => Some(properties.nulls_first),
+        "nulls_last" => Some(!properties.nulls_first),
+        _ => None,
+    })
+}
diff --git a/src/frontend/src/expr/pure.rs b/src/frontend/src/expr/pure.rs
index d03b7507fdfb..b404fb3408df 100644
--- a/src/frontend/src/expr/pure.rs
+++ b/src/frontend/src/expr/pure.rs
@@ -270,6 +270,7 @@ impl ExprVisitor for ImpureAnalyzer {
             | Type::PgIndexesSize
             | Type::PgRelationSize
             | Type::PgGetSerialSequence
+            | Type::PgIndexColumnHasProperty
             | Type::HasTablePrivilege
             | Type::HasAnyColumnPrivilege
             | Type::HasSchemaPrivilege
diff --git a/src/frontend/src/handler/create_index.rs b/src/frontend/src/handler/create_index.rs
index 6e9793646c7e..a0586ab20e4d 100644
--- a/src/frontend/src/handler/create_index.rs
+++ b/src/frontend/src/handler/create_index.rs
@@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::acl::AclMode;
 use risingwave_common::catalog::{IndexId, TableDesc, TableId};
 use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
-use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable};
+use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable};
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_pb::user::grant_privilege::Object;
 use risingwave_sqlparser::ast;
@@ -225,6 +225,13 @@ pub(crate) fn gen_create_index_plan(
     index_table_prost.dependent_relations = vec![table.id.table_id];
 
     let index_columns_len = index_columns_ordered_expr.len() as u32;
+    let index_column_properties = index_columns_ordered_expr
+        .iter()
+        .map(|(_, order)| PbIndexColumnProperties {
+            is_desc: order.is_descending(),
+            nulls_first: order.nulls_are_first(),
+        })
+        .collect();
     let index_item = build_index_item(
         index_table.table_desc().into(),
         table.name(),
@@ -241,6 +248,7 @@ pub(crate) fn gen_create_index_plan(
         index_table_id: TableId::placeholder().table_id,
         primary_table_id: table.id.table_id,
         index_item,
+        index_column_properties,
         index_columns_len,
         initialized_at_epoch: None,
         created_at_epoch: None,
diff --git a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
index fefdd1e4547f..55d8e3a18adf 100644
--- a/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
+++ b/src/frontend/src/optimizer/plan_expr_visitor/strong.rs
@@ -303,6 +303,7 @@ impl Strong {
             | ExprType::PgIndexesSize
             | ExprType::PgRelationSize
             | ExprType::PgGetSerialSequence
+            | ExprType::PgIndexColumnHasProperty
             | ExprType::IcebergTransform
             | ExprType::HasTablePrivilege
             | ExprType::HasAnyColumnPrivilege
diff --git a/src/meta/model_v2/migration/src/lib.rs b/src/meta/model_v2/migration/src/lib.rs
index 66f136b6159d..627e565a47f3 100644
--- a/src/meta/model_v2/migration/src/lib.rs
+++ b/src/meta/model_v2/migration/src/lib.rs
@@ -11,6 +11,7 @@ mod m20240417_062305_subscription_internal_table_name;
 mod m20240418_142249_function_runtime;
 mod m20240506_112555_subscription_partial_ckpt;
 mod m20240525_090457_secret;
+mod m20240617_070131_index_column_properties;
 
 pub struct Migrator;
 
@@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240418_142249_function_runtime::Migration),
             Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
             Box::new(m20240525_090457_secret::Migration),
+            Box::new(m20240617_070131_index_column_properties::Migration),
         ]
     }
 }
diff --git a/src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs b/src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs
new file mode 100644
index 000000000000..daff755da999
--- /dev/null
+++ b/src/meta/model_v2/migration/src/m20240617_070131_index_column_properties.rs
@@ -0,0 +1,35 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Index::Table)
+                    .add_column(ColumnDef::new(Index::IndexColumnProperties).binary())
+                    .to_owned(),
+            )
+            .await
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .alter_table(
+                Table::alter()
+                    .table(Index::Table)
+                    .drop_column(Index::IndexColumnProperties)
+                    .to_owned(),
+            )
+            .await
+    }
+}
+
+#[derive(DeriveIden)]
+enum Index {
+    Table,
+    IndexColumnProperties,
+}
diff --git a/src/meta/model_v2/src/index.rs b/src/meta/model_v2/src/index.rs
index ca2f39c0f179..a291e325b24e 100644
--- a/src/meta/model_v2/src/index.rs
+++ b/src/meta/model_v2/src/index.rs
@@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*;
 use sea_orm::ActiveValue::Set;
 use serde::{Deserialize, Serialize};
 
-use crate::{ExprNodeArray, IndexId, TableId};
+use crate::{ExprNodeArray, IndexColumnPropertiesArray, IndexId, TableId};
 
 #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
 #[sea_orm(table_name = "index")]
@@ -28,6 +28,8 @@ pub struct Model {
     pub index_table_id: TableId,
     pub primary_table_id: TableId,
     pub index_items: ExprNodeArray,
+    // Nullable: the migration adds this column without a default, so older rows hold NULL.
+    pub index_column_properties: Option<IndexColumnPropertiesArray>,
     pub index_columns_len: i32,
 }
 
@@ -76,6 +78,7 @@ impl From<PbIndex> for ActiveModel {
             primary_table_id: Set(pb_index.primary_table_id as _),
             index_items: Set(pb_index.index_item.into()),
             index_columns_len: Set(pb_index.index_columns_len as _),
+            index_column_properties: Set(Some(pb_index.index_column_properties.into())),
         }
     }
 }
diff --git a/src/meta/model_v2/src/lib.rs b/src/meta/model_v2/src/lib.rs
index 647ce99ec1e7..11c5209bdc56 100644
--- a/src/meta/model_v2/src/lib.rs
+++ b/src/meta/model_v2/src/lib.rs
@@ -324,6 +324,11 @@ derive_array_from_blob!(
     risingwave_pb::common::PbColumnOrder,
     PbColumnOrderArray
 );
+derive_array_from_blob!(
+    IndexColumnPropertiesArray,
+    risingwave_pb::catalog::PbIndexColumnProperties,
+    PbIndexColumnPropertiesArray
+);
 derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
 derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
 derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs
index ac61ceb67b77..66491d515dd4 100644
--- a/src/meta/src/controller/mod.rs
+++ b/src/meta/src/controller/mod.rs
@@ -274,6 +274,13 @@ impl From<ObjectModel<index::Model>> for PbIndex {
             index_table_id: value.0.index_table_id as _,
             primary_table_id: value.0.primary_table_id as _,
             index_item: value.0.index_items.to_protobuf(),
+            // The column is NULL for indexes stored before it was added; report those as
+            // having no recorded column properties.
+            index_column_properties: value
+                .0
+                .index_column_properties
+                .map(|properties| properties.to_protobuf())
+                .unwrap_or_default(),
             index_columns_len: value.0.index_columns_len as _,
             initialized_at_epoch: Some(
                 Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
diff --git a/src/prost/build.rs b/src/prost/build.rs
index 6cbfa82225e6..961dbe196944 100644
--- a/src/prost/build.rs
+++ b/src/prost/build.rs
@@ -111,6 +111,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
         // The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
         .type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
         .type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
+        .type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]")
         .type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
         .type_attribute("data.DataType", "#[derive(Eq, Hash)]")
         .type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]")