diff --git a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs index 7016ca4ccb56..dc1e1d33a7aa 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source_scan.rs @@ -151,7 +151,6 @@ impl StreamSourceScan { .to_internal_table_prost(), ), info: Some(source_catalog.info.clone()), - // XXX: what's the usage of this? row_id_index: self.core.row_id_index.map(|index| index as _), columns: self .core diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 1ede0941ba98..cafa6ced898a 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -1089,6 +1089,24 @@ impl CatalogController { Ok(actors) } + /// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status. + pub async fn get_running_actors_and_upstream_of_fragment( + &self, + fragment_id: FragmentId, + ) -> MetaResult> { + let inner = self.inner.read().await; + let actors: Vec<(ActorId, Vec)> = Actor::find() + .select_only() + .column(actor::Column::ActorId) + .column(actor::Column::UpstreamActorIds) + .filter(actor::Column::FragmentId.eq(fragment_id)) + .filter(actor::Column::Status.eq(ActorStatus::Running)) + .into_tuple() + .all(&inner.db) + .await?; + Ok(actors) + } + pub async fn get_actors_by_job_ids(&self, job_ids: Vec) -> MetaResult> { let inner = self.inner.read().await; let actors: Vec = Actor::find() diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 01ac5379bab5..56873c2829a7 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -1027,7 +1027,8 @@ impl FragmentManager { bail!("fragment not found: {}", fragment_id) } - pub async fn get_running_actors_and_upstream_fragment_of_fragment( + /// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status. + pub async fn get_running_actors_and_upstream_of_fragment( &self, fragment_id: FragmentId, ) -> MetaResult)>> { diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs index 6417343f7927..848547e47a62 100644 --- a/src/meta/src/manager/metadata.rs +++ b/src/meta/src/manager/metadata.rs @@ -621,11 +621,21 @@ impl MetadataManager { match self { MetadataManager::V1(mgr) => { mgr.fragment_manager - .get_running_actors_and_upstream_fragment_of_fragment(id) + .get_running_actors_and_upstream_of_fragment(id) .await } - MetadataManager::V2(_mgr) => { - todo!() + MetadataManager::V2(mgr) => { + let actor_ids = mgr + .catalog_controller + .get_running_actors_and_upstream_of_fragment(id) + .await?; + Ok(actor_ids.into_iter().map(|(id, actors)| { + ( + id as ActorId, + actors.into_iter().map(|id| id as ActorId).collect(), + ) + .collect() + })) } } } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index f5d4a2ec5da0..baa10428a084 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -1232,9 +1232,9 @@ impl ScaleController { .await?; fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits); - todo!("migrate_splits_backfill"); } } + // TODO: support migrate splits for SourceBackfill // Generate fragment reschedule plan let mut reschedule_fragment: HashMap = diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 7cb85d91af7d..5b1720c9edad 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -462,7 +462,6 @@ impl GlobalStreamManager { } let dummy_table_id = table_fragments.table_id(); - // TODO: does this need change? for replace_table let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; @@ -534,7 +533,6 @@ impl GlobalStreamManager { .await?; let dummy_table_id = table_fragments.table_id(); - // TODO: does this need change? for replace_table let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?; if let Err(err) = self