From fd985ab046f8ef33d2f23026f2671e85767a84be Mon Sep 17 00:00:00 2001
From: Shanicky Chen <>
Date: Wed, 12 Jun 2024 22:46:56 +0800
Subject: [PATCH] fix: match sink-into-table placeholder mergers by unique sink
 name

Signed-off-by: Shanicky Chen
---
 src/common/src/array/data_chunk.rs            |   1 +
 src/common/src/array/stream_chunk.rs          |   2 +-
 src/expr/core/src/expr/expr_input_ref.rs      |   2 +
 .../src/handler/alter_table_column.rs         |  18 ++-
 src/frontend/src/handler/create_sink.rs       |  33 +----
 src/frontend/src/handler/drop_sink.rs         |   4 +-
 src/meta/src/controller/catalog.rs            |  13 ++
 src/meta/src/manager/catalog/mod.rs           |  11 ++
 src/meta/src/manager/metadata.rs              |  15 ++-
 src/meta/src/rpc/ddl_controller.rs            | 137 +++++++++---------
 10 files changed, 134 insertions(+), 102 deletions(-)

diff --git a/src/common/src/array/data_chunk.rs b/src/common/src/array/data_chunk.rs
index 4e08163817e23..540e317a14935 100644
--- a/src/common/src/array/data_chunk.rs
+++ b/src/common/src/array/data_chunk.rs
@@ -183,6 +183,7 @@ impl DataChunk {
     }
 
     pub fn column_at(&self, idx: usize) -> &ArrayRef {
+        tracing::debug!("column_at: chunk {:?} idx {}", self, idx);
         &self.columns[idx]
     }
 
diff --git a/src/common/src/array/stream_chunk.rs b/src/common/src/array/stream_chunk.rs
index e9ee048010057..9f2cbe27ae560 100644
--- a/src/common/src/array/stream_chunk.rs
+++ b/src/common/src/array/stream_chunk.rs
@@ -303,7 +303,7 @@ impl StreamChunk {
     /// will be `[c, b, a]`. If `indices` is [2, 0], then the output will be `[c, a]`.
     /// If the input mapping is identity mapping, no reorder will be performed.
     pub fn project(&self, indices: &[usize]) -> Self {
-        println!("data {:#?}", self.data);
+        tracing::debug!("project: data {:?} indices {:?}", self.data, indices);
         Self {
             ops: self.ops.clone(),
             data: self.data.project(indices),
diff --git a/src/expr/core/src/expr/expr_input_ref.rs b/src/expr/core/src/expr/expr_input_ref.rs
index 89e7def9ab89d..3bdf0bd0f8b4f 100644
--- a/src/expr/core/src/expr/expr_input_ref.rs
+++ b/src/expr/core/src/expr/expr_input_ref.rs
@@ -37,6 +37,8 @@ impl Expression for InputRefExpression {
     }
 
     async fn eval(&self, input: &DataChunk) -> Result<ArrayRef> {
+        tracing::debug!("InputRefExpression::eval: expr {:?}", self);
+        tracing::debug!("input {:?} idx {}", input, self.idx);
         Ok(input.column_at(self.idx).clone())
     }
 
diff --git a/src/frontend/src/handler/alter_table_column.rs b/src/frontend/src/handler/alter_table_column.rs
index f2dab51a66407..5c9f1ab5d18cf 100644
--- a/src/frontend/src/handler/alter_table_column.rs
+++ b/src/frontend/src/handler/alter_table_column.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashSet};
+use std::collections::HashSet;
 use std::rc::Rc;
 use std::sync::Arc;
 
@@ -21,7 +21,7 @@ use create_sink::derive_default_column_project_for_sink;
 use itertools::Itertools;
 use pgwire::pg_response::{PgResponse, StatementType};
 use risingwave_common::bail_not_implemented;
-use risingwave_common::catalog::ColumnCatalog;
+use risingwave_common::catalog::{ColumnCatalog, Field};
 use risingwave_common::util::column_index_mapping::ColIndexMapping;
 use risingwave_connector::sink::catalog::SinkCatalog;
 use risingwave_pb::stream_plan::StreamFragmentGraph;
@@ -144,6 +144,11 @@ pub(crate) fn hijack_merger_for_target_table(
         false, // todo
     )?;
 
+    tracing::debug!(
+        "hijack merger: sink {} exprs {:?} target {:?} default {:?}",
+        sink.name, exprs, target_columns, default_columns
+    );
+
     let pb_project = StreamProject::new(generic::Project::new(
         exprs,
         LogicalSource::new(
@@ -160,7 +165,14 @@ pub(crate) fn hijack_merger_for_target_table(
 
     for fragment in graph.fragments.values_mut() {
         if let Some(node) = &mut fragment.node {
-            insert_merger_to_union_with_project(node, &pb_project);
+            insert_merger_to_union_with_project(
+                node,
+                &pb_project,
+                &format!(
+                    "{}.{}.{}",
+                    sink.database_id.database_id, sink.schema_id.schema_id, sink.name
+                ),
+            );
         }
     }
 
diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs
index 45cd43d41c0af..dd77824f9e0c2 100644
--- a/src/frontend/src/handler/create_sink.rs
+++ b/src/frontend/src/handler/create_sink.rs
@@ -35,6 +35,7 @@ use risingwave_connector::sink::{
 };
 use risingwave_pb::catalog::{PbSource, Table};
 use risingwave_pb::ddl_service::ReplaceTablePlan;
+use risingwave_pb::plan_common::PbField;
 use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
 use risingwave_pb::stream_plan::stream_node::{NodeBody, PbNodeBody};
 use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode};
@@ -56,9 +57,7 @@ use crate::error::{ErrorCode, Result, RwError};
 use crate::expr::{ExprImpl, InputRef};
 use crate::handler::alter_table_column::fetch_table_catalog_for_alter;
 use crate::handler::create_mv::parse_column_names;
-use crate::handler::create_table::{
-    generate_stream_graph_for_table, ColumnIdGenerator,
-};
+use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator};
 use crate::handler::privilege::resolve_query_privileges;
 use crate::handler::util::SourceSchemaCompatExt;
 use crate::handler::HandlerArgs;
@@ -694,44 +693,22 @@ pub(crate) async fn reparse_table_for_sink(
     Ok((graph, table, source))
 }
 
-pub(crate) fn insert_merger_to_union(node: &mut StreamNode) {
-    if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
-        node.input.push(StreamNode {
-            identity: "Merge (sink into table)".to_string(),
-            fields: node.fields.clone(),
-            node_body: Some(NodeBody::Merge(MergeNode {
-                upstream_dispatcher_type: DispatcherType::Hash as _,
-                ..Default::default()
-            })),
-            ..Default::default()
-        });
-
-        return;
-    }
-
-    for input in &mut node.input {
-        insert_merger_to_union(input);
-    }
-}
-
 pub(crate) fn insert_merger_to_union_with_project(
     node: &mut StreamNode,
     project_node: &PbNodeBody,
+    uniq_name: &str,
 ) {
     if let Some(NodeBody::Union(_union_node)) = &mut node.node_body {
         node.input.push(StreamNode {
             input: vec![StreamNode {
-                identity: "Merge (sink into table)".to_string(),
-                fields: node.fields.clone(),
                 node_body: Some(NodeBody::Merge(MergeNode {
-                    upstream_dispatcher_type: DispatcherType::Hash as _,
                     ..Default::default()
                 })),
                 ..Default::default()
             }],
             stream_key: vec![],
             append_only: false,
-            identity: "".to_string(),
+            identity: uniq_name.to_string(),
             fields: vec![],
             node_body: Some(project_node.clone()),
             ..Default::default()
@@ -741,7 +718,7 @@ pub(crate) fn insert_merger_to_union_with_project(
     }
 
     for input in &mut node.input {
-        insert_merger_to_union_with_project(input, project_node);
+        insert_merger_to_union_with_project(input, project_node, uniq_name);
     }
 }
 
diff --git a/src/frontend/src/handler/drop_sink.rs b/src/frontend/src/handler/drop_sink.rs
index 1e7d589757507..b661a2c27f403 100644
--- a/src/frontend/src/handler/drop_sink.rs
+++ b/src/frontend/src/handler/drop_sink.rs
@@ -24,9 +24,7 @@ use crate::binder::Binder;
 use crate::catalog::root_catalog::SchemaPath;
 use crate::error::Result;
 use crate::expr::ExprImpl;
-use crate::handler::create_sink::{
-    fetch_incoming_sinks, reparse_table_for_sink,
-};
+use crate::handler::create_sink::{fetch_incoming_sinks, reparse_table_for_sink};
 use crate::handler::HandlerArgs;
 use crate::{OptimizerContext, TableCatalog};
 
diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs
index 200736725cd07..b810c4a1309e0 100644
--- a/src/meta/src/controller/catalog.rs
+++ b/src/meta/src/controller/catalog.rs
@@ -2678,6 +2678,19 @@ impl CatalogController {
             .collect())
     }
 
+    pub async fn get_sink_by_ids(&self, sink_ids: Vec<SinkId>) -> MetaResult<Vec<PbSink>> {
+        let inner = self.inner.read().await;
+        let sink_objs = Sink::find()
+            .find_also_related(Object)
+            .filter(sink::Column::SinkId.is_in(sink_ids))
+            .all(&inner.db)
+            .await?;
+        Ok(sink_objs
+            .into_iter()
+            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
+            .collect())
+    }
+
     pub async fn get_subscription_by_id(
         &self,
         subscription_id: SubscriptionId,
diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs
index 3aeab64bef128..05486f5a6c48b 100644
--- a/src/meta/src/manager/catalog/mod.rs
+++ b/src/meta/src/manager/catalog/mod.rs
@@ -3815,6 +3815,17 @@ impl CatalogManager {
         Ok(subscription.clone())
     }
 
+    pub async fn get_sinks(&self, sink_ids: &[SinkId]) -> Vec<Sink> {
+        let mut sinks = vec![];
+        let guard = self.core.lock().await;
+        for sink_id in sink_ids {
+            if let Some(sink) = guard.database.sinks.get(sink_id) {
+                sinks.push(sink.clone());
+            }
+        }
+        sinks
+    }
+
     pub async fn get_created_table_ids(&self) -> Vec<TableId> {
         let guard = self.core.lock().await;
         guard
diff --git a/src/meta/src/manager/metadata.rs b/src/meta/src/manager/metadata.rs
index 241a47941755b..5f3451b48d40e 100644
--- a/src/meta/src/manager/metadata.rs
+++ b/src/meta/src/manager/metadata.rs
@@ -19,7 +19,7 @@ use std::time::Duration;
 use futures::future::{select, Either};
 use risingwave_common::catalog::{TableId, TableOption};
 use risingwave_meta_model_v2::SourceId;
-use risingwave_pb::catalog::{PbSource, PbTable};
+use risingwave_pb::catalog::{PbSink, PbSource, PbTable};
 use risingwave_pb::common::worker_node::{PbResource, State};
 use risingwave_pb::common::{HostAddress, PbWorkerNode, PbWorkerType, WorkerNode, WorkerType};
 use risingwave_pb::meta::add_worker_node_request::Property as AddNodeProperty;
@@ -534,6 +534,19 @@ impl MetadataManager {
         }
     }
 
+    pub async fn get_sink_catalog_by_ids(&self, ids: &[u32]) -> MetaResult<Vec<PbSink>> {
+        match &self {
+            MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.get_sinks(ids).await),
+            MetadataManager::V2(mgr) => {
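+                // The ids are `u32` on the RPC surface, while the SQL-backend
+                // catalog keys sinks by its own id type, hence the `as _` cast.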
+                mgr.catalog_controller
+                    .get_sink_by_ids(ids.iter().map(|id| *id as _).collect())
+                    .await
+            }
+        }
+    }
+
     pub async fn get_downstream_chain_fragments(
         &self,
         job_id: u32,
diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs
index 8651efe1153d6..bf9cbee4b09e4 100644
--- a/src/meta/src/rpc/ddl_controller.rs
+++ b/src/meta/src/rpc/ddl_controller.rs
@@ -1091,7 +1091,7 @@ impl DdlController {
             }
         }
 
-        let table = streaming_job.table().unwrap();
+        let target_table = streaming_job.table().unwrap();
 
         let target_fragment_id =
             union_fragment_id.expect("fragment of placeholder merger not found");
@@ -1099,51 +1099,64 @@ impl DdlController {
         if let Some(creating_sink_table_fragments) = creating_sink_table_fragments {
             let sink_fragment = creating_sink_table_fragments.sink_fragment().unwrap();
 
+            let sink = sink.unwrap();
+
+            let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);
+
             Self::inject_replace_table_plan_for_sink(
-                sink.map(|sink| sink.id),
+                Some(sink.id),
                 &sink_fragment,
-                table,
+                target_table,
                 &mut replace_table_ctx,
                 &mut table_fragments,
                 target_fragment_id,
+                uniq_name,
             );
         }
 
         let [table_catalog]: [_; 1] = mgr
-            .get_table_catalog_by_ids(vec![table.id])
+            .get_table_catalog_by_ids(vec![target_table.id])
             .await?
             .try_into()
             .expect("Target table should exist in sink into table");
 
-        assert_eq!(table_catalog.incoming_sinks, table.incoming_sinks);
+        assert_eq!(table_catalog.incoming_sinks, target_table.incoming_sinks);
 
         tracing::debug!("incoming sinks: {:?}", table_catalog.incoming_sinks);
         tracing::debug!("dropping sink: {:?}", dropping_sink_id);
 
         {
-            for sink_id in &table_catalog.incoming_sinks {
+            let catalogs = mgr
+                .get_sink_catalog_by_ids(&table_catalog.incoming_sinks)
+                .await?;
+
+            for sink in catalogs {
+                let sink_id = sink.id;
+
                 if let Some(dropping_sink_id) = dropping_sink_id
-                    && *sink_id == dropping_sink_id
+                    && sink_id == dropping_sink_id
                 {
                     continue;
                 };
 
+                let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);
                 tracing::debug!("injecting replace-table plan for sink {}", sink_id);
 
                 let sink_table_fragments = mgr
-                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
+                    .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(sink_id))
                     .await?;
 
                 let sink_fragment = sink_table_fragments.sink_fragment().unwrap();
 
                 Self::inject_replace_table_plan_for_sink(
-                    Some(*sink_id),
+                    Some(sink_id),
                     &sink_fragment,
-                    table,
+                    target_table,
                     &mut replace_table_ctx,
                     &mut table_fragments,
                     target_fragment_id,
+                    uniq_name,
                 );
             }
         }
@@ -1173,6 +1186,7 @@ impl DdlController {
         replace_table_ctx: &mut ReplaceTableContext,
         table_fragments: &mut TableFragments,
         target_fragment_id: FragmentId,
+        uniq_name: &str,
     ) {
         let sink_actor_ids = sink_fragment
             .actors
@@ -1212,8 +1226,12 @@ impl DdlController {
             .map(|(idx, _)| idx as _)
             .collect_vec();
 
+        tracing::debug!("output indices: {:?}", output_indices);
+
         let dist_key_indices = table.distribution_key.iter().map(|i| *i as _).collect_vec();
 
+        tracing::debug!("dist key indices: {:?}", dist_key_indices);
+
         let mapping = downstream_actor_ids
             .iter()
             .map(|id| {
@@ -1250,67 +1268,44 @@ impl DdlController {
             tracing::debug!("actor {}", actor.actor_id);
 
             if let Some(node) = &mut actor.nodes {
-                // let fields = node.fields.clone();
-                let fields = sink_fields.clone();
-
                 visit_stream_node_cont_mut(node, |node| {
-                    fn hijack_merge_node(
-                        sink_id: Option<u32>,
-                        sink_actor_ids: &Vec<u32>,
-                        upstream_fragment_id: u32,
-                        sink_fields: &[Field],
-                        table_fields: &[Field],
-                        input: &mut StreamNode,
-                    ) -> Option<bool> {
-                        match &mut input.node_body {
-                            Some(NodeBody::Merge(merge_node))
-                                if merge_node.upstream_actor_id.is_empty() =>
-                            {
-                                if let Some(sink_id) = sink_id {
-                                    input.identity =
-                                        format!("MergeExecutor(from sink {})", sink_id);
+                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
+                        for input in &mut node.input {
+                            if let Some(NodeBody::Project(_)) = &mut input.node_body {
+                                let merge_stream_node =
+                                    input.input.iter_mut().exactly_one().unwrap();
+
+                                tracing::debug!("uniq name: {}", uniq_name);
+                                tracing::debug!("identity: {}", input.identity);
+
+                                // Only hijack the placeholder whose project identity
+                                // carries this sink's unique name.
+                                if input.identity.as_str() != uniq_name {
+                                    continue;
                                 }
 
-                                *merge_node = MergeNode {
-                                    upstream_actor_id: sink_actor_ids.clone(),
-                                    upstream_fragment_id,
-                                    upstream_dispatcher_type: DispatcherType::Hash as _,
-                                    fields: sink_fields.to_vec(),
-                                };
+                                if let Some(NodeBody::Merge(merge_node)) =
+                                    &mut merge_stream_node.node_body
+                                    && merge_node.upstream_actor_id.is_empty()
+                                {
+                                    if let Some(sink_id) = sink_id {
+                                        merge_stream_node.identity =
+                                            format!("MergeExecutor(from sink {})", sink_id);
+                                    }
 
-                                input.fields = sink_fields.to_vec();
+                                    *merge_node = MergeNode {
+                                        upstream_actor_id: sink_actor_ids.clone(),
+                                        upstream_fragment_id,
+                                        upstream_dispatcher_type: DispatcherType::Hash as _,
+                                        fields: sink_fields.to_vec(),
+                                    };
 
-                                Some(false)
-                            }
-                            Some(NodeBody::Project(_)) => {
-                                let merge_node = input.input.iter_mut().exactly_one().unwrap();
-
-                                input.fields = table_fields.to_vec();
-
-                                hijack_merge_node(
-                                    sink_id,
-                                    sink_actor_ids,
-                                    upstream_fragment_id,
-                                    sink_fields,
-                                    table_fields,
-                                    merge_node,
-                                )
-                            }
-                            _ => None,
-                        }
-                    }
+                                    merge_stream_node.fields = sink_fields.to_vec();
 
-                    if let Some(NodeBody::Union(_)) = &mut node.node_body {
-                        for input in &mut node.input {
-                            if let Some(value) = hijack_merge_node(
-                                sink_id,
-                                &sink_actor_ids,
-                                upstream_fragment_id,
-                                &fields,
-                                &node.fields,
-                                input,
-                            ) {
-                                return value;
+                                    input.fields = node.fields.clone();
+
+                                    return false;
+                                }
                             }
                         }
                     }
@@ -2006,7 +2001,16 @@ impl DdlController {
 
         tracing::debug!("incoming sinks: {:?}", table.incoming_sinks);
 
-        for sink_id in &table.incoming_sinks {
+        let catalogs = self
+            .metadata_manager
+            .get_sink_catalog_by_ids(&table.incoming_sinks)
+            .await?;
+
+        for sink in catalogs {
+            let sink_id = &sink.id;
+
+            let uniq_name = &format!("{}.{}.{}", sink.database_id, sink.schema_id, sink.name);
+
             let sink_table_fragments = self
                 .metadata_manager
                 .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id))
@@ -2026,6 +2030,7 @@ impl DdlController {
                 &mut ctx,
                 &mut table_fragments,
                 target_fragment_id,
+                uniq_name,
             );
         }
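
Taken together, the changes implement one scheme: the frontend tags each sink's placeholder project with the unique identity `<database>.<schema>.<sink name>` (`hijack_merger_for_target_table` via `insert_merger_to_union_with_project`), and the meta node's `inject_replace_table_plan_for_sink` fills in only the placeholder whose identity matches that name, rather than the first merge node with an empty upstream list as the removed `hijack_merge_node` helper did. Below is a minimal sketch of just that matching step; `Node` and `Body` are simplified stand-ins for the `StreamNode` protobuf, not RisingWave's actual types:

#[derive(Debug, Default)]
struct Node {
    identity: String, // "<database>.<schema>.<sink name>" on placeholder projects
    body: Option<Body>,
    input: Vec<Node>,
}

#[derive(Debug)]
enum Body {
    Union,
    Project,
    Merge { upstream_actor_ids: Vec<u32> },
}

/// The placeholder the frontend leaves under the union: a project tagged with
/// the sink's unique name, wrapping a merge node that has no upstreams yet.
fn placeholder(uniq_name: &str) -> Node {
    Node {
        identity: uniq_name.to_string(),
        body: Some(Body::Project),
        input: vec![Node {
            body: Some(Body::Merge { upstream_actor_ids: vec![] }),
            ..Default::default()
        }],
        ..Default::default()
    }
}

/// Walk down to the union, then wire the sink's actors into exactly the
/// placeholder whose identity matches `uniq_name`. Returns whether one was filled.
fn fill_placeholder_merger(node: &mut Node, uniq_name: &str, actor_ids: &[u32]) -> bool {
    if matches!(node.body, Some(Body::Union)) {
        for input in &mut node.input {
            // Skip other sinks' placeholders and non-placeholder inputs.
            if !matches!(input.body, Some(Body::Project)) || input.identity != uniq_name {
                continue;
            }
            // A placeholder project wraps exactly one merge node.
            let merge = input.input.first_mut().expect("placeholder wraps one merge");
            if let Some(Body::Merge { upstream_actor_ids }) = &mut merge.body {
                if upstream_actor_ids.is_empty() {
                    *upstream_actor_ids = actor_ids.to_vec();
                    return true;
                }
            }
        }
        return false;
    }
    // Not at the union yet: keep searching the inputs.
    node.input
        .iter_mut()
        .any(|input| fill_placeholder_merger(input, uniq_name, actor_ids))
}

fn main() {
    let mut union = Node {
        body: Some(Body::Union),
        input: vec![
            placeholder("db1.public.sink_a"),
            placeholder("db1.public.sink_b"),
        ],
        ..Default::default()
    };
    // Only sink_b's placeholder is filled; sink_a's is left for its own pass.
    assert!(fill_placeholder_merger(&mut union, "db1.public.sink_b", &[7, 8]));
    assert!(!fill_placeholder_merger(&mut union, "db1.public.sink_c", &[9]));
}

Keying the match on the qualified sink name makes the pairing deterministic when a table has several incoming sinks: each replace-table pass finds its own placeholder even while other sinks' placeholders are still empty, which appears to be the failure mode the old first-empty-merger matching could hit.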