Skip to content

Commit

Permalink
Refactor stream_project.rs, add debug in dashboard mod.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanicky Chen committed Jun 20, 2024
1 parent f9c8a52 commit 76e5184
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 30 deletions.
32 changes: 12 additions & 20 deletions src/frontend/src/optimizer/plan_node/stream_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,9 @@ impl StreamProject {
&self.core.exprs
}

pub fn watermark_derivations(&self) -> &[(usize, usize)] {
&self.watermark_derivations
}

pub fn noop_update_hint(&self) -> bool {
self.noop_update_hint
}

pub fn to_stream_prost_body_inner(&self) -> PbNodeBody {
let (watermark_input_cols, watermark_output_cols) = self
.watermark_derivations
.iter()
.map(|(i, o)| (*i as u32, *o as u32))
.unzip();
PbNodeBody::Project(ProjectNode {
select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
watermark_input_cols,
watermark_output_cols,
nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
noop_update_hint: self.noop_update_hint,
})
}
}

impl PlanTreeNodeUnary for StreamProject {
Expand All @@ -167,7 +148,18 @@ impl_plan_tree_node_for_unary! {StreamProject}

impl StreamNode for StreamProject {
fn to_stream_prost_body(&self, _state: &mut BuildFragmentGraphState) -> PbNodeBody {
self.to_stream_prost_body_inner()
let (watermark_input_cols, watermark_output_cols) = self
.watermark_derivations
.iter()
.map(|(i, o)| (*i as u32, *o as u32))
.unzip();
PbNodeBody::Project(ProjectNode {
select_list: self.core.exprs.iter().map(|x| x.to_expr_proto()).collect(),
watermark_input_cols,
watermark_output_cols,
nondecreasing_exprs: self.nondecreasing_exprs.iter().map(|i| *i as _).collect(),
noop_update_hint: self.noop_update_hint,
})
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ pub(super) mod handlers {
MetadataManager::V2(mgr) => mgr.catalog_controller.list_sinks().await.map_err(err)?,
};

println!("sinks {:#?}", sinks);

Ok(Json(sinks))
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ impl DatabaseManager {
}

pub fn list_sinks(&self) -> Vec<Sink> {
println!("sinks {:#?}", self.sinks);
self.sinks.values().cloned().collect_vec()
}

Expand Down
14 changes: 4 additions & 10 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3377,6 +3377,7 @@ impl CatalogManager {
dropping_sink_id: Option<SinkId>,
updated_sink_ids: Vec<SinkId>,
) -> MetaResult<NotificationVersion> {
println!("updated {:?}", updated_sink_ids);
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
Expand All @@ -3394,11 +3395,10 @@ impl CatalogManager {
let original_table = tables.get(&table.id).unwrap();
let mut updated_sinks = vec![];
for sink_id in updated_sink_ids {
let mut sink = sinks.get_mut(sink_id).unwrap().clone();
let mut sink = sinks.get_mut(sink_id).unwrap();
sink.original_target_columns
.clone_from(&original_table.columns);
sinks.insert(sink.id, sink.clone());
updated_sinks.push(sink);
updated_sinks.push(sink.clone());
}

if let Some(source) = source {
Expand Down Expand Up @@ -3458,7 +3458,7 @@ impl CatalogManager {

tables.insert(table.id, table.clone());

commit_meta!(self, tables, indexes, sources)?;
commit_meta!(self, tables, indexes, sources, sinks)?;

// Group notification
let version = self
Expand Down Expand Up @@ -3834,12 +3834,6 @@ impl CatalogManager {
let mut sinks = vec![];
let guard = self.core.lock().await;
for sink_id in sink_ids {
// if let Some(table) = guard.database.in_progress_creating_tables.get(table_id) {
// sinks.push(table.clone());
// } else if let Some(table) = guard.database.tables.get(table_id) {
// sinks.push(table.clone());
// }

if let Some(sink) = guard.database.sinks.get(sink_id) {
sinks.push(sink.clone());
}
Expand Down

0 comments on commit 76e5184

Please sign in to comment.