From 76e51845f92fca46672d384dcbe81ab73e20b751 Mon Sep 17 00:00:00 2001 From: Shanicky Chen <> Date: Thu, 20 Jun 2024 17:18:22 +0800 Subject: [PATCH] Refactor stream_project.rs, add debug in dashboard mod.rs --- .../src/optimizer/plan_node/stream_project.rs | 32 +++++++------------ src/meta/src/dashboard/mod.rs | 2 ++ src/meta/src/manager/catalog/database.rs | 1 + src/meta/src/manager/catalog/mod.rs | 14 +++----- 4 files changed, 19 insertions(+), 30 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index fd690e586125..e8ff1df6e82d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -128,28 +128,9 @@ impl StreamProject { &self.core.exprs } - pub fn watermark_derivations(&self) -> &[(usize, usize)] { - &self.watermark_derivations - } - pub fn noop_update_hint(&self) -> bool { self.noop_update_hint } - - pub fn to_stream_prost_body_inner(&self) -> PbNodeBody { - let (watermark_input_cols, watermark_output_cols) = self - .watermark_derivations - .iter() - .map(|(i, o)| (*i as u32, *o as u32)) - .unzip(); - PbNodeBody::Project(ProjectNode { - select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), - watermark_input_cols, - watermark_output_cols, - nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), - noop_update_hint: self.noop_update_hint, - }) - } } impl PlanTreeNodeUnary for StreamProject { @@ -167,7 +148,18 @@ impl_plan_tree_node_for_unary! {StreamProject} impl StreamNode for StreamProject { fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody { - self.to_stream_prost_body_inner() + let (watermark_input_cols, watermark_output_cols) = self + .watermark_derivations + .iter() + .map(|(i, o)| (*i as u32, *o as u32)) + .unzip(); + PbNodeBody::Project(ProjectNode { + select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(), + watermark_input_cols, + watermark_output_cols, + nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(), + noop_update_hint: self.noop_update_hint, + }) } } diff --git a/src/meta/src/dashboard/mod.rs b/src/meta/src/dashboard/mod.rs index 122955403261..57ce177a69bd 100644 --- a/src/meta/src/dashboard/mod.rs +++ b/src/meta/src/dashboard/mod.rs @@ -174,6 +174,8 @@ pub(super) mod handlers { MetadataManager::V2(mgr) => mgr.catalog_controller.list_sinks().await.map_err(err)?, }; + println!("sinks {:#?}", sinks); + Ok(Json(sinks)) } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 5911188e2bfb..a4794af09643 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -405,6 +405,7 @@ impl DatabaseManager { } pub fn list_sinks(&self) -> Vec { + println!("sinks {:#?}", self.sinks); self.sinks.values().cloned().collect_vec() } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 9ccebd9beb1f..92eb65e8e7df 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3377,6 +3377,7 @@ impl CatalogManager { dropping_sink_id: Option, updated_sink_ids: Vec, ) -> MetaResult { + println!("updated {:?}", updated_sink_ids); let core = &mut *self.core.lock().await; let database_core = &mut core.database; let mut tables = BTreeMapTransaction::new(&mut database_core.tables); @@ -3394,11 +3395,10 @@ impl CatalogManager { let original_table = tables.get(&table.id).unwrap(); let mut updated_sinks = vec![]; for sink_id in updated_sink_ids { - let mut sink = sinks.get_mut(sink_id).unwrap().clone(); + let mut sink = sinks.get_mut(sink_id).unwrap(); sink.original_target_columns .clone_from(&original_table.columns); - sinks.insert(sink.id, sink.clone()); - updated_sinks.push(sink); + updated_sinks.push(sink.clone()); } if let Some(source) = source { @@ -3458,7 +3458,7 @@ impl CatalogManager { tables.insert(table.id, table.clone()); - commit_meta!(self, tables, indexes, sources)?; + commit_meta!(self, tables, indexes, sources, sinks)?; // Group notification let version = self @@ -3834,12 +3834,6 @@ impl CatalogManager { let mut sinks = vec![]; let guard = self.core.lock().await; for sink_id in sink_ids { - // if let Some(table) = guard.database.in_progress_creating_tables.get(table_id) { - // sinks.push(table.clone()); - // } else if let Some(table) = guard.database.tables.get(table_id) { - // sinks.push(table.clone()); - // } - if let Some(sink) = guard.database.sinks.get(sink_id) { sinks.push(sink.clone()); }