diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index a79b890cade20..195612bfbd8be 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -720,11 +720,14 @@ impl CatalogController { pub async fn get_max_parallelism_by_id(&self, job_id: ObjectId) -> MetaResult { let inner = self.inner.read().await; - let job = StreamingJob::find_by_id(job_id) + let max_parallelism: i32 = StreamingJob::find_by_id(job_id) + .select_only() + .column(streaming_job::Column::MaxParallelism) + .into_tuple() .one(&inner.db) .await? .ok_or_else(|| anyhow::anyhow!("job {} not found in database", job_id))?; - Ok(job.max_parallelism as usize) + Ok(max_parallelism as usize) } /// Get all actor ids in the target streaming jobs. diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index cf90d64620edb..ded8fe167a2b3 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -723,10 +723,14 @@ impl CatalogController { .ok_or_else(|| MetaError::catalog_id_not_found(ObjectType::Table.as_str(), id))?; if original_max_parallelism != max_parallelism as i32 { - bail_invalid_parameter!( - "cannot use a different max parallelism when altering or sinking into an existing table, \ - please `SET STREAMING_MAX_PARALLELISM TO {}` first", - original_max_parallelism + // We already override the max parallelism in `StreamFragmentGraph` before entering this function. + // This should not happen in normal cases. + bail!( + "cannot use a different max parallelism \ + when altering or creating/dropping a sink into an existing table, \ + original: {}, new: {}", + original_max_parallelism, + max_parallelism ); } diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 4289e864c4884..d2c46b284bddf 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -1059,6 +1059,16 @@ impl DdlController { .. } = replace_table_info; + // Ensure the max parallelism unchanged before replacing table. + let original_max_parallelism = self + .metadata_manager + .get_job_max_parallelism(streaming_job.id().into()) + .await?; + let fragment_graph = PbStreamFragmentGraph { + max_parallelism: original_max_parallelism as _, + ..fragment_graph + }; + let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); @@ -1198,6 +1208,16 @@ impl DdlController { panic!("additional replace table event only occurs when dropping sink into table") }; + // Ensure the max parallelism unchanged before replacing table. + let original_max_parallelism = self + .metadata_manager + .get_job_max_parallelism(streaming_job.id().into()) + .await?; + let fragment_graph = PbStreamFragmentGraph { + max_parallelism: original_max_parallelism as _, + ..fragment_graph + }; + let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); @@ -1344,6 +1364,16 @@ impl DdlController { let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await; let ctx = StreamContext::from_protobuf(fragment_graph.get_ctx().unwrap()); + // Ensure the max parallelism unchanged before replacing table. + let original_max_parallelism = self + .metadata_manager + .get_job_max_parallelism(streaming_job.id().into()) + .await?; + let fragment_graph = PbStreamFragmentGraph { + max_parallelism: original_max_parallelism as _, + ..fragment_graph + }; + // 1. build fragment graph. let fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job)?; streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id());