Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Mar 28, 2024
1 parent 0339b38 commit a299d85
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 8 deletions.
1 change: 0 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_source_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ impl StreamSourceScan {
.to_internal_table_prost(),
),
info: Some(source_catalog.info.clone()),
// XXX: what's the usage of this?
row_id_index: self.core.row_id_index.map(|index| index as _),
columns: self
.core
Expand Down
18 changes: 18 additions & 0 deletions src/meta/src/controller/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,24 @@ impl CatalogController {
Ok(actors)
}

/// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_and_upstream_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<Vec<ActorId>> {
let inner = self.inner.read().await;
let actors: Vec<(ActorId, Vec<ActorId>)> = Actor::find()
.select_only()
.column(actor::Column::ActorId)
.column(actor::Column::UpstreamActorIds)
.filter(actor::Column::FragmentId.eq(fragment_id))
.filter(actor::Column::Status.eq(ActorStatus::Running))
.into_tuple()
.all(&inner.db)
.await?;
Ok(actors)
}

pub async fn get_actors_by_job_ids(&self, job_ids: Vec<ObjectId>) -> MetaResult<Vec<ActorId>> {
let inner = self.inner.read().await;
let actors: Vec<ActorId> = Actor::find()
Expand Down
3 changes: 2 additions & 1 deletion src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,8 @@ impl FragmentManager {
bail!("fragment not found: {}", fragment_id)
}

pub async fn get_running_actors_and_upstream_fragment_of_fragment(
/// Get the actor ids, and each actor's upstream actor ids of the fragment with `fragment_id` with `Running` status.
pub async fn get_running_actors_and_upstream_of_fragment(
&self,
fragment_id: FragmentId,
) -> MetaResult<HashSet<(ActorId, Vec<ActorId>)>> {
Expand Down
16 changes: 13 additions & 3 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,11 +621,21 @@ impl MetadataManager {
match self {
MetadataManager::V1(mgr) => {
mgr.fragment_manager
.get_running_actors_and_upstream_fragment_of_fragment(id)
.get_running_actors_and_upstream_of_fragment(id)
.await
}
MetadataManager::V2(_mgr) => {
todo!()
MetadataManager::V2(mgr) => {
let actor_ids = mgr
.catalog_controller
.get_running_actors_and_upstream_of_fragment(id)
.await?;
Ok(actor_ids.into_iter().map(|(id, actors)| {
(
id as ActorId,
actors.into_iter().map(|id| id as ActorId).collect(),
)
.collect()
}))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1232,9 +1232,9 @@ impl ScaleController {
.await?;

fragment_stream_source_actor_splits.insert(*fragment_id, actor_splits);
todo!("migrate_splits_backfill");
}
}
// TODO: support migrate splits for SourceBackfill

// Generate fragment reschedule plan
let mut reschedule_fragment: HashMap<FragmentId, Reschedule> =
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ impl GlobalStreamManager {
}

let dummy_table_id = table_fragments.table_id();
// TODO: does this need change? for replace_table
let init_split_assignment =
self.source_manager.allocate_splits(&dummy_table_id).await?;

Expand Down Expand Up @@ -534,7 +533,6 @@ impl GlobalStreamManager {
.await?;

let dummy_table_id = table_fragments.table_id();
// TODO: does this need change? for replace_table
let init_split_assignment = self.source_manager.allocate_splits(&dummy_table_id).await?;

if let Err(err) = self
Expand Down

0 comments on commit a299d85

Please sign in to comment.