From 23a1d969e8d8612b7a3d5641b5945929add2e69a Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Sun, 29 Sep 2024 16:46:37 +0800 Subject: [PATCH] fix(meta): persist correct vnode count for catalogs Signed-off-by: Bugen Zhao --- src/meta/src/controller/streaming_job.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 6bf2606a9354b..0b05539e3bdcb 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -399,15 +399,19 @@ impl CatalogController { for fragment in fragments { let fragment_id = fragment.fragment_id; let state_table_ids = fragment.state_table_ids.inner_ref().clone(); + let vnode_count = fragment.vnode_count; + let fragment = fragment.into_active_model(); Fragment::insert(fragment).exec(&txn).await?; - // Update fragment id for all state tables. + // Fields including `fragment_id` and `vnode_count` were placeholder values before. + // After table fragments are created, update them for all internal tables. if !for_replace { for state_table_id in state_table_ids { table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), + vnode_count: Set(vnode_count), ..Default::default() } .update(&txn) @@ -1005,22 +1009,25 @@ impl CatalogController { table.incoming_sinks = Set(incoming_sinks.into()); let table = table.update(txn).await?; - // Update state table fragment id. - let fragment_table_ids: Vec<(FragmentId, I32Array)> = Fragment::find() + // Fields including `fragment_id` and `vnode_count` were placeholder values before. + // After table fragments are created, update them for all internal tables. + let fragment_info: Vec<(FragmentId, I32Array, i32)> = Fragment::find() .select_only() .columns([ fragment::Column::FragmentId, fragment::Column::StateTableIds, + fragment::Column::VnodeCount, ]) .filter(fragment::Column::JobId.eq(dummy_id)) .into_tuple() .all(txn) .await?; - for (fragment_id, state_table_ids) in fragment_table_ids { + for (fragment_id, state_table_ids, vnode_count) in fragment_info { for state_table_id in state_table_ids.into_inner() { table::ActiveModel { table_id: Set(state_table_id as _), fragment_id: Set(Some(fragment_id)), + vnode_count: Set(vnode_count), ..Default::default() } .update(txn)