diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 6bf2606a9354b..0b05539e3bdcb 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -399,15 +399,19 @@ impl CatalogController { for fragment in fragments { let fragment_id = fragment.fragment_id; let state_table_ids = fragment.state_table_ids.inner_ref().clone(); + let vnode_count = fragment.vnode_count; + let fragment = fragment.into_active_model(); Fragment::insert(fragment).exec(&txn).await?; - // Update fragment id for all state tables. + // Fields including `fragment_id` and `vnode_count` were placeholder values before. + // After table fragments are created, update them for all internal tables. if !for_replace { for state_table_id in state_table_ids { table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), + vnode_count: Set(vnode_count), ..Default::default() } .update(&txn) @@ -1005,22 +1009,25 @@ impl CatalogController { table.incoming_sinks = Set(incoming_sinks.into()); let table = table.update(txn).await?; - // Update state table fragment id. - let fragment_table_ids: Vec<(FragmentId, I32Array)> = Fragment::find() + // Fields including `fragment_id` and `vnode_count` were placeholder values before. + // After table fragments are created, update them for all internal tables. + let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, fragment::Column::StateTableIds, + fragment::Column::VnodeCount, ]) .filter(fragment::Column::JobId.eq(dummy_id)) .into_tuple() .all(txn) .await?; - for (fragment_id, state_table_ids) in fragment_table_ids { + for (fragment_id, state_table_ids, vnode_count) in fragment_info { for state_table_id in state_table_ids.into_inner() { table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), + vnode_count: Set(vnode_count), ..Default::default() } .update(txn)