Skip to content

Commit

Permalink
feat(expr): add function pg_index_column_has_property (#17275)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
Co-authored-by: August <[email protected]>
  • Loading branch information
wangrunji0408 and yezizp2012 authored Jun 17, 2024
1 parent 9274ebc commit e34c83b
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 3 deletions.
9 changes: 9 additions & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ message Index {
// Only `InputRef` type index is supported Now.
// The index of `InputRef` is the column index of the primary table.
repeated expr.ExprNode index_item = 8;
repeated IndexColumnProperties index_column_properties = 16;
reserved 9; // Deprecated repeated int32 original_columns = 9;

optional uint64 initialized_at_epoch = 10;
Expand All @@ -255,6 +256,14 @@ message Index {
optional string created_at_cluster_version = 15;
}

// https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS
message IndexColumnProperties {
// Whether the column sort in ascending(false) or descending(true) order on a forward scan.
bool is_desc = 1;
// Does the column sort with nulls first on a forward scan?
bool nulls_first = 2;
}

message Function {
uint32 id = 1;
uint32 schema_id = 2;
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ message ExprNode {
PG_INDEXES_SIZE = 2404;
PG_RELATION_SIZE = 2405;
PG_GET_SERIAL_SEQUENCE = 2406;
PG_INDEX_COLUMN_HAS_PROPERTY = 2410;
HAS_TABLE_PRIVILEGE = 2407;
HAS_ANY_COLUMN_PRIVILEGE = 2408;
HAS_SCHEMA_PRIVILEGE = 2409;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,7 @@ impl Binder {
("pg_get_userbyid", raw_call(ExprType::PgGetUserbyid)),
("pg_get_indexdef", raw_call(ExprType::PgGetIndexdef)),
("pg_get_viewdef", raw_call(ExprType::PgGetViewdef)),
("pg_index_column_has_property", raw_call(ExprType::PgIndexColumnHasProperty)),
("pg_relation_size", raw(|_binder, mut inputs|{
if inputs.is_empty() {
return Err(ErrorCode::ExprError(
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/catalog/index_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use itertools::Itertools;
use risingwave_common::catalog::{Field, IndexId, Schema};
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus};

use crate::catalog::{DatabaseId, OwnedByUserCatalog, SchemaId, TableCatalog};
use crate::expr::{Expr, ExprDisplay, ExprImpl, FunctionCall};
Expand All @@ -40,6 +40,10 @@ pub struct IndexCatalog {
/// The input args of `FuncCall` is also the column index of the primary table.
pub index_item: Vec<ExprImpl>,

/// The properties of the index columns.
/// <https://www.postgresql.org/docs/current/functions-info.html#FUNCTIONS-INFO-INDEX-COLUMN-PROPS>
pub index_column_properties: Vec<PbIndexColumnProperties>,

pub index_table: Arc<TableCatalog>,

pub primary_table: Arc<TableCatalog>,
Expand Down Expand Up @@ -112,6 +116,7 @@ impl IndexCatalog {
id: index_prost.id.into(),
name: index_prost.name.clone(),
index_item,
index_column_properties: index_prost.index_column_properties.clone(),
index_table: Arc::new(index_table.clone()),
primary_table: Arc::new(primary_table.clone()),
primary_to_secondary_mapping,
Expand Down Expand Up @@ -179,6 +184,7 @@ impl IndexCatalog {
.iter()
.map(|expr| expr.to_expr_proto())
.collect_vec(),
index_column_properties: self.index_column_properties.clone(),
index_columns_len: self.index_columns_len,
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
Expand All @@ -188,6 +194,11 @@ impl IndexCatalog {
}
}

/// Get the column properties of the index column.
pub fn get_column_properties(&self, column_idx: usize) -> Option<PbIndexColumnProperties> {
self.index_column_properties.get(column_idx).cloned()
}

pub fn get_column_def(&self, column_idx: usize) -> Option<String> {
if let Some(col) = self.index_table.columns.get(column_idx) {
if col.is_hidden {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/function_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ mod has_privilege;
mod pg_get_indexdef;
mod pg_get_userbyid;
mod pg_get_viewdef;
mod pg_index_column_has_property;
mod pg_indexes_size;
mod pg_relation_size;
128 changes: 128 additions & 0 deletions src/frontend/src/expr/function_impl/pg_index_column_has_property.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_expr::{capture_context, function, Result};

use super::context::{CATALOG_READER, DB_NAME};
use crate::catalog::CatalogReader;

/// Tests whether an index column has the named property.
///
/// `index` is the OID of the index.
/// `column` is the column number (1-based) within the index.
///
/// NULL is returned if the property name is not known or does not apply to the particular object,
/// or if the OID or column number does not identify a valid object.
///
/// # Supported Properties
///
/// - `asc`: Does the column sort in ascending order on a forward scan?
/// - `desc`: Does the column sort in descending order on a forward scan?
/// - `nulls_first`: Does the column sort with nulls first on a forward scan?
/// - `nulls_last`: Does the column sort with nulls last on a forward scan?
///
/// # Examples
///
/// ```slt
/// statement ok
/// create table t(a int, b int);
///
/// statement ok
/// create index i on t (a asc, b desc);
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 1, 'asc');
/// ----
/// t
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 1, 'DESC');
/// ----
/// f
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 1, 'nulls_FIRST');
/// ----
/// f
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 1, 'nulls_last');
/// ----
/// t
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 2, 'asc');
/// ----
/// f
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 2, 'desc');
/// ----
/// t
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 2, 'nulls_first');
/// ----
/// t
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 2, 'nulls_last');
/// ----
/// f
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 1, 'gg'); -- invalid property
/// ----
/// NULL
///
/// query B
/// select pg_index_column_has_property('i'::regclass, 0, 'asc'); -- column 0 does not exist
/// ----
/// NULL
///
/// statement ok
/// drop index i;
///
/// statement ok
/// drop table t;
/// ```
#[function("pg_index_column_has_property(int4, int4, varchar) -> boolean")]
fn pg_index_column_has_property(index: i32, column: i32, property: &str) -> Result<Option<bool>> {
pg_index_column_has_property_impl_captured(index, column, property)
}

#[capture_context(CATALOG_READER, DB_NAME)]
fn pg_index_column_has_property_impl(
catalog: &CatalogReader,
db_name: &str,
index_id: i32,
column_idx: i32,
property: &str,
// `Result` is not necessary for this function, but it's required by `capture_context`.
) -> Result<Option<bool>> {
let catalog_reader = catalog.read_guard();
let Ok(index) = catalog_reader.get_index_by_id(db_name, index_id as u32) else {
return Ok(None);
};
let Some(properties) = index.get_column_properties((column_idx - 1) as usize) else {
return Ok(None);
};
Ok(match property.to_lowercase().as_str() {
"asc" => Some(!properties.is_desc),
"desc" => Some(properties.is_desc),
"nulls_first" => Some(properties.nulls_first),
"nulls_last" => Some(!properties.nulls_first),
_ => None,
})
}
1 change: 1 addition & 0 deletions src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ impl ExprVisitor for ImpureAnalyzer {
| Type::PgIndexesSize
| Type::PgRelationSize
| Type::PgGetSerialSequence
| Type::PgIndexColumnHasProperty
| Type::HasTablePrivilege
| Type::HasAnyColumnPrivilege
| Type::HasSchemaPrivilege
Expand Down
10 changes: 9 additions & 1 deletion src/frontend/src/handler/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::acl::AclMode;
use risingwave_common::catalog::{IndexId, TableDesc, TableId};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::catalog::{PbIndex, PbStreamJobStatus, PbTable};
use risingwave_pb::catalog::{PbIndex, PbIndexColumnProperties, PbStreamJobStatus, PbTable};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
use risingwave_pb::user::grant_privilege::Object;
use risingwave_sqlparser::ast;
Expand Down Expand Up @@ -225,6 +225,13 @@ pub(crate) fn gen_create_index_plan(
index_table_prost.dependent_relations = vec![table.id.table_id];

let index_columns_len = index_columns_ordered_expr.len() as u32;
let index_column_properties = index_columns_ordered_expr
.iter()
.map(|(_, order)| PbIndexColumnProperties {
is_desc: order.is_descending(),
nulls_first: order.nulls_are_first(),
})
.collect();
let index_item = build_index_item(
index_table.table_desc().into(),
table.name(),
Expand All @@ -241,6 +248,7 @@ pub(crate) fn gen_create_index_plan(
index_table_id: TableId::placeholder().table_id,
primary_table_id: table.id.table_id,
index_item,
index_column_properties,
index_columns_len,
initialized_at_epoch: None,
created_at_epoch: None,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_expr_visitor/strong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl Strong {
| ExprType::PgIndexesSize
| ExprType::PgRelationSize
| ExprType::PgGetSerialSequence
| ExprType::PgIndexColumnHasProperty
| ExprType::IcebergTransform
| ExprType::HasTablePrivilege
| ExprType::HasAnyColumnPrivilege
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod m20240417_062305_subscription_internal_table_name;
mod m20240418_142249_function_runtime;
mod m20240506_112555_subscription_partial_ckpt;
mod m20240525_090457_secret;
mod m20240617_070131_index_column_properties;

pub struct Migrator;

Expand All @@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240418_142249_function_runtime::Migration),
Box::new(m20240506_112555_subscription_partial_ckpt::Migration),
Box::new(m20240525_090457_secret::Migration),
Box::new(m20240617_070131_index_column_properties::Migration),
]
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Index::Table)
.add_column(ColumnDef::new(Index::IndexColumnProperties).binary())
.to_owned(),
)
.await
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Index::Table)
.drop_column(Index::IndexColumnProperties)
.to_owned(),
)
.await
}
}

#[derive(DeriveIden)]
enum Index {
Table,
IndexColumnProperties,
}
4 changes: 3 additions & 1 deletion src/meta/model_v2/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{ExprNodeArray, IndexId, TableId};
use crate::{ExprNodeArray, IndexColumnPropertiesArray, IndexId, TableId};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "index")]
Expand All @@ -28,6 +28,7 @@ pub struct Model {
pub index_table_id: TableId,
pub primary_table_id: TableId,
pub index_items: ExprNodeArray,
pub index_column_properties: IndexColumnPropertiesArray,
pub index_columns_len: i32,
}

Expand Down Expand Up @@ -76,6 +77,7 @@ impl From<PbIndex> for ActiveModel {
primary_table_id: Set(pb_index.primary_table_id as _),
index_items: Set(pb_index.index_item.into()),
index_columns_len: Set(pb_index.index_columns_len as _),
index_column_properties: Set(pb_index.index_column_properties.into()),
}
}
}
5 changes: 5 additions & 0 deletions src/meta/model_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ derive_array_from_blob!(
risingwave_pb::common::PbColumnOrder,
PbColumnOrderArray
);
derive_array_from_blob!(
IndexColumnPropertiesArray,
risingwave_pb::catalog::PbIndexColumnProperties,
PbIndexColumnPropertiesArray
);
derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ impl From<ObjectModel<index::Model>> for PbIndex {
index_table_id: value.0.index_table_id as _,
primary_table_id: value.0.primary_table_id as _,
index_item: value.0.index_items.to_protobuf(),
index_column_properties: value.0.index_column_properties.to_protobuf(),
index_columns_len: value.0.index_columns_len as _,
initialized_at_epoch: Some(
Epoch::from_unix_millis(value.1.initialized_at.timestamp_millis() as _).0,
Expand Down
1 change: 1 addition & 0 deletions src/prost/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// The requirement is from Source node -> SourceCatalog -> WatermarkDesc -> expr
.type_attribute("catalog.WatermarkDesc", "#[derive(Eq, Hash)]")
.type_attribute("catalog.StreamSourceInfo", "#[derive(Eq, Hash)]")
.type_attribute("catalog.IndexColumnProperties", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode", "#[derive(Eq, Hash)]")
.type_attribute("data.DataType", "#[derive(Eq, Hash)]")
.type_attribute("expr.ExprNode.rex_node", "#[derive(Eq, Hash)]")
Expand Down

0 comments on commit e34c83b

Please sign in to comment.