Skip to content

Commit

Permalink
fix(meta): fix query for running_fragment_parallelisms in postgres …
Browse files Browse the repository at this point in the history
…backend (#18743)
  • Loading branch information
BugenZhao authored Sep 27, 2024
1 parent 056ccfb commit 811fa54
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::hash::{VnodeCountCompat, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node;
use risingwave_meta_model_migration::{Alias, SelectStatement};
use risingwave_meta_model_v2::actor::ActorStatus;
use risingwave_meta_model_v2::fragment::DistributionType;
use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob};
Expand Down Expand Up @@ -48,7 +49,7 @@ use sea_orm::sea_query::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::{
ColumnTrait, DbErr, EntityTrait, JoinType, ModelTrait, PaginatorTrait, QueryFilter,
QuerySelect, RelationTrait, TransactionTrait, Value,
QuerySelect, RelationTrait, SelectGetableTuple, Selector, TransactionTrait, Value,
};

use crate::controller::catalog::{CatalogController, CatalogControllerInner};
Expand Down Expand Up @@ -480,21 +481,40 @@ impl CatalogController {
) -> MetaResult<HashMap<FragmentId, FragmentParallelismInfo>> {
let inner = self.inner.read().await;

let mut select = Actor::find()
.select_only()
let query_alias = Alias::new("fragment_actor_count");
let count_alias = Alias::new("count");

let mut query = SelectStatement::new()
.column(actor::Column::FragmentId)
.column_as(actor::Column::ActorId.count(), "count")
.group_by(actor::Column::FragmentId);
.expr_as(actor::Column::ActorId.count(), count_alias.clone())
.from(Actor)
.group_by_col(actor::Column::FragmentId)
.to_owned();

if let Some(id_filter) = id_filter {
select = select.having(actor::Column::FragmentId.is_in(id_filter));
query.cond_having(actor::Column::FragmentId.is_in(id_filter));
}
select = select
.join(JoinType::InnerJoin, actor::Relation::Fragment.def())

let outer = SelectStatement::new()
.column((Fragment, fragment::Column::FragmentId))
.column(count_alias)
.column(fragment::Column::DistributionType)
.column(fragment::Column::VnodeCount);
.column(fragment::Column::VnodeCount)
.from_subquery(query.to_owned(), query_alias.clone())
.inner_join(
Fragment,
Expr::col((query_alias, actor::Column::FragmentId))
.equals((Fragment, fragment::Column::FragmentId)),
)
.to_owned();

let fragment_parallelisms: Vec<(FragmentId, i64, DistributionType, i32)> =
select.into_tuple().all(&inner.db).await?;
Selector::<SelectGetableTuple<(FragmentId, i64, DistributionType, i32)>>::into_tuple(
outer.to_owned(),
)
.all(&inner.db)
.await?;

Ok(fragment_parallelisms
.into_iter()
.map(|(fragment_id, count, distribution_type, vnode_count)| {
Expand Down

0 comments on commit 811fa54

Please sign in to comment.